Hook in akka-persistence-typed (#27464)

This commit is contained in:
Arnout Engelen 2019-08-08 10:25:05 +02:00 committed by GitHub
parent 38cda5147f
commit 70a0af1036
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 3 deletions

View file

@ -63,6 +63,8 @@ private[akka] trait JournalInteractions[C, E, S] {
@unused repr: PersistentRepr): Unit = () @unused repr: PersistentRepr): Unit = ()
protected def internalPersistAll( protected def internalPersistAll(
ctx: ActorContext[_],
cmd: Any,
state: Running.RunningState[S], state: Running.RunningState[S],
events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = { events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = {
if (events.nonEmpty) { if (events.nonEmpty) {
@ -79,6 +81,8 @@ private[akka] trait JournalInteractions[C, E, S] {
writerUuid = setup.writerIdentity.writerUuid, writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender) sender = ActorRef.noSender)
} }
onWritesInitiated(ctx, cmd, writes)
val write = AtomicWrite(writes) val write = AtomicWrite(writes)
setup.journal.tell( setup.journal.tell(
@ -89,6 +93,12 @@ private[akka] trait JournalInteractions[C, E, S] {
} else state } else state
} }
@InternalStableApi
private[akka] def onWritesInitiated(
@unused ctx: ActorContext[_],
@unused cmd: Any,
@unused repr: immutable.Seq[PersistentRepr]): Unit = ()
protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = {
setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
setup.journal ! ReplayMessages( setup.journal ! ReplayMessages(

View file

@ -172,7 +172,7 @@ private[akka] object Running {
val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt))) val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt)))
val newState2 = internalPersistAll(newState, eventsToPersist) val newState2 = internalPersistAll(setup.context, msg, newState, eventsToPersist)
persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects) persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)
@ -268,8 +268,10 @@ private[akka] object Running {
onWriteSuccess(setup.context, p) onWriteSuccess(setup.context, p)
// only once all things are applied we can revert back // only once all things are applied we can revert back
if (eventCounter < numberOfEvents) this if (eventCounter < numberOfEvents) {
else { onWriteDone(setup.context, p)
this
} else {
visibleState = state visibleState = state
if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) { if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) {
val newState = applySideEffects(sideEffects, state) val newState = applySideEffects(sideEffects, state)