diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala index b26f6f3cee..8dc9bb74b3 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala @@ -33,11 +33,8 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { type EventOrTagged = Any // `Any` since can be `E` or `Tagged` protected def internalPersist( - state: EventsourcedRunning.EventsourcedState[S], - event: EventOrTagged, - sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[InternalProtocol] = { - // pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - val pendingInvocations = StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) :: Nil + state: EventsourcedRunning.EventsourcedState[S], + event: EventOrTagged): EventsourcedRunning.EventsourcedState[S] = { val newState = state.nextSequenceNr() @@ -53,35 +50,28 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) - EventsourcedRunning.PersistingEvents[C, E, S](setup, state, pendingInvocations, sideEffects) + newState } protected def internalPersistAll( - events: immutable.Seq[EventOrTagged], - state: EventsourcedRunning.EventsourcedState[S], - sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any ⇒ Unit): Behavior[InternalProtocol] = { + events: immutable.Seq[EventOrTagged], + state: EventsourcedRunning.EventsourcedState[S]): EventsourcedRunning.EventsourcedState[S] = { if (events.nonEmpty) { - - val pendingInvocations = events map { event ⇒ - // pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - } - val newState = state.nextSequenceNr() val senderNotKnownBecauseAkkaTyped = null val write = AtomicWrite(events.map(event ⇒ PersistentRepr( event, persistenceId = setup.persistenceId, - sequenceNr = newState.seqNr, + sequenceNr = newState.seqNr, // FIXME increment for each event writerUuid = setup.writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped) )) setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) - EventsourcedRunning.PersistingEvents(setup, state, pendingInvocations, sideEffects) - } else Behaviors.same + newState + } else state } protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala index 5c2114366d..b8d65c722c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala @@ -177,7 +177,7 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S]( returnRecoveryPermit(setup, "recovery completed successfully") setup.recoveryCompleted(setup.commandContext, state.state) - val running = EventsourcedRunning.HandlingCommands[C, E, S]( + val running = EventsourcedRunning[C, E, S]( setup, EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state) ) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 662dd0a19f..d6312c6fa8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -65,250 +65,246 @@ import scala.collection.immutable } } - // =============================================== + def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = + new EventsourcedRunning(setup).handlingCommands(state) +} - object HandlingCommands { - def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = - new HandlingCommands(setup).createBehavior(state) - } +// =============================================== - class HandlingCommands[C, E, S](override val setup: EventsourcedSetup[C, E, S]) - extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { +@InternalApi private[akka] class EventsourcedRunning[C, E, S](override val setup: EventsourcedSetup[C, E, S]) + extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { + import EventsourcedRunning.EventsourcedState - def createBehavior(state: EventsourcedState[S]): Behavior[InternalProtocol] = { - withMdc { - Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { - case (_, SnapshotterResponse(r)) ⇒ Behaviors.unhandled - case (_, JournalResponse(r)) ⇒ Behaviors.unhandled - case (_, IncomingCommand(c: C @unchecked)) ⇒ onCommand(state, c) - } + def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = { + withMdc("run-cmnds") { + Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { + case (_, SnapshotterResponse(r)) ⇒ Behaviors.unhandled + case (_, JournalResponse(r)) ⇒ Behaviors.unhandled + case (_, IncomingCommand(c: C @unchecked)) ⇒ onCommand(state, c) } } + } - private def withMdc(wrapped: Behavior[InternalProtocol]) = { - val mdc = Map( - "persistenceId" → setup.persistenceId, - "phase" → "run-cmnds" - ) + private def withMdc(phase: String)(wrapped: Behavior[InternalProtocol]) = { + val mdc = Map( + "persistenceId" → setup.persistenceId, + "phase" → phase + ) - Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) - } + Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) + } - private def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { - val effect = setup.commandHandler(setup.commandContext, state.state, cmd) - applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? - } + private def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { + val effect = setup.commandHandler(setup.commandContext, state.state, cmd) + applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? + } - @tailrec private def applyEffects( - msg: Any, - state: EventsourcedState[S], - effect: EffectImpl[E, S], - sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil - ): Behavior[InternalProtocol] = { - import setup.log + @tailrec private def applyEffects( + msg: Any, + state: EventsourcedState[S], + effect: EffectImpl[E, S], + sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil + ): Behavior[InternalProtocol] = { + import setup.log - if (log.isDebugEnabled) - log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) + if (log.isDebugEnabled) + log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) - effect match { - case CompositeEffect(eff, currentSideEffects) ⇒ - // unwrap and accumulate effects - applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) + effect match { + case CompositeEffect(eff, currentSideEffects) ⇒ + // unwrap and accumulate effects + applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) - case Persist(event) ⇒ + case Persist(event) ⇒ + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + val newState = state.applyEvent(setup, event) + val eventToPersist = tagEvent(event) + + val newState2 = internalPersist(newState, eventToPersist) + + val handler: Any ⇒ Unit = { x ⇒ // TODO is x the new state? + if (setup.snapshotWhen(newState2.state, event, newState2.seqNr)) + internalSaveSnapshot(state) + } + val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil + + // FIXME applySideEffects is missing + + persistingEvents(newState2, pendingInvocations, sideEffects) + + case PersistAll(events) ⇒ + if (events.nonEmpty) { // apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event - val newState = state.applyEvent(setup, event) - val eventToPersist = tagEvent(event) + var count = events.size + // var seqNr = state.seqNr + val (newState, shouldSnapshotAfterPersist) = + events.foldLeft((state, false)) { + case ((currentState, snapshot), event) ⇒ + val value = currentState + .nextSequenceNr() // FIXME seqNr is also incremented in internalPersistAll + .applyEvent(setup, event) - internalPersist(state, eventToPersist, sideEffects) { _ ⇒ - if (setup.snapshotWhen(newState.state, event, newState.seqNr)) - internalSaveSnapshot(state) - } - - case PersistAll(events) ⇒ - if (events.nonEmpty) { - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - var count = events.size - // var seqNr = state.seqNr - val (newState, shouldSnapshotAfterPersist) = - events.foldLeft((state, false)) { - case ((currentState, snapshot), event) ⇒ - val value = currentState - .nextSequenceNr() - .applyEvent(setup, event) - - val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr) - (value, shouldSnapshot) - } - // state = newState - - val eventsToPersist = events.map(tagEvent) - - internalPersistAll(eventsToPersist, newState, sideEffects) { _ ⇒ - count -= 1 - if (count == 0) { - // FIXME the result of applying side effects is ignored - val b = applySideEffects(sideEffects, newState) - - if (shouldSnapshotAfterPersist) - internalSaveSnapshot(newState) - } + val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr) + (value, shouldSnapshot) + } + + val eventsToPersist = events.map(tagEvent) + + val newState2 = internalPersistAll(eventsToPersist, newState) + + val handler: Any ⇒ Unit = { _ ⇒ + count -= 1 + if (count == 0) { + // // FIXME the result of applying side effects is ignored + // val b = applySideEffects(sideEffects, newState) + if (shouldSnapshotAfterPersist) + internalSaveSnapshot(newState) } - } else { - // run side-effects even when no events are emitted - tryUnstash(applySideEffects(sideEffects, state)) } - case _: PersistNothing.type @unchecked ⇒ + val pendingInvocations = events map { event ⇒ + StashingHandlerInvocation(event, handler) + } + + persistingEvents(newState2, pendingInvocations, sideEffects) + + } else { + // run side-effects even when no events are emitted tryUnstash(applySideEffects(sideEffects, state)) + } - case _: Unhandled.type @unchecked ⇒ - applySideEffects(sideEffects, state) - Behavior.unhandled + case _: PersistNothing.type @unchecked ⇒ + tryUnstash(applySideEffects(sideEffects, state)) - case c: ChainableEffect[_, S] ⇒ - applySideEffect(c, state) - } - } - - private def tagEvent(event: E): Any = { - val tags = setup.tagger(event) - if (tags.isEmpty) event else Tagged(event, tags) + case _: Unhandled.type @unchecked ⇒ + applySideEffects(sideEffects, state) + Behavior.unhandled + + case c: ChainableEffect[_, S] ⇒ + applySideEffect(c, state) } + } + private def tagEvent(event: E): Any = { + val tags = setup.tagger(event) + if (tags.isEmpty) event else Tagged(event, tags) } // =============================================== - object PersistingEvents { - def apply[C, E, S]( - setup: EventsourcedSetup[C, E, S], - state: EventsourcedState[S], - pendingInvocations: immutable.Seq[PendingHandlerInvocation], - sideEffects: immutable.Seq[ChainableEffect[_, S]] - ): Behavior[InternalProtocol] = - new PersistingEvents(setup).createBehavior(state, pendingInvocations, sideEffects) - + def persistingEvents( + state: EventsourcedState[S], + pendingInvocations: immutable.Seq[PendingHandlerInvocation], + sideEffects: immutable.Seq[ChainableEffect[_, S]] + ): Behavior[InternalProtocol] = { + withMdc { + Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { + case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(r) + case (_, JournalResponse(r)) ⇒ onJournalResponse(state, pendingInvocations, sideEffects, r) + case (_, in: IncomingCommand[C @unchecked]) ⇒ onCommand(state, in) + } + } } - class PersistingEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S]) - extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { + private def withMdc(wrapped: Behavior[InternalProtocol]) = { + val mdc = Map( + "persistenceId" → setup.persistenceId, + "phase" → "run-persist-evnts" + ) - def createBehavior( - state: EventsourcedState[S], - pendingInvocations: immutable.Seq[PendingHandlerInvocation], - sideEffects: immutable.Seq[ChainableEffect[_, S]] - ): Behavior[InternalProtocol] = { - withMdc { - Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { - case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(r) - case (_, JournalResponse(r)) ⇒ onJournalResponse(state, pendingInvocations, sideEffects, r) - case (_, in: IncomingCommand[C @unchecked]) ⇒ onCommand(state, in) - } - } + Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) + } + + def onCommand(state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { + stash(cmd) + Behaviors.same + } + + final def onJournalResponse( + state: EventsourcedState[S], + pendingInvocations: immutable.Seq[PendingHandlerInvocation], + sideEffects: immutable.Seq[ChainableEffect[_, S]], + response: Response): Behavior[InternalProtocol] = { + setup.log.debug("Received Journal response: {}", response) + response match { + case WriteMessageSuccess(p, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case we ignore the call to the handler + if (id == setup.writerIdentity.instanceId) { + val newState = state.popApplyPendingInvocation(p) + + // only once all things are applied we can revert back + if (newState.pendingInvocations.nonEmpty) Behaviors.same + else tryUnstash(applySideEffects(sideEffects, newState)) + } else Behaviors.same + + case WriteMessageRejected(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == setup.writerIdentity.instanceId) { + val newState = state.updateLastSequenceNr(p) + onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) + tryUnstash(applySideEffects(sideEffects, newState)) + } else Behaviors.same + + case WriteMessageFailure(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == setup.writerIdentity.instanceId) { + // onWriteMessageComplete() -> tryBecomeHandlingCommands + onPersistFailureThenStop(cause, p.payload, p.sequenceNr) + } else Behaviors.same + + case WriteMessagesSuccessful ⇒ + // ignore + Behaviors.same + + case WriteMessagesFailed(_) ⇒ + // ignore + Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects + + case _: LoopMessageSuccess ⇒ + // ignore, should never happen as there is no persistAsync in typed + Behaviors.same } + } - private def withMdc(wrapped: Behavior[InternalProtocol]) = { - val mdc = Map( - "persistenceId" → setup.persistenceId, - "phase" → "run-persist-evnts" - ) + // private def onWriteMessageComplete(): Unit = + // tryBecomeHandlingCommands() - Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) + private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { + setup.log.error( + cause, + "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", + event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage) + } + + private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = { + setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", + event.getClass.getName, seqNr, setup.persistenceId) + + // FIXME see #24479 for reconsidering the stopping behaviour + Behaviors.stopped + } + + private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { + response match { + case SaveSnapshotSuccess(meta) ⇒ + setup.context.log.debug("Save snapshot successful: " + meta) + Behaviors.same + case SaveSnapshotFailure(meta, ex) ⇒ + setup.context.log.error(ex, "Save snapshot failed! " + meta) + Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop } - - def onCommand(state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { - stash(cmd) - Behaviors.same - } - - final def onJournalResponse( - state: EventsourcedState[S], - pendingInvocations: immutable.Seq[PendingHandlerInvocation], - sideEffects: immutable.Seq[ChainableEffect[_, S]], - response: Response): Behavior[InternalProtocol] = { - setup.log.debug("Received Journal response: {}", response) - response match { - case WriteMessageSuccess(p, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case we ignore the call to the handler - if (id == setup.writerIdentity.instanceId) { - val newState = state.popApplyPendingInvocation(p) - - // only once all things are applied we can revert back - if (newState.pendingInvocations.nonEmpty) Behaviors.same - else tryUnstash(applySideEffects(sideEffects, newState)) - } else Behaviors.same - - case WriteMessageRejected(p, cause, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case the handler has already been discarded - if (id == setup.writerIdentity.instanceId) { - val newState = state.updateLastSequenceNr(p) - onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) - tryUnstash(applySideEffects(sideEffects, newState)) - } else Behaviors.same - - case WriteMessageFailure(p, cause, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case the handler has already been discarded - if (id == setup.writerIdentity.instanceId) { - // onWriteMessageComplete() -> tryBecomeHandlingCommands - onPersistFailureThenStop(cause, p.payload, p.sequenceNr) - } else Behaviors.same - - case WriteMessagesSuccessful ⇒ - // ignore - Behaviors.same - - case WriteMessagesFailed(_) ⇒ - // ignore - Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects - - case _: LoopMessageSuccess ⇒ - // ignore, should never happen as there is no persistAsync in typed - Behaviors.same - } - } - - // private def onWriteMessageComplete(): Unit = - // tryBecomeHandlingCommands() - - private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { - setup.log.error( - cause, - "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", - event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage) - } - - private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = { - setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", - event.getClass.getName, seqNr, setup.persistenceId) - - // FIXME see #24479 for reconsidering the stopping behaviour - Behaviors.stopped - } - - private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { - response match { - case SaveSnapshotSuccess(meta) ⇒ - setup.context.log.debug("Save snapshot successful: " + meta) - Behaviors.same - case SaveSnapshotFailure(meta, ex) ⇒ - setup.context.log.error(ex, "Save snapshot failed! " + meta) - Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop - } - } - } // -------------------------- - def applySideEffects[S](effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { + def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { var res: Behavior[InternalProtocol] = Behaviors.same val it = effects.iterator @@ -325,7 +321,7 @@ import scala.collection.immutable res } - def applySideEffect[S](effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { + def applySideEffect(effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { case _: Stop.type @unchecked ⇒ Behaviors.stopped