From b34f9eb445a25d33209aa63f9a6651d4688a24ae Mon Sep 17 00:00:00 2001 From: Domantas Petrauskas Date: Wed, 4 Nov 2020 10:32:17 +0200 Subject: [PATCH] save snapshot after replaying events if retention criteria indicates missing snapshot #29631 --- ...29672-snapshot-after-event-replay.excludes | 7 +++ .../typed/internal/ReplayingEvents.scala | 40 +++++++++++------ .../typed/internal/ReplayingSnapshot.scala | 3 +- .../persistence/typed/internal/Running.scala | 9 ---- .../EventSourcedBehaviorRetentionSpec.scala | 43 +++++++++++++++++++ 5 files changed, 78 insertions(+), 24 deletions(-) create mode 100644 akka-persistence-typed/src/main/mima-filters/2.6.10.backwards.excludes/pr-29672-snapshot-after-event-replay.excludes diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.10.backwards.excludes/pr-29672-snapshot-after-event-replay.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.10.backwards.excludes/pr-29672-snapshot-after-event-replay.excludes new file mode 100644 index 0000000000..f60ceec622 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.6.10.backwards.excludes/pr-29672-snapshot-after-event-replay.excludes @@ -0,0 +1,7 @@ +# modified state of internal class +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.unapply") +# moved constructor of internal class +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.Running.apply") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index a9d62c7ba6..97c11830ae 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -20,13 +20,17 @@ import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.RecoveryFailed import akka.persistence.typed.ReplicaId import akka.persistence.typed.SingleEventSeq +import akka.persistence.typed.internal.BehaviorSetup.SnapshotWithoutRetention import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState } import akka.persistence.typed.internal.ReplayingEvents.ReplayingState import akka.persistence.typed.internal.Running.WithSeqNrAccessible +import akka.persistence.typed.internal.Running.startReplicationStream import akka.util.OptionVal import akka.util.PrettyDuration._ import akka.util.unused +import scala.collection.immutable + /*** * INTERNAL API * @@ -53,7 +57,8 @@ private[akka] object ReplayingEvents { receivedPoisonPill: Boolean, recoveryStartTime: Long, version: VersionVector, - seenSeqNrPerReplica: Map[ReplicaId, Long]) + seenSeqNrPerReplica: Map[ReplicaId, Long], + eventsReplayed: Long) def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] = Behaviors.setup { _ => @@ -121,7 +126,7 @@ private[akka] final class ReplayingEvents[C, E, S]( def handleEvent(event: E): Unit = { eventForErrorReporting = OptionVal.Some(event) - state = state.copy(seqNr = repr.sequenceNr) + state = state.copy(seqNr = repr.sequenceNr, eventsReplayed = state.eventsReplayed + 1) val replicatedMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] = setup.replication match { @@ -279,18 +284,25 @@ private[akka] final class ReplayingEvents[C, E, S]( if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else { - val running = - Running[C, E, S]( - setup, - Running.RunningState[S]( - seqNr = state.seqNr, - state = state.state, - receivedPoisonPill = state.receivedPoisonPill, - state.version, - seenPerReplica = state.seenSeqNrPerReplica, - replicationControl = Map.empty)) - - tryUnstashOne(running) + val runningState = Running.RunningState[S]( + seqNr = state.seqNr, + state = state.state, + receivedPoisonPill = state.receivedPoisonPill, + state.version, + seenPerReplica = state.seenSeqNrPerReplica, + replicationControl = Map.empty) + val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds)) + val initialRunningState = setup.replication match { + case Some(replication) => startReplicationStream(setup, runningState, replication) + case None => runningState + } + setup.retention match { + case criteria: SnapshotCountRetentionCriteriaImpl if criteria.snapshotEveryNEvents <= state.eventsReplayed => + internalSaveSnapshot(initialRunningState) + new running.StoringSnapshot(initialRunningState, immutable.Seq.empty, SnapshotWithoutRetention) + case _ => + tryUnstashOne(new running.HandlingCommands(initialRunningState)) + } } } finally { setup.cancelRecoveryTimer() diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 4257995eb1..f611b87464 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -173,7 +173,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup receivedPoisonPill, System.nanoTime(), version, - seenPerReplica)) + seenPerReplica, + eventsReplayed = 0)) case LoadSnapshotFailed(cause) => onRecoveryFailure(cause) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index dfcd3cb2f4..659b138d9e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -108,15 +108,6 @@ private[akka] object Running { } } - def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = { - val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds)) - val initialState = setup.replication match { - case Some(replication) => startReplicationStream(setup, state, replication) - case None => state - } - new running.HandlingCommands(initialState) - } - def startReplicationStream[C, E, S]( setup: BehaviorSetup[C, E, S], state: RunningState[S], diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala index 6e47cd74f8..8709b9f18f 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala @@ -557,5 +557,48 @@ class EventSourcedBehaviorRetentionSpec snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3) } + "snapshot on recovery if expected snapshot is missing" in { + val pid = nextPid() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + + { + val persistentActor = + spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)))) + (1 to 5).foreach(_ => persistentActor ! Increment) + snapshotSignalProbe.expectNoMessage() + + persistentActor ! StopIt + val watchProbe = TestProbe() + watchProbe.expectTerminated(persistentActor) + } + + { + val persistentActor = spawn( + Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) + + val replyProbe = TestProbe[State]() + persistentActor ! GetValue(replyProbe.ref) + snapshotSignalProbe.expectSnapshotCompleted(5) + replyProbe.expectMessage(State(5, Vector(0, 1, 2, 3, 4))) + + persistentActor ! StopIt + val watchProbe = TestProbe() + watchProbe.expectTerminated(persistentActor) + } + + { + val persistentActor = spawn( + Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) + + val replyProbe = TestProbe[State]() + persistentActor ! GetValue(replyProbe.ref) + snapshotSignalProbe.expectNoMessage() + replyProbe.expectMessage(State(5, Vector(0, 1, 2, 3, 4))) + } + } } }