From c4d4dd3875123e7a51cdd79de93258cd0b849288 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 24 Jul 2019 03:47:25 +0200 Subject: [PATCH] Persistence typed context (#27404) * Add context parameters to persistence typed * Add hooks to capture event-written --- .../typed/internal/ExternalInteractions.scala | 14 ++++++++++++++ .../akka/persistence/typed/internal/Running.scala | 14 +++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index ff7bb3c23f..095b6ad2da 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -14,10 +14,14 @@ import akka.actor.typed.Signal import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi +import akka.annotation.InternalStableApi + import akka.persistence.JournalProtocol.ReplayMessages import akka.persistence.SnapshotProtocol.LoadSnapshot import akka.persistence._ +import akka.util.unused + /** INTERNAL API */ @InternalApi private[akka] trait JournalInteractions[C, E, S] { @@ -27,6 +31,8 @@ private[akka] trait JournalInteractions[C, E, S] { type EventOrTagged = Any // `Any` since can be `E` or `Tagged` protected def internalPersist( + ctx: ActorContext[_], + cmd: Any, state: Running.RunningState[S], event: EventOrTagged, eventAdapterManifest: String): Running.RunningState[S] = { @@ -41,6 +47,8 @@ private[akka] trait JournalInteractions[C, E, S] { writerUuid = setup.writerIdentity.writerUuid, sender = ActorRef.noSender) + onWriteInitiated(ctx, cmd, repr) + val write = AtomicWrite(repr) :: Nil setup.journal .tell(JournalProtocol.WriteMessages(write, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped) @@ -48,6 +56,12 @@ private[akka] trait JournalInteractions[C, E, S] { newState } + @InternalStableApi + private[akka] def onWriteInitiated( + @unused ctx: ActorContext[_], + @unused cmd: Any, + @unused repr: PersistentRepr): Unit = () + protected def internalPersistAll( state: Running.RunningState[S], events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 3597ace0c2..dd389102cd 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -150,7 +150,7 @@ private[akka] object Running { val eventToPersist = adaptEvent(event) val eventAdapterManifest = setup.eventAdapter.manifest(event) - val newState2 = internalPersist(newState, eventToPersist, eventAdapterManifest) + val newState2 = internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) @@ -265,12 +265,18 @@ private[akka] object Running { state = state.updateLastSequenceNr(p) eventCounter += 1 + onWriteSuccess(setup.context, p) + // only once all things are applied we can revert back if (eventCounter < numberOfEvents) this else { visibleState = state if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) { - tryUnstashOne(applySideEffects(sideEffects, state)) + val newState = applySideEffects(sideEffects, state) + + onWriteDone(setup.context, p) + + tryUnstashOne(newState) } else { internalSaveSnapshot(state) storingSnapshot(state, sideEffects, shouldSnapshotAfterPersist) @@ -516,5 +522,7 @@ private[akka] object Running { @unused reason: Throwable, @unused event: Any, @unused sequenceNr: Long): Unit = () - + @InternalStableApi + private[akka] def onWriteSuccess(@unused ctx: ActorContext[_], @unused event: PersistentRepr): Unit = () + private[akka] def onWriteDone(@unused ctx: ActorContext[_], @unused event: PersistentRepr): Unit = () }