make PersistingEvent MutableBehavior

fix some tests, snapshots still not working
This commit is contained in:
Patrik Nordwall 2018-03-07 13:28:11 +01:00 committed by Konrad `ktoso` Malawski
parent 44445140f5
commit e61f833dc3
6 changed files with 226 additions and 234 deletions

View file

@ -48,7 +48,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
) )
val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped)
newState newState
} }
@ -68,7 +68,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
sender = senderNotKnownBecauseAkkaTyped) sender = senderNotKnownBecauseAkkaTyped)
)) ))
setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped)
newState newState
} else state } else state
@ -77,7 +77,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = {
setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
// reply is sent to `selfUntypedAdapted`, it is important to target that one // reply is sent to `selfUntypedAdapted`, it is important to target that one
setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntypedAdapted) setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntyped)
} }
protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = { protected def returnRecoveryPermit(setup: EventsourcedSetup[_, _, _], reason: String): Unit = {
@ -93,11 +93,11 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
* to the running [[PersistentActor]]. * to the running [[PersistentActor]].
*/ */
protected def loadSnapshot(criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = { protected def loadSnapshot(criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = {
setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntypedAdapted) setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntyped)
} }
protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = { protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = {
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntypedAdapted) setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntyped)
} }
} }

View file

@ -45,6 +45,8 @@ private[akka] class EventsourcedRecoveringSnapshot[C, E, S](
def createBehavior(): Behavior[InternalProtocol] = { def createBehavior(): Behavior[InternalProtocol] = {
startRecoveryTimer() startRecoveryTimer()
loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr)
withMdc { withMdc {
Behaviors.immutable { Behaviors.immutable {
case (_, SnapshotterResponse(r)) onSnapshotterResponse(r) case (_, SnapshotterResponse(r)) onSnapshotterResponse(r)

View file

@ -40,7 +40,7 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](
withMdc { withMdc {
Behaviors.immutable[InternalProtocol] { Behaviors.immutable[InternalProtocol] {
case (_, InternalProtocol.RecoveryPermitGranted) // FIXME types case (_, InternalProtocol.RecoveryPermitGranted)
becomeRecovering() becomeRecovering()
case (_, other) case (_, other)
@ -70,8 +70,7 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](
private def requestRecoveryPermit(): Unit = { private def requestRecoveryPermit(): Unit = {
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs) // IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
val selfUntyped = setup.context.self.toUntyped setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfUntyped)
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped)
} }
} }

View file

@ -39,8 +39,7 @@ import scala.collection.immutable
final case class EventsourcedState[State]( final case class EventsourcedState[State](
seqNr: Long, seqNr: Long,
state: State, state: State
pendingInvocations: immutable.Seq[PendingHandlerInvocation] = Nil
) { ) {
def nextSequenceNr(): EventsourcedState[State] = def nextSequenceNr(): EventsourcedState[State] =
@ -49,16 +48,6 @@ import scala.collection.immutable
def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] = def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] =
if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this
def popApplyPendingInvocation(repr: PersistentRepr): EventsourcedState[State] = {
val (headSeq, remainingInvocations) = pendingInvocations.splitAt(1)
headSeq.head.handler(repr.payload)
copy(
pendingInvocations = remainingInvocations,
seqNr = repr.sequenceNr
)
}
def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = { def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = {
val updated = setup.eventHandler(state, event) val updated = setup.eventHandler(state, event)
copy(state = updated) copy(state = updated)
@ -76,30 +65,13 @@ import scala.collection.immutable
import EventsourcedRunning.EventsourcedState import EventsourcedRunning.EventsourcedState
def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = { def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = {
withMdc("run-cmnds") {
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
case (_, SnapshotterResponse(r)) Behaviors.unhandled
case (_, JournalResponse(r)) Behaviors.unhandled
case (_, IncomingCommand(c: C @unchecked)) onCommand(state, c)
}
}
}
private def withMdc(phase: String)(wrapped: Behavior[InternalProtocol]) = { def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" phase
)
Behaviors.withMdc((_: Any) mdc, wrapped)
}
private def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(setup.commandContext, state.state, cmd) val effect = setup.commandHandler(setup.commandContext, state.state, cmd)
applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
} }
@tailrec private def applyEffects( @tailrec def applyEffects(
msg: Any, msg: Any,
state: EventsourcedState[S], state: EventsourcedState[S],
effect: EffectImpl[E, S], effect: EffectImpl[E, S],
@ -124,14 +96,12 @@ import scala.collection.immutable
val newState2 = internalPersist(newState, eventToPersist) val newState2 = internalPersist(newState, eventToPersist)
val handler: Any Unit = { x // TODO is x the new state? val handler: Any Unit = { _
if (setup.snapshotWhen(newState2.state, event, newState2.seqNr)) if (setup.snapshotWhen(newState2.state, event, newState2.seqNr))
internalSaveSnapshot(state) internalSaveSnapshot(newState2)
} }
val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil
// FIXME applySideEffects is missing
persistingEvents(newState2, pendingInvocations, sideEffects) persistingEvents(newState2, pendingInvocations, sideEffects)
case PersistAll(events) case PersistAll(events)
@ -140,16 +110,12 @@ import scala.collection.immutable
// the invalid event, in case such validation is implemented in the event handler. // the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event // also, ensure that there is an event handler for each single event
var count = events.size var count = events.size
// var seqNr = state.seqNr var seqNr = state.seqNr
val (newState, shouldSnapshotAfterPersist) = val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
events.foldLeft((state, false)) {
case ((currentState, snapshot), event) case ((currentState, snapshot), event)
val value = currentState seqNr += 1
.nextSequenceNr() // FIXME seqNr is also incremented in internalPersistAll val shouldSnapshot = snapshot || setup.snapshotWhen(currentState.state, event, seqNr)
.applyEvent(setup, event) (currentState.applyEvent(setup, event), shouldSnapshot)
val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr)
(value, shouldSnapshot)
} }
val eventsToPersist = events.map(tagEvent) val eventsToPersist = events.map(tagEvent)
@ -159,14 +125,12 @@ import scala.collection.immutable
val handler: Any Unit = { _ val handler: Any Unit = { _
count -= 1 count -= 1
if (count == 0) { if (count == 0) {
// // FIXME the result of applying side effects is ignored
// val b = applySideEffects(sideEffects, newState)
if (shouldSnapshotAfterPersist) if (shouldSnapshotAfterPersist)
internalSaveSnapshot(newState) internalSaveSnapshot(newState2)
} }
} }
val pendingInvocations = events map { event val pendingInvocations = events.map { event
StashingHandlerInvocation(event, handler) StashingHandlerInvocation(event, handler)
} }
@ -189,11 +153,21 @@ import scala.collection.immutable
} }
} }
private def tagEvent(event: E): Any = { def tagEvent(event: E): Any = {
val tags = setup.tagger(event) val tags = setup.tagger(event)
if (tags.isEmpty) event else Tagged(event, tags) if (tags.isEmpty) event else Tagged(event, tags)
} }
withMdc("run-cmnds") {
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
case (_, SnapshotterResponse(r)) Behaviors.unhandled
case (_, JournalResponse(r)) Behaviors.unhandled
case (_, IncomingCommand(c: C @unchecked)) onCommand(state, c)
}
}
}
// =============================================== // ===============================================
def persistingEvents( def persistingEvents(
@ -201,33 +175,31 @@ import scala.collection.immutable
pendingInvocations: immutable.Seq[PendingHandlerInvocation], pendingInvocations: immutable.Seq[PendingHandlerInvocation],
sideEffects: immutable.Seq[ChainableEffect[_, S]] sideEffects: immutable.Seq[ChainableEffect[_, S]]
): Behavior[InternalProtocol] = { ): Behavior[InternalProtocol] = {
withMdc { withMdc("run-persist-evnts") {
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { Behaviors.mutable[EventsourcedBehavior.InternalProtocol](_ new PersistingEvents(state, pendingInvocations, sideEffects))
case (_, SnapshotterResponse(r)) onSnapshotterResponse(r)
case (_, JournalResponse(r)) onJournalResponse(state, pendingInvocations, sideEffects, r)
case (_, in: IncomingCommand[C @unchecked]) onCommand(state, in)
}
} }
} }
private def withMdc(wrapped: Behavior[InternalProtocol]) = { class PersistingEvents(
val mdc = Map( var state: EventsourcedState[S],
"persistenceId" setup.persistenceId, var pendingInvocations: immutable.Seq[PendingHandlerInvocation],
"phase" "run-persist-evnts" var sideEffects: immutable.Seq[ChainableEffect[_, S]])
) extends MutableBehavior[EventsourcedBehavior.InternalProtocol] {
Behaviors.withMdc((_: Any) mdc, wrapped) override def onMessage(msg: EventsourcedBehavior.InternalProtocol): Behavior[EventsourcedBehavior.InternalProtocol] = {
msg match {
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) onJournalResponse(r)
case in: IncomingCommand[C @unchecked] onCommand(in)
}
} }
def onCommand(state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
stash(cmd) stash(cmd)
Behaviors.same this
} }
final def onJournalResponse( final def onJournalResponse(
state: EventsourcedState[S],
pendingInvocations: immutable.Seq[PendingHandlerInvocation],
sideEffects: immutable.Seq[ChainableEffect[_, S]],
response: Response): Behavior[InternalProtocol] = { response: Response): Behavior[InternalProtocol] = {
setup.log.debug("Received Journal response: {}", response) setup.log.debug("Received Journal response: {}", response)
response match { response match {
@ -235,21 +207,24 @@ import scala.collection.immutable
// instanceId mismatch can happen for persistAsync and defer in case of actor restart // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler // while message is in flight, in that case we ignore the call to the handler
if (id == setup.writerIdentity.instanceId) { if (id == setup.writerIdentity.instanceId) {
val newState = state.popApplyPendingInvocation(p) state = state.updateLastSequenceNr(p)
// FIXME is the order of pendingInvocations not reversed?
pendingInvocations.head.handler(p.payload)
pendingInvocations = pendingInvocations.tail
// only once all things are applied we can revert back // only once all things are applied we can revert back
if (newState.pendingInvocations.nonEmpty) Behaviors.same if (pendingInvocations.nonEmpty) this
else tryUnstash(applySideEffects(sideEffects, newState)) else tryUnstash(applySideEffects(sideEffects, state))
} else Behaviors.same } else this
case WriteMessageRejected(p, cause, id) case WriteMessageRejected(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart // instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded // while message is in flight, in that case the handler has already been discarded
if (id == setup.writerIdentity.instanceId) { if (id == setup.writerIdentity.instanceId) {
val newState = state.updateLastSequenceNr(p) state = state.updateLastSequenceNr(p)
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design)
tryUnstash(applySideEffects(sideEffects, newState)) tryUnstash(applySideEffects(sideEffects, state))
} else Behaviors.same } else this
case WriteMessageFailure(p, cause, id) case WriteMessageFailure(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart // instanceId mismatch can happen for persistAsync and defer in case of actor restart
@ -257,19 +232,19 @@ import scala.collection.immutable
if (id == setup.writerIdentity.instanceId) { if (id == setup.writerIdentity.instanceId) {
// onWriteMessageComplete() -> tryBecomeHandlingCommands // onWriteMessageComplete() -> tryBecomeHandlingCommands
onPersistFailureThenStop(cause, p.payload, p.sequenceNr) onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
} else Behaviors.same } else this
case WriteMessagesSuccessful case WriteMessagesSuccessful
// ignore // ignore
Behaviors.same this
case WriteMessagesFailed(_) case WriteMessagesFailed(_)
// ignore // ignore
Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects this // it will be stopped by the first WriteMessageFailure message; not applying side effects
case _: LoopMessageSuccess case _: LoopMessageSuccess
// ignore, should never happen as there is no persistAsync in typed // ignore, should never happen as there is no persistAsync in typed
Behaviors.same Behaviors.unhandled
} }
} }
@ -295,17 +270,28 @@ import scala.collection.immutable
response match { response match {
case SaveSnapshotSuccess(meta) case SaveSnapshotSuccess(meta)
setup.context.log.debug("Save snapshot successful: " + meta) setup.context.log.debug("Save snapshot successful: " + meta)
Behaviors.same this
case SaveSnapshotFailure(meta, ex) case SaveSnapshotFailure(meta, ex)
setup.context.log.error(ex, "Save snapshot failed! " + meta) setup.context.log.error(ex, "Save snapshot failed! " + meta)
Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop this // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
} }
} }
}
// -------------------------- // --------------------------
private def withMdc(phase: String)(wrapped: Behavior[InternalProtocol]) = {
val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" phase
)
Behaviors.withMdc((_: Any) mdc, wrapped)
}
def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
var res: Behavior[InternalProtocol] = Behaviors.same var res: Behavior[InternalProtocol] = handlingCommands(state)
val it = effects.iterator val it = effects.iterator
// if at least one effect results in a `stop`, we need to stop // if at least one effect results in a `stop`, we need to stop

View file

@ -147,7 +147,12 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
EventsourcedRequestingRecoveryPermit(setup) EventsourcedRequestingRecoveryPermit(setup)
} }
}.widen[Command] { case c InternalProtocol.IncomingCommand(c) } // TODO this is nice, same way applicable to mutable style }.widen[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd)
}.narrow[Command]
} }
/** /**

View file

@ -38,7 +38,7 @@ object PersistentBehaviorSpec {
val config = ConfigFactory.parseString( val config = ConfigFactory.parseString(
s""" s"""
akka.loglevel = INFO akka.loglevel = DEBUG
# akka.persistence.typed.log-stashing = INFO # akka.persistence.typed.log-stashing = INFO
akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore" akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore"