simplify pendingInvocations
This commit is contained in:
parent
5be89dea71
commit
7304ae981b
1 changed files with 35 additions and 42 deletions
|
|
@ -8,7 +8,6 @@ import akka.actor.typed.Behavior.StoppedBehavior
|
|||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandlerInvocation }
|
||||
import akka.persistence.JournalProtocol._
|
||||
import akka.persistence._
|
||||
import akka.persistence.journal.Tagged
|
||||
|
|
@ -102,20 +101,15 @@ private[akka] object EventsourcedRunning {
|
|||
|
||||
val newState2 = internalPersist(newState, eventToPersist)
|
||||
|
||||
val handler: Any ⇒ Unit = { _ ⇒
|
||||
if (setup.snapshotWhen(newState2.state, event, newState2.seqNr))
|
||||
internalSaveSnapshot(newState2)
|
||||
}
|
||||
val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil
|
||||
val shouldSnapshotAfterPersist = setup.snapshotWhen(newState2.state, event, newState2.seqNr)
|
||||
|
||||
persistingEvents(newState2, pendingInvocations, sideEffects)
|
||||
persistingEvents(newState2, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects)
|
||||
|
||||
case PersistAll(events) ⇒
|
||||
if (events.nonEmpty) {
|
||||
// apply the event before persist so that validation exception is handled before persisting
|
||||
// the invalid event, in case such validation is implemented in the event handler.
|
||||
// also, ensure that there is an event handler for each single event
|
||||
var count = events.size
|
||||
var seqNr = state.seqNr
|
||||
val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
|
||||
case ((currentState, snapshot), event) ⇒
|
||||
|
|
@ -128,19 +122,7 @@ private[akka] object EventsourcedRunning {
|
|||
|
||||
val newState2 = internalPersistAll(eventsToPersist, newState)
|
||||
|
||||
val handler: Any ⇒ Unit = { _ ⇒
|
||||
count -= 1
|
||||
if (count == 0) {
|
||||
if (shouldSnapshotAfterPersist)
|
||||
internalSaveSnapshot(newState2)
|
||||
}
|
||||
}
|
||||
|
||||
val pendingInvocations = events.map { event ⇒
|
||||
StashingHandlerInvocation(event, handler)
|
||||
}
|
||||
|
||||
persistingEvents(newState2, pendingInvocations, sideEffects)
|
||||
persistingEvents(newState2, events.size, shouldSnapshotAfterPersist, sideEffects)
|
||||
|
||||
} else {
|
||||
// run side-effects even when no events are emitted
|
||||
|
|
@ -177,21 +159,25 @@ private[akka] object EventsourcedRunning {
|
|||
// ===============================================
|
||||
|
||||
def persistingEvents(
|
||||
state: EventsourcedState[S],
|
||||
pendingInvocations: immutable.Seq[PendingHandlerInvocation],
|
||||
sideEffects: immutable.Seq[ChainableEffect[_, S]]
|
||||
state: EventsourcedState[S],
|
||||
numberOfEvents: Int,
|
||||
shouldSnapshotAfterPersist: Boolean,
|
||||
sideEffects: immutable.Seq[ChainableEffect[_, S]]
|
||||
): Behavior[InternalProtocol] = {
|
||||
withMdc(setup, MDC.PersistingEvents) {
|
||||
new PersistingEvents(state, pendingInvocations, sideEffects)
|
||||
new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects)
|
||||
}
|
||||
}
|
||||
|
||||
class PersistingEvents(
|
||||
var state: EventsourcedState[S],
|
||||
var pendingInvocations: immutable.Seq[PendingHandlerInvocation],
|
||||
var sideEffects: immutable.Seq[ChainableEffect[_, S]])
|
||||
var state: EventsourcedState[S],
|
||||
numberOfEvents: Int,
|
||||
shouldSnapshotAfterPersist: Boolean,
|
||||
var sideEffects: immutable.Seq[ChainableEffect[_, S]])
|
||||
extends MutableBehavior[EventsourcedBehavior.InternalProtocol] {
|
||||
|
||||
private var eventCounter = 0
|
||||
|
||||
override def onMessage(msg: EventsourcedBehavior.InternalProtocol): Behavior[EventsourcedBehavior.InternalProtocol] = {
|
||||
msg match {
|
||||
case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r)
|
||||
|
|
@ -211,30 +197,37 @@ private[akka] object EventsourcedRunning {
|
|||
final def onJournalResponse(
|
||||
response: Response): Behavior[InternalProtocol] = {
|
||||
setup.log.debug("Received Journal response: {}", response)
|
||||
|
||||
def onWriteResponse(p: PersistentRepr): Behavior[InternalProtocol] = {
|
||||
state = state.updateLastSequenceNr(p)
|
||||
eventCounter += 1
|
||||
|
||||
// only once all things are applied we can revert back
|
||||
if (eventCounter < numberOfEvents) this
|
||||
else {
|
||||
if (shouldSnapshotAfterPersist)
|
||||
internalSaveSnapshot(state)
|
||||
|
||||
tryUnstash(applySideEffects(sideEffects, state))
|
||||
}
|
||||
}
|
||||
|
||||
response match {
|
||||
case WriteMessageSuccess(p, id) ⇒
|
||||
if (id == setup.writerIdentity.instanceId) {
|
||||
state = state.updateLastSequenceNr(p)
|
||||
// FIXME is the order of pendingInvocations not reversed?
|
||||
pendingInvocations.head.handler(p.payload)
|
||||
pendingInvocations = pendingInvocations.tail
|
||||
|
||||
// only once all things are applied we can revert back
|
||||
if (pendingInvocations.nonEmpty) this
|
||||
else tryUnstash(applySideEffects(sideEffects, state))
|
||||
} else this
|
||||
if (id == setup.writerIdentity.instanceId)
|
||||
onWriteResponse(p)
|
||||
else this
|
||||
|
||||
case WriteMessageRejected(p, cause, id) ⇒
|
||||
if (id == setup.writerIdentity.instanceId) {
|
||||
state = state.updateLastSequenceNr(p)
|
||||
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design)
|
||||
tryUnstash(applySideEffects(sideEffects, state))
|
||||
onWriteResponse(p)
|
||||
} else this
|
||||
|
||||
case WriteMessageFailure(p, cause, id) ⇒
|
||||
if (id == setup.writerIdentity.instanceId) {
|
||||
if (id == setup.writerIdentity.instanceId)
|
||||
onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
|
||||
} else this
|
||||
else this
|
||||
|
||||
case WriteMessagesSuccessful ⇒
|
||||
// ignore
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue