Persistence typed context (#27404)

* Add context parameters to persistence typed
* Add hooks to capture event-written
This commit is contained in:
Arnout Engelen 2019-07-24 03:47:25 +02:00 committed by Helena Edelson
parent 743c542e18
commit c4d4dd3875
2 changed files with 25 additions and 3 deletions

View file

@ -14,10 +14,14 @@ import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.persistence.JournalProtocol.ReplayMessages
import akka.persistence.SnapshotProtocol.LoadSnapshot
import akka.persistence._
import akka.util.unused
/** INTERNAL API */
@InternalApi
private[akka] trait JournalInteractions[C, E, S] {
@ -27,6 +31,8 @@ private[akka] trait JournalInteractions[C, E, S] {
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
protected def internalPersist(
ctx: ActorContext[_],
cmd: Any,
state: Running.RunningState[S],
event: EventOrTagged,
eventAdapterManifest: String): Running.RunningState[S] = {
@ -41,6 +47,8 @@ private[akka] trait JournalInteractions[C, E, S] {
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
onWriteInitiated(ctx, cmd, repr)
val write = AtomicWrite(repr) :: Nil
setup.journal
.tell(JournalProtocol.WriteMessages(write, setup.selfUntyped, setup.writerIdentity.instanceId), setup.selfUntyped)
@ -48,6 +56,12 @@ private[akka] trait JournalInteractions[C, E, S] {
newState
}
@InternalStableApi
private[akka] def onWriteInitiated(
@unused ctx: ActorContext[_],
@unused cmd: Any,
@unused repr: PersistentRepr): Unit = ()
protected def internalPersistAll(
state: Running.RunningState[S],
events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = {

View file

@ -150,7 +150,7 @@ private[akka] object Running {
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = internalPersist(newState, eventToPersist, eventAdapterManifest)
val newState2 = internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest)
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
@ -265,12 +265,18 @@ private[akka] object Running {
state = state.updateLastSequenceNr(p)
eventCounter += 1
onWriteSuccess(setup.context, p)
// only once all things are applied we can revert back
if (eventCounter < numberOfEvents) this
else {
visibleState = state
if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) {
tryUnstashOne(applySideEffects(sideEffects, state))
val newState = applySideEffects(sideEffects, state)
onWriteDone(setup.context, p)
tryUnstashOne(newState)
} else {
internalSaveSnapshot(state)
storingSnapshot(state, sideEffects, shouldSnapshotAfterPersist)
@ -516,5 +522,7 @@ private[akka] object Running {
@unused reason: Throwable,
@unused event: Any,
@unused sequenceNr: Long): Unit = ()
@InternalStableApi
private[akka] def onWriteSuccess(@unused ctx: ActorContext[_], @unused event: PersistentRepr): Unit = ()
private[akka] def onWriteDone(@unused ctx: ActorContext[_], @unused event: PersistentRepr): Unit = ()
}