EventsourcedRunning, internalPersist, internalPersistAll

* I don't think the JournalInteractions should be conserned with
  next behavior
This commit is contained in:
Patrik Nordwall 2018-03-07 09:04:39 +01:00 committed by Konrad `ktoso` Malawski
parent a05f50a0c0
commit 44445140f5
3 changed files with 211 additions and 225 deletions

View file

@ -33,11 +33,8 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
type EventOrTagged = Any // `Any` since can be `E` or `Tagged` type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
protected def internalPersist( protected def internalPersist(
state: EventsourcedRunning.EventsourcedState[S], state: EventsourcedRunning.EventsourcedState[S],
event: EventOrTagged, event: EventOrTagged): EventsourcedRunning.EventsourcedState[S] = {
sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[InternalProtocol] = {
// pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
val pendingInvocations = StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit]) :: Nil
val newState = state.nextSequenceNr() val newState = state.nextSequenceNr()
@ -53,35 +50,28 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) setup.journal.tell(JournalProtocol.WriteMessages(eventBatch, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted)
EventsourcedRunning.PersistingEvents[C, E, S](setup, state, pendingInvocations, sideEffects) newState
} }
protected def internalPersistAll( protected def internalPersistAll(
events: immutable.Seq[EventOrTagged], events: immutable.Seq[EventOrTagged],
state: EventsourcedRunning.EventsourcedState[S], state: EventsourcedRunning.EventsourcedState[S]): EventsourcedRunning.EventsourcedState[S] = {
sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[InternalProtocol] = {
if (events.nonEmpty) { if (events.nonEmpty) {
val pendingInvocations = events map { event
// pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
}
val newState = state.nextSequenceNr() val newState = state.nextSequenceNr()
val senderNotKnownBecauseAkkaTyped = null val senderNotKnownBecauseAkkaTyped = null
val write = AtomicWrite(events.map(event PersistentRepr( val write = AtomicWrite(events.map(event PersistentRepr(
event, event,
persistenceId = setup.persistenceId, persistenceId = setup.persistenceId,
sequenceNr = newState.seqNr, sequenceNr = newState.seqNr, // FIXME increment for each event
writerUuid = setup.writerIdentity.writerUuid, writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped) sender = senderNotKnownBecauseAkkaTyped)
)) ))
setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted) setup.journal.tell(JournalProtocol.WriteMessages(write :: Nil, setup.selfUntypedAdapted, setup.writerIdentity.instanceId), setup.selfUntypedAdapted)
EventsourcedRunning.PersistingEvents(setup, state, pendingInvocations, sideEffects) newState
} else Behaviors.same } else state
} }
protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = {

View file

@ -177,7 +177,7 @@ private[persistence] class EventsourcedRecoveringEvents[C, E, S](
returnRecoveryPermit(setup, "recovery completed successfully") returnRecoveryPermit(setup, "recovery completed successfully")
setup.recoveryCompleted(setup.commandContext, state.state) setup.recoveryCompleted(setup.commandContext, state.state)
val running = EventsourcedRunning.HandlingCommands[C, E, S]( val running = EventsourcedRunning[C, E, S](
setup, setup,
EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state) EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state)
) )

View file

@ -65,250 +65,246 @@ import scala.collection.immutable
} }
} }
// =============================================== def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] =
new EventsourcedRunning(setup).handlingCommands(state)
}
object HandlingCommands { // ===============================================
def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] =
new HandlingCommands(setup).createBehavior(state)
}
class HandlingCommands[C, E, S](override val setup: EventsourcedSetup[C, E, S]) @InternalApi private[akka] class EventsourcedRunning[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import EventsourcedRunning.EventsourcedState
def createBehavior(state: EventsourcedState[S]): Behavior[InternalProtocol] = { def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = {
withMdc { withMdc("run-cmnds") {
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
case (_, SnapshotterResponse(r)) Behaviors.unhandled case (_, SnapshotterResponse(r)) Behaviors.unhandled
case (_, JournalResponse(r)) Behaviors.unhandled case (_, JournalResponse(r)) Behaviors.unhandled
case (_, IncomingCommand(c: C @unchecked)) onCommand(state, c) case (_, IncomingCommand(c: C @unchecked)) onCommand(state, c)
}
} }
} }
}
private def withMdc(wrapped: Behavior[InternalProtocol]) = { private def withMdc(phase: String)(wrapped: Behavior[InternalProtocol]) = {
val mdc = Map( val mdc = Map(
"persistenceId" setup.persistenceId, "persistenceId" setup.persistenceId,
"phase" "run-cmnds" "phase" phase
) )
Behaviors.withMdc((_: Any) mdc, wrapped) Behaviors.withMdc((_: Any) mdc, wrapped)
} }
private def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { private def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(setup.commandContext, state.state, cmd) val effect = setup.commandHandler(setup.commandContext, state.state, cmd)
applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
} }
@tailrec private def applyEffects( @tailrec private def applyEffects(
msg: Any, msg: Any,
state: EventsourcedState[S], state: EventsourcedState[S],
effect: EffectImpl[E, S], effect: EffectImpl[E, S],
sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil
): Behavior[InternalProtocol] = { ): Behavior[InternalProtocol] = {
import setup.log import setup.log
if (log.isDebugEnabled) if (log.isDebugEnabled)
log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
effect match { effect match {
case CompositeEffect(eff, currentSideEffects) case CompositeEffect(eff, currentSideEffects)
// unwrap and accumulate effects // unwrap and accumulate effects
applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) applyEffects(msg, state, eff, currentSideEffects ++ sideEffects)
case Persist(event) case Persist(event)
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
val newState = state.applyEvent(setup, event)
val eventToPersist = tagEvent(event)
val newState2 = internalPersist(newState, eventToPersist)
val handler: Any Unit = { x // TODO is x the new state?
if (setup.snapshotWhen(newState2.state, event, newState2.seqNr))
internalSaveSnapshot(state)
}
val pendingInvocations = StashingHandlerInvocation(event, handler) :: Nil
// FIXME applySideEffects is missing
persistingEvents(newState2, pendingInvocations, sideEffects)
case PersistAll(events)
if (events.nonEmpty) {
// apply the event before persist so that validation exception is handled before persisting // apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler. // the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event // also, ensure that there is an event handler for each single event
val newState = state.applyEvent(setup, event) var count = events.size
val eventToPersist = tagEvent(event) // var seqNr = state.seqNr
val (newState, shouldSnapshotAfterPersist) =
events.foldLeft((state, false)) {
case ((currentState, snapshot), event)
val value = currentState
.nextSequenceNr() // FIXME seqNr is also incremented in internalPersistAll
.applyEvent(setup, event)
internalPersist(state, eventToPersist, sideEffects) { _ val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr)
if (setup.snapshotWhen(newState.state, event, newState.seqNr)) (value, shouldSnapshot)
internalSaveSnapshot(state) }
}
val eventsToPersist = events.map(tagEvent)
case PersistAll(events)
if (events.nonEmpty) { val newState2 = internalPersistAll(eventsToPersist, newState)
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler. val handler: Any Unit = { _
// also, ensure that there is an event handler for each single event count -= 1
var count = events.size if (count == 0) {
// var seqNr = state.seqNr // // FIXME the result of applying side effects is ignored
val (newState, shouldSnapshotAfterPersist) = // val b = applySideEffects(sideEffects, newState)
events.foldLeft((state, false)) { if (shouldSnapshotAfterPersist)
case ((currentState, snapshot), event) internalSaveSnapshot(newState)
val value = currentState
.nextSequenceNr()
.applyEvent(setup, event)
val shouldSnapshot = snapshot || setup.snapshotWhen(value.state, event, value.seqNr)
(value, shouldSnapshot)
}
// state = newState
val eventsToPersist = events.map(tagEvent)
internalPersistAll(eventsToPersist, newState, sideEffects) { _
count -= 1
if (count == 0) {
// FIXME the result of applying side effects is ignored
val b = applySideEffects(sideEffects, newState)
if (shouldSnapshotAfterPersist)
internalSaveSnapshot(newState)
}
} }
} else {
// run side-effects even when no events are emitted
tryUnstash(applySideEffects(sideEffects, state))
} }
case _: PersistNothing.type @unchecked val pendingInvocations = events map { event
StashingHandlerInvocation(event, handler)
}
persistingEvents(newState2, pendingInvocations, sideEffects)
} else {
// run side-effects even when no events are emitted
tryUnstash(applySideEffects(sideEffects, state)) tryUnstash(applySideEffects(sideEffects, state))
}
case _: Unhandled.type @unchecked case _: PersistNothing.type @unchecked
applySideEffects(sideEffects, state) tryUnstash(applySideEffects(sideEffects, state))
Behavior.unhandled
case c: ChainableEffect[_, S] case _: Unhandled.type @unchecked
applySideEffect(c, state) applySideEffects(sideEffects, state)
} Behavior.unhandled
}
case c: ChainableEffect[_, S]
private def tagEvent(event: E): Any = { applySideEffect(c, state)
val tags = setup.tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
} }
}
private def tagEvent(event: E): Any = {
val tags = setup.tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
} }
// =============================================== // ===============================================
object PersistingEvents { def persistingEvents(
def apply[C, E, S]( state: EventsourcedState[S],
setup: EventsourcedSetup[C, E, S], pendingInvocations: immutable.Seq[PendingHandlerInvocation],
state: EventsourcedState[S], sideEffects: immutable.Seq[ChainableEffect[_, S]]
pendingInvocations: immutable.Seq[PendingHandlerInvocation], ): Behavior[InternalProtocol] = {
sideEffects: immutable.Seq[ChainableEffect[_, S]] withMdc {
): Behavior[InternalProtocol] = Behaviors.immutable[EventsourcedBehavior.InternalProtocol] {
new PersistingEvents(setup).createBehavior(state, pendingInvocations, sideEffects) case (_, SnapshotterResponse(r)) onSnapshotterResponse(r)
case (_, JournalResponse(r)) onJournalResponse(state, pendingInvocations, sideEffects, r)
case (_, in: IncomingCommand[C @unchecked]) onCommand(state, in)
}
}
} }
class PersistingEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S]) private def withMdc(wrapped: Behavior[InternalProtocol]) = {
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { val mdc = Map(
"persistenceId" setup.persistenceId,
"phase" "run-persist-evnts"
)
def createBehavior( Behaviors.withMdc((_: Any) mdc, wrapped)
state: EventsourcedState[S], }
pendingInvocations: immutable.Seq[PendingHandlerInvocation],
sideEffects: immutable.Seq[ChainableEffect[_, S]] def onCommand(state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
): Behavior[InternalProtocol] = { stash(cmd)
withMdc { Behaviors.same
Behaviors.immutable[EventsourcedBehavior.InternalProtocol] { }
case (_, SnapshotterResponse(r)) onSnapshotterResponse(r)
case (_, JournalResponse(r)) onJournalResponse(state, pendingInvocations, sideEffects, r) final def onJournalResponse(
case (_, in: IncomingCommand[C @unchecked]) onCommand(state, in) state: EventsourcedState[S],
} pendingInvocations: immutable.Seq[PendingHandlerInvocation],
} sideEffects: immutable.Seq[ChainableEffect[_, S]],
response: Response): Behavior[InternalProtocol] = {
setup.log.debug("Received Journal response: {}", response)
response match {
case WriteMessageSuccess(p, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == setup.writerIdentity.instanceId) {
val newState = state.popApplyPendingInvocation(p)
// only once all things are applied we can revert back
if (newState.pendingInvocations.nonEmpty) Behaviors.same
else tryUnstash(applySideEffects(sideEffects, newState))
} else Behaviors.same
case WriteMessageRejected(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == setup.writerIdentity.instanceId) {
val newState = state.updateLastSequenceNr(p)
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design)
tryUnstash(applySideEffects(sideEffects, newState))
} else Behaviors.same
case WriteMessageFailure(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == setup.writerIdentity.instanceId) {
// onWriteMessageComplete() -> tryBecomeHandlingCommands
onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
} else Behaviors.same
case WriteMessagesSuccessful
// ignore
Behaviors.same
case WriteMessagesFailed(_)
// ignore
Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects
case _: LoopMessageSuccess
// ignore, should never happen as there is no persistAsync in typed
Behaviors.same
} }
}
private def withMdc(wrapped: Behavior[InternalProtocol]) = { // private def onWriteMessageComplete(): Unit =
val mdc = Map( // tryBecomeHandlingCommands()
"persistenceId" setup.persistenceId,
"phase" "run-persist-evnts"
)
Behaviors.withMdc((_: Any) mdc, wrapped) private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
setup.log.error(
cause,
"Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage)
}
private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = {
setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
event.getClass.getName, seqNr, setup.persistenceId)
// FIXME see #24479 for reconsidering the stopping behaviour
Behaviors.stopped
}
private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
response match {
case SaveSnapshotSuccess(meta)
setup.context.log.debug("Save snapshot successful: " + meta)
Behaviors.same
case SaveSnapshotFailure(meta, ex)
setup.context.log.error(ex, "Save snapshot failed! " + meta)
Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
} }
def onCommand(state: EventsourcedRunning.EventsourcedState[S], cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
stash(cmd)
Behaviors.same
}
final def onJournalResponse(
state: EventsourcedState[S],
pendingInvocations: immutable.Seq[PendingHandlerInvocation],
sideEffects: immutable.Seq[ChainableEffect[_, S]],
response: Response): Behavior[InternalProtocol] = {
setup.log.debug("Received Journal response: {}", response)
response match {
case WriteMessageSuccess(p, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == setup.writerIdentity.instanceId) {
val newState = state.popApplyPendingInvocation(p)
// only once all things are applied we can revert back
if (newState.pendingInvocations.nonEmpty) Behaviors.same
else tryUnstash(applySideEffects(sideEffects, newState))
} else Behaviors.same
case WriteMessageRejected(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == setup.writerIdentity.instanceId) {
val newState = state.updateLastSequenceNr(p)
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design)
tryUnstash(applySideEffects(sideEffects, newState))
} else Behaviors.same
case WriteMessageFailure(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == setup.writerIdentity.instanceId) {
// onWriteMessageComplete() -> tryBecomeHandlingCommands
onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
} else Behaviors.same
case WriteMessagesSuccessful
// ignore
Behaviors.same
case WriteMessagesFailed(_)
// ignore
Behaviors.same // it will be stopped by the first WriteMessageFailure message; not applying side effects
case _: LoopMessageSuccess
// ignore, should never happen as there is no persistAsync in typed
Behaviors.same
}
}
// private def onWriteMessageComplete(): Unit =
// tryBecomeHandlingCommands()
private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
setup.log.error(
cause,
"Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage)
}
private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = {
setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
event.getClass.getName, seqNr, setup.persistenceId)
// FIXME see #24479 for reconsidering the stopping behaviour
Behaviors.stopped
}
private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
response match {
case SaveSnapshotSuccess(meta)
setup.context.log.debug("Save snapshot successful: " + meta)
Behaviors.same
case SaveSnapshotFailure(meta, ex)
setup.context.log.error(ex, "Save snapshot failed! " + meta)
Behaviors.same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
}
}
} }
// -------------------------- // --------------------------
def applySideEffects[S](effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = { def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]], state: EventsourcedState[S]): Behavior[InternalProtocol] = {
var res: Behavior[InternalProtocol] = Behaviors.same var res: Behavior[InternalProtocol] = Behaviors.same
val it = effects.iterator val it = effects.iterator
@ -325,7 +321,7 @@ import scala.collection.immutable
res res
} }
def applySideEffect[S](effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { def applySideEffect(effect: ChainableEffect[_, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match {
case _: Stop.type @unchecked case _: Stop.type @unchecked
Behaviors.stopped Behaviors.stopped