diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 322218d28b..674d60475f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -8,7 +8,6 @@ import akka.actor.typed.Behavior.StoppedBehavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors.MutableBehavior import akka.annotation.InternalApi -import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandlerInvocation } import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged @@ -102,20 +101,15 @@ private[akka] object EventsourcedRunning { val newState2 = internalPersist(newState, eventToPersist) - val handler: Any ⇒ Unit = { _ ⇒ - if (setup.snapshotWhen(newState2.state, event, newState2.seqNr)) - internalSaveSnapshot(newState2) - } - val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil + val shouldSnapshotAfterPersist = setup.snapshotWhen(newState2.state, event, newState2.seqNr) - persistingEvents(newState2, pendingInvocations, sideEffects) + persistingEvents(newState2, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects) case PersistAll(events) ⇒ if (events.nonEmpty) { // apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event - var count = events.size var seqNr = state.seqNr val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { case ((currentState, snapshot), event) ⇒ @@ -128,19 +122,7 @@ private[akka] object EventsourcedRunning { val newState2 = internalPersistAll(eventsToPersist, newState) - val handler: Any ⇒ Unit = { _ ⇒ - count -= 1 - if (count == 0) { - if (shouldSnapshotAfterPersist) - internalSaveSnapshot(newState2) - } - } - - val pendingInvocations = events.map { event ⇒ - StashingHandlerInvocation(event, handler) - } - - persistingEvents(newState2, pendingInvocations, sideEffects) + persistingEvents(newState2, events.size, shouldSnapshotAfterPersist, sideEffects) } else { // run side-effects even when no events are emitted @@ -177,21 +159,25 @@ private[akka] object EventsourcedRunning { // =============================================== def persistingEvents( - state: EventsourcedState[S], - pendingInvocations: immutable.Seq[PendingHandlerInvocation], - sideEffects: immutable.Seq[ChainableEffect[_, S]] + state: EventsourcedState[S], + numberOfEvents: Int, + shouldSnapshotAfterPersist: Boolean, + sideEffects: immutable.Seq[ChainableEffect[_, S]] ): Behavior[InternalProtocol] = { withMdc(setup, MDC.PersistingEvents) { - new PersistingEvents(state, pendingInvocations, sideEffects) + new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects) } } class PersistingEvents( - var state: EventsourcedState[S], - var pendingInvocations: immutable.Seq[PendingHandlerInvocation], - var sideEffects: immutable.Seq[ChainableEffect[_, S]]) + var state: EventsourcedState[S], + numberOfEvents: Int, + shouldSnapshotAfterPersist: Boolean, + var sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends MutableBehavior[EventsourcedBehavior.InternalProtocol] { + private var eventCounter = 0 + override def onMessage(msg: EventsourcedBehavior.InternalProtocol): Behavior[EventsourcedBehavior.InternalProtocol] = { msg match { case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) @@ -211,30 +197,37 @@ private[akka] object EventsourcedRunning { final def onJournalResponse( response: Response): Behavior[InternalProtocol] = { setup.log.debug("Received Journal response: {}", response) + + def onWriteResponse(p: PersistentRepr): Behavior[InternalProtocol] = { + state = state.updateLastSequenceNr(p) + eventCounter += 1 + + // only once all things are applied we can revert back + if (eventCounter < numberOfEvents) this + else { + if (shouldSnapshotAfterPersist) + internalSaveSnapshot(state) + + tryUnstash(applySideEffects(sideEffects, state)) + } + } + response match { case WriteMessageSuccess(p, id) ⇒ - if (id == setup.writerIdentity.instanceId) { - state = state.updateLastSequenceNr(p) - // FIXME is the order of pendingInvocations not reversed? - pendingInvocations.head.handler(p.payload) - pendingInvocations = pendingInvocations.tail - - // only once all things are applied we can revert back - if (pendingInvocations.nonEmpty) this - else tryUnstash(applySideEffects(sideEffects, state)) - } else this + if (id == setup.writerIdentity.instanceId) + onWriteResponse(p) + else this case WriteMessageRejected(p, cause, id) ⇒ if (id == setup.writerIdentity.instanceId) { - state = state.updateLastSequenceNr(p) onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) - tryUnstash(applySideEffects(sideEffects, state)) + onWriteResponse(p) } else this case WriteMessageFailure(p, cause, id) ⇒ - if (id == setup.writerIdentity.instanceId) { + if (id == setup.writerIdentity.instanceId) onPersistFailureThenStop(cause, p.payload, p.sequenceNr) - } else this + else this case WriteMessagesSuccessful ⇒ // ignore