save snapshot after replaying events if retention criteria indicates missing snapshot #29631
This commit is contained in:
parent
028ad29164
commit
b34f9eb445
5 changed files with 78 additions and 24 deletions
|
|
@ -0,0 +1,7 @@
|
|||
# modified state of internal class
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.copy")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.apply")
|
||||
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.ReplayingEvents#ReplayingState.unapply")
|
||||
# moved constructor of internal class
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.typed.internal.Running.apply")
|
||||
|
|
@ -20,13 +20,17 @@ import akka.persistence.typed.RecoveryCompleted
|
|||
import akka.persistence.typed.RecoveryFailed
|
||||
import akka.persistence.typed.ReplicaId
|
||||
import akka.persistence.typed.SingleEventSeq
|
||||
import akka.persistence.typed.internal.BehaviorSetup.SnapshotWithoutRetention
|
||||
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState }
|
||||
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
|
||||
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
|
||||
import akka.persistence.typed.internal.Running.startReplicationStream
|
||||
import akka.util.OptionVal
|
||||
import akka.util.PrettyDuration._
|
||||
import akka.util.unused
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
/***
|
||||
* INTERNAL API
|
||||
*
|
||||
|
|
@ -53,7 +57,8 @@ private[akka] object ReplayingEvents {
|
|||
receivedPoisonPill: Boolean,
|
||||
recoveryStartTime: Long,
|
||||
version: VersionVector,
|
||||
seenSeqNrPerReplica: Map[ReplicaId, Long])
|
||||
seenSeqNrPerReplica: Map[ReplicaId, Long],
|
||||
eventsReplayed: Long)
|
||||
|
||||
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] =
|
||||
Behaviors.setup { _ =>
|
||||
|
|
@ -121,7 +126,7 @@ private[akka] final class ReplayingEvents[C, E, S](
|
|||
|
||||
def handleEvent(event: E): Unit = {
|
||||
eventForErrorReporting = OptionVal.Some(event)
|
||||
state = state.copy(seqNr = repr.sequenceNr)
|
||||
state = state.copy(seqNr = repr.sequenceNr, eventsReplayed = state.eventsReplayed + 1)
|
||||
|
||||
val replicatedMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] =
|
||||
setup.replication match {
|
||||
|
|
@ -279,18 +284,25 @@ private[akka] final class ReplayingEvents[C, E, S](
|
|||
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
|
||||
Behaviors.stopped
|
||||
else {
|
||||
val running =
|
||||
Running[C, E, S](
|
||||
setup,
|
||||
Running.RunningState[S](
|
||||
seqNr = state.seqNr,
|
||||
state = state.state,
|
||||
receivedPoisonPill = state.receivedPoisonPill,
|
||||
state.version,
|
||||
seenPerReplica = state.seenSeqNrPerReplica,
|
||||
replicationControl = Map.empty))
|
||||
|
||||
tryUnstashOne(running)
|
||||
val runningState = Running.RunningState[S](
|
||||
seqNr = state.seqNr,
|
||||
state = state.state,
|
||||
receivedPoisonPill = state.receivedPoisonPill,
|
||||
state.version,
|
||||
seenPerReplica = state.seenSeqNrPerReplica,
|
||||
replicationControl = Map.empty)
|
||||
val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
|
||||
val initialRunningState = setup.replication match {
|
||||
case Some(replication) => startReplicationStream(setup, runningState, replication)
|
||||
case None => runningState
|
||||
}
|
||||
setup.retention match {
|
||||
case criteria: SnapshotCountRetentionCriteriaImpl if criteria.snapshotEveryNEvents <= state.eventsReplayed =>
|
||||
internalSaveSnapshot(initialRunningState)
|
||||
new running.StoringSnapshot(initialRunningState, immutable.Seq.empty, SnapshotWithoutRetention)
|
||||
case _ =>
|
||||
tryUnstashOne(new running.HandlingCommands(initialRunningState))
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
setup.cancelRecoveryTimer()
|
||||
|
|
|
|||
|
|
@ -173,7 +173,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
|
|||
receivedPoisonPill,
|
||||
System.nanoTime(),
|
||||
version,
|
||||
seenPerReplica))
|
||||
seenPerReplica,
|
||||
eventsReplayed = 0))
|
||||
|
||||
case LoadSnapshotFailed(cause) =>
|
||||
onRecoveryFailure(cause)
|
||||
|
|
|
|||
|
|
@ -108,15 +108,6 @@ private[akka] object Running {
|
|||
}
|
||||
}
|
||||
|
||||
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = {
|
||||
val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
|
||||
val initialState = setup.replication match {
|
||||
case Some(replication) => startReplicationStream(setup, state, replication)
|
||||
case None => state
|
||||
}
|
||||
new running.HandlingCommands(initialState)
|
||||
}
|
||||
|
||||
def startReplicationStream[C, E, S](
|
||||
setup: BehaviorSetup[C, E, S],
|
||||
state: RunningState[S],
|
||||
|
|
|
|||
|
|
@ -557,5 +557,48 @@ class EventSourcedBehaviorRetentionSpec
|
|||
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
|
||||
}
|
||||
|
||||
"snapshot on recovery if expected snapshot is missing" in {
|
||||
val pid = nextPid()
|
||||
val snapshotSignalProbe = TestProbe[WrappedSignal]()
|
||||
|
||||
{
|
||||
val persistentActor =
|
||||
spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))))
|
||||
(1 to 5).foreach(_ => persistentActor ! Increment)
|
||||
snapshotSignalProbe.expectNoMessage()
|
||||
|
||||
persistentActor ! StopIt
|
||||
val watchProbe = TestProbe()
|
||||
watchProbe.expectTerminated(persistentActor)
|
||||
}
|
||||
|
||||
{
|
||||
val persistentActor = spawn(
|
||||
Behaviors.setup[Command](ctx =>
|
||||
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))
|
||||
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1))))
|
||||
|
||||
val replyProbe = TestProbe[State]()
|
||||
persistentActor ! GetValue(replyProbe.ref)
|
||||
snapshotSignalProbe.expectSnapshotCompleted(5)
|
||||
replyProbe.expectMessage(State(5, Vector(0, 1, 2, 3, 4)))
|
||||
|
||||
persistentActor ! StopIt
|
||||
val watchProbe = TestProbe()
|
||||
watchProbe.expectTerminated(persistentActor)
|
||||
}
|
||||
|
||||
{
|
||||
val persistentActor = spawn(
|
||||
Behaviors.setup[Command](ctx =>
|
||||
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))
|
||||
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1))))
|
||||
|
||||
val replyProbe = TestProbe[State]()
|
||||
persistentActor ! GetValue(replyProbe.ref)
|
||||
snapshotSignalProbe.expectNoMessage()
|
||||
replyProbe.expectMessage(State(5, Vector(0, 1, 2, 3, 4)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue