From e61f833dc341fb6b572be253b9c1683850c51a14 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Mar 2018 13:28:11 +0100 Subject: [PATCH] make PersistingEvent MutableBehavior fix some tests, snapshots still not working --- .../EventsourcedJournalInteractions.scala | 10 +- .../EventsourcedRecoveringSnapshot.scala | 2 + ...EventsourcedRequestingRecoveryPermit.scala | 5 +- .../typed/internal/EventsourcedRunning.scala | 434 +++++++++--------- .../typed/scaladsl/PersistentBehaviors.scala | 7 +- .../scaladsl/PersistentBehaviorSpec.scala | 2 +- 6 files changed, 226 insertions(+), 234 deletions(-) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala index 8dc9bb74b3..8bd2845141 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala @@ -48,7 +48,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { ) val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync - setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) + setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped) newState } @@ -68,7 +68,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { sender = senderNotKnownBecauseAkkaTyped) )) - setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) + setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped) newState } else state @@ -77,7 +77,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) // reply is sent to `selfUntypedAdapted`, it is important to target that one - setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntypedAdapted) + setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntyped) } protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = { @@ -93,11 +93,11 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { * to the running [[PersistentActor]]. */ protected def loadSnapshot(criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = { - setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntypedAdapted) + setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntyped) } protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = { - setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntypedAdapted) + setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntyped) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala index 1644401985..3822171c32 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala @@ -45,6 +45,8 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S]( def createBehavior(): Behavior[InternalProtocol] = { startRecoveryTimer() + loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr) + withMdc { Behaviors.immutable { case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(r) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala index a0d007a69d..82aa663bf1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -40,7 +40,7 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S]( withMdc { Behaviors.immutable[InternalProtocol] { - case (_, InternalProtocol.RecoveryPermitGranted) ⇒ // FIXME types + case (_, InternalProtocol.RecoveryPermitGranted) ⇒ becomeRecovering() case (_, other) ⇒ @@ -70,8 +70,7 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S]( private def requestRecoveryPermit(): Unit = { import akka.actor.typed.scaladsl.adapter._ // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) - val selfUntyped = setup.context.self.toUntyped - setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped) + setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfUntyped) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index d6312c6fa8..2d7eb39b8a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -38,9 +38,8 @@ import scala.collection.immutable @InternalApi private[akka] object EventsourcedRunning { final case class EventsourcedState[State]( - seqNr: Long, - state: State, - pendingInvocations: immutable.Seq[PendingHandlerInvocation] = Nil + seqNr: Long, + state: State ) { def nextSequenceNr(): EventsourcedState[State] = @@ -49,16 +48,6 @@ import scala.collection.immutable def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] = if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this - def popApplyPendingInvocation(repr: PersistentRepr): EventsourcedState[State] = { - val (headSeq, remainingInvocations) = pendingInvocations.splitAt(1) - headSeq.head.handler(repr.payload) - - copy( - pendingInvocations = remainingInvocations, - seqNr = repr.sequenceNr - ) - } - def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = { val updated = setup.eventHandler(state, event) copy(state = updated) @@ -76,6 +65,99 @@ import scala.collection.immutable import EventsourcedRunning.EventsourcedState def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = { + + def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { + val effect = setup.commandHandler(setup.commandContext, state.state, cmd) + applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? + } + + @tailrec def applyEffects( + msg: Any, + state: EventsourcedState[S], + effect: EffectImpl[E, S], + sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil + ): Behavior[InternalProtocol] = { + import setup.log + + if (log.isDebugEnabled) + log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) + + effect match { + case CompositeEffect(eff, currentSideEffects) ⇒ + // unwrap and accumulate effects + applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) + + case Persist(event) ⇒ + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + val newState = state.applyEvent(setup, event) + val eventToPersist = tagEvent(event) + + val newState2 = internalPersist(newState, eventToPersist) + + val handler: Any ⇒ Unit = { _ ⇒ + if (setup.snapshotWhen(newState2.state, event, newState2.seqNr)) + internalSaveSnapshot(newState2) + } + val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil + + persistingEvents(newState2, pendingInvocations, sideEffects) + + case PersistAll(events) ⇒ + if (events.nonEmpty) { + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + var count = events.size + var seqNr = state.seqNr + val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) { + case ((currentState, snapshot), event) ⇒ + seqNr += 1 + val shouldSnapshot = snapshot || setup.snapshotWhen(currentState.state, event, seqNr) + (currentState.applyEvent(setup, event), shouldSnapshot) + } + + val eventsToPersist = events.map(tagEvent) + + val newState2 = internalPersistAll(eventsToPersist, newState) + + val handler: Any ⇒ Unit = { _ ⇒ + count -= 1 + if (count == 0) { + if (shouldSnapshotAfterPersist) + internalSaveSnapshot(newState2) + } + } + + val pendingInvocations = events.map { event ⇒ + StashingHandlerInvocation(event, handler) + } + + persistingEvents(newState2, pendingInvocations, sideEffects) + + } else { + // run side-effects even when no events are emitted + tryUnstash(applySideEffects(sideEffects, state)) + } + + case _: PersistNothing.type @unchecked ⇒ + tryUnstash(applySideEffects(sideEffects, state)) + + case _: Unhandled.type @unchecked ⇒ + applySideEffects(sideEffects, state) + Behavior.unhandled + + case c: ChainableEffect[_, S] ⇒ + applySideEffect(c, state) + } + } + + def tagEvent(event: E): Any = { + val tags = setup.tagger(event) + if (tags.isEmpty) event else Tagged(event, tags) + } + withMdc("run-cmnds") { Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { case (_, SnapshotterResponse(r)) ⇒ Behaviors.unhandled @@ -83,8 +165,122 @@ import scala.collection.immutable case (_, IncomingCommand(c: C @unchecked)) ⇒ onCommand(state, c) } } + } + // =============================================== + + def persistingEvents( + state: EventsourcedState[S], + pendingInvocations: immutable.Seq[PendingHandlerInvocation], + sideEffects: immutable.Seq[ChainableEffect[_, S]] + ): Behavior[InternalProtocol] = { + withMdc("run-persist-evnts") { + Behaviors.mutable[EventsourcedBehavior.InternalProtocol](_ ⇒ new PersistingEvents(state, pendingInvocations, sideEffects)) + } + } + + class PersistingEvents( + var state: EventsourcedState[S], + var pendingInvocations: immutable.Seq[PendingHandlerInvocation], + var sideEffects: immutable.Seq[ChainableEffect[_, S]]) + extends MutableBehavior[EventsourcedBehavior.InternalProtocol] { + + override def onMessage(msg: EventsourcedBehavior.InternalProtocol): Behavior[EventsourcedBehavior.InternalProtocol] = { + msg match { + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case JournalResponse(r) ⇒ onJournalResponse(r) + case in: IncomingCommand[C @unchecked] ⇒ onCommand(in) + } + } + + def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { + stash(cmd) + this + } + + final def onJournalResponse( + response: Response): Behavior[InternalProtocol] = { + setup.log.debug("Received Journal response: {}", response) + response match { + case WriteMessageSuccess(p, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case we ignore the call to the handler + if (id == setup.writerIdentity.instanceId) { + state = state.updateLastSequenceNr(p) + // FIXME is the order of pendingInvocations not reversed? + pendingInvocations.head.handler(p.payload) + pendingInvocations = pendingInvocations.tail + + // only once all things are applied we can revert back + if (pendingInvocations.nonEmpty) this + else tryUnstash(applySideEffects(sideEffects, state)) + } else this + + case WriteMessageRejected(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == setup.writerIdentity.instanceId) { + state = state.updateLastSequenceNr(p) + onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) + tryUnstash(applySideEffects(sideEffects, state)) + } else this + + case WriteMessageFailure(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == setup.writerIdentity.instanceId) { + // onWriteMessageComplete() -> tryBecomeHandlingCommands + onPersistFailureThenStop(cause, p.payload, p.sequenceNr) + } else this + + case WriteMessagesSuccessful ⇒ + // ignore + this + + case WriteMessagesFailed(_) ⇒ + // ignore + this // it will be stopped by the first WriteMessageFailure message; not applying side effects + + case _: LoopMessageSuccess ⇒ + // ignore, should never happen as there is no persistAsync in typed + Behaviors.unhandled + } + } + + // private def onWriteMessageComplete(): Unit = + // tryBecomeHandlingCommands() + + private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { + setup.log.error( + cause, + "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", + event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage) + } + + private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = { + setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", + event.getClass.getName, seqNr, setup.persistenceId) + + // FIXME see #24479 for reconsidering the stopping behaviour + Behaviors.stopped + } + + private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { + response match { + case SaveSnapshotSuccess(meta) ⇒ + setup.context.log.debug("Save snapshot successful: " + meta) + this + case SaveSnapshotFailure(meta, ex) ⇒ + setup.context.log.error(ex, "Save snapshot failed! " + meta) + this // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop + } + } + + } + + // -------------------------- + private def withMdc(phase: String)(wrapped: Behavior[InternalProtocol]) = { val mdc = Map( "persistenceId" → setup.persistenceId, @@ -94,218 +290,8 @@ import scala.collection.immutable Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) } - private def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { - val effect = setup.commandHandler(setup.commandContext, state.state, cmd) - applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? - } - - @tailrec private def applyEffects( - msg: Any, - state: EventsourcedState[S], - effect: EffectImpl[E, S], - sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil - ): Behavior[InternalProtocol] = { - import setup.log - - if (log.isDebugEnabled) - log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) - - effect match { - case CompositeEffect(eff, currentSideEffects) ⇒ - // unwrap and accumulate effects - applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) - - case Persist(event) ⇒ - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - val newState = state.applyEvent(setup, event) - val eventToPersist = tagEvent(event) - - val newState2 = internalPersist(newState, eventToPersist) - - val handler: Any ⇒ Unit = { x ⇒ // TODO is x the new state? - if (setup.snapshotWhen(newState2.state, event, newState2.seqNr)) - internalSaveSnapshot(state) - } - val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil - - // FIXME applySideEffects is missing - - persistingEvents(newState2, pendingInvocations, sideEffects) - - case PersistAll(events) ⇒ - if (events.nonEmpty) { - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - var count = events.size - // var seqNr = state.seqNr - val (newState, shouldSnapshotAfterPersist) = - events.foldLeft((state, false)) { - case ((currentState, snapshot), event) ⇒ - val value = currentState - .nextSequenceNr() // FIXME seqNr is also incremented in internalPersistAll - .applyEvent(setup, event) - - val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr) - (value, shouldSnapshot) - } - - val eventsToPersist = events.map(tagEvent) - - val newState2 = internalPersistAll(eventsToPersist, newState) - - val handler: Any ⇒ Unit = { _ ⇒ - count -= 1 - if (count == 0) { - // // FIXME the result of applying side effects is ignored - // val b = applySideEffects(sideEffects, newState) - if (shouldSnapshotAfterPersist) - internalSaveSnapshot(newState) - } - } - - val pendingInvocations = events map { event ⇒ - StashingHandlerInvocation(event, handler) - } - - persistingEvents(newState2, pendingInvocations, sideEffects) - - } else { - // run side-effects even when no events are emitted - tryUnstash(applySideEffects(sideEffects, state)) - } - - case _: PersistNothing.type @unchecked ⇒ - tryUnstash(applySideEffects(sideEffects, state)) - - case _: Unhandled.type @unchecked ⇒ - applySideEffects(sideEffects, state) - Behavior.unhandled - - case c: ChainableEffect[_, S] ⇒ - applySideEffect(c, state) - } - } - - private def tagEvent(event: E): Any = { - val tags = setup.tagger(event) - if (tags.isEmpty) event else Tagged(event, tags) - } - - // =============================================== - - def persistingEvents( - state: EventsourcedState[S], - pendingInvocations: immutable.Seq[PendingHandlerInvocation], - sideEffects: immutable.Seq[ChainableEffect[_, S]] - ): Behavior[InternalProtocol] = { - withMdc { - Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { - case (_, SnapshotterResponse(r)) ⇒ onSnapshotterResponse(r) - case (_, JournalResponse(r)) ⇒ onJournalResponse(state, pendingInvocations, sideEffects, r) - case (_, in: IncomingCommand[C @unchecked]) ⇒ onCommand(state, in) - } - } - } - - private def withMdc(wrapped: Behavior[InternalProtocol]) = { - val mdc = Map( - "persistenceId" → setup.persistenceId, - "phase" → "run-persist-evnts" - ) - - Behaviors.withMdc((_: Any) ⇒ mdc, wrapped) - } - - def onCommand(state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { - stash(cmd) - Behaviors.same - } - - final def onJournalResponse( - state: EventsourcedState[S], - pendingInvocations: immutable.Seq[PendingHandlerInvocation], - sideEffects: immutable.Seq[ChainableEffect[_, S]], - response: Response): Behavior[InternalProtocol] = { - setup.log.debug("Received Journal response: {}", response) - response match { - case WriteMessageSuccess(p, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case we ignore the call to the handler - if (id == setup.writerIdentity.instanceId) { - val newState = state.popApplyPendingInvocation(p) - - // only once all things are applied we can revert back - if (newState.pendingInvocations.nonEmpty) Behaviors.same - else tryUnstash(applySideEffects(sideEffects, newState)) - } else Behaviors.same - - case WriteMessageRejected(p, cause, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case the handler has already been discarded - if (id == setup.writerIdentity.instanceId) { - val newState = state.updateLastSequenceNr(p) - onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) - tryUnstash(applySideEffects(sideEffects, newState)) - } else Behaviors.same - - case WriteMessageFailure(p, cause, id) ⇒ - // instanceId mismatch can happen for persistAsync and defer in case of actor restart - // while message is in flight, in that case the handler has already been discarded - if (id == setup.writerIdentity.instanceId) { - // onWriteMessageComplete() -> tryBecomeHandlingCommands - onPersistFailureThenStop(cause, p.payload, p.sequenceNr) - } else Behaviors.same - - case WriteMessagesSuccessful ⇒ - // ignore - Behaviors.same - - case WriteMessagesFailed(_) ⇒ - // ignore - Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects - - case _: LoopMessageSuccess ⇒ - // ignore, should never happen as there is no persistAsync in typed - Behaviors.same - } - } - - // private def onWriteMessageComplete(): Unit = - // tryBecomeHandlingCommands() - - private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { - setup.log.error( - cause, - "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", - event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage) - } - - private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = { - setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", - event.getClass.getName, seqNr, setup.persistenceId) - - // FIXME see #24479 for reconsidering the stopping behaviour - Behaviors.stopped - } - - private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { - response match { - case SaveSnapshotSuccess(meta) ⇒ - setup.context.log.debug("Save snapshot successful: " + meta) - Behaviors.same - case SaveSnapshotFailure(meta, ex) ⇒ - setup.context.log.error(ex, "Save snapshot failed! " + meta) - Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop - } - } - - // -------------------------- - def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { - var res: Behavior[InternalProtocol] = Behaviors.same + var res: Behavior[InternalProtocol] = handlingCommands(state) val it = effects.iterator // if at least one effect results in a `stop`, we need to stop diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index 058832f11a..5bd4a5789d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -147,7 +147,12 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( EventsourcedRequestingRecoveryPermit(setup) } - }.widen[Command] { case c ⇒ InternalProtocol.IncomingCommand(c) } // TODO this is nice, same way applicable to mutable style + }.widen[Any] { + case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) + case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted + case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) + case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) + }.narrow[Command] } /** diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index e3758dfff7..102ef3d919 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -38,7 +38,7 @@ object PersistentBehaviorSpec { val config = ConfigFactory.parseString( s""" - akka.loglevel = INFO + akka.loglevel = DEBUG # akka.persistence.typed.log-stashing = INFO akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore"