Merge pull request #26595 from akka/wip-26584-snap-delete-patriknw

Typed Persistence: fix snapshot deletion algorithm, #26584
This commit is contained in:
Patrik Nordwall 2019-03-22 15:10:28 +01:00 committed by GitHub
commit 2c1d721fb7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 217 additions and 156 deletions

View file

@ -72,14 +72,14 @@ final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwabl
def getTarget(): DeletionTarget = target
}
final case class DeleteMessagesCompleted(toSequenceNr: Long) extends EventSourcedSignal {
final case class DeleteEventsCompleted(toSequenceNr: Long) extends EventSourcedSignal {
/**
* Java API
*/
def getToSequenceNr(): Long = toSequenceNr
}
final case class DeleteMessagesFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
final case class DeleteEventsFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
/**
* Java API

View file

@ -32,7 +32,7 @@ final case class RetentionCriteria(snapshotEveryNEvents: Long, keepNSnapshots: L
*/
def toSequenceNumber(lastSequenceNr: Long): Long = {
// Delete old events, retain the latest
lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents)
math.max(0, lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents))
}
}

View file

@ -20,7 +20,7 @@ import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.DeleteMessagesFailed
import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeletionTarget
@ -96,7 +96,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
ctx.log.warning("Failed to delete snapshot with meta [{}] due to [{}].", meta, failure)
case DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), failure) =>
ctx.log.warning("Failed to delete snapshots given criteria [{}] due to [{}].", criteria, failure)
case DeleteMessagesFailed(toSequenceNr, failure) =>
case DeleteEventsFailed(toSequenceNr, failure) =>
ctx.log.warning("Failed to delete messages toSequenceNr [{}] due to [{}].", toSequenceNr, failure)
}

View file

@ -143,6 +143,7 @@ private[akka] trait SnapshotInteractions[C, E, S] {
}
protected def internalSaveSnapshot(state: Running.RunningState[S]): Unit = {
setup.log.debug("Saving snapshot sequenceNr [{}]", state.seqNr)
if (state.state == null)
throw new IllegalStateException("A snapshot must not be a null state.")
else
@ -151,16 +152,17 @@ private[akka] trait SnapshotInteractions[C, E, S] {
setup.selfUntyped)
}
/** Deletes the snapshot identified by `sequenceNr`. */
/** Deletes the snapshots up to and including the `sequenceNr`. */
protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
setup.log.debug("Deleting snapshot to [{}]", toSequenceNr)
val deleteTo = toSequenceNr - 1
val deleteFrom = math.max(0, setup.retention.toSequenceNumber(deleteTo))
val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = deleteFrom, maxSequenceNr = deleteTo)
setup.log.debug("Deleting snapshots from [{}] to [{}]", deleteFrom, deleteTo)
setup.snapshotStore
.tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfUntyped)
if (toSequenceNr > 0) {
// We could use 0 as fromSequenceNr to delete all older snapshots, but that might be inefficient for
// large ranges depending on how it's implemented in the snapshot plugin. Therefore we use the
// same window as defined for how much to keep in the retention criteria
val fromSequenceNr = setup.retention.toSequenceNumber(toSequenceNr)
val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr)
setup.log.debug("Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr)
setup.snapshotStore
.tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfUntyped)
}
}
}

View file

@ -20,8 +20,8 @@ import akka.persistence.journal.Tagged
import akka.persistence.typed.Callback
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeleteMessagesCompleted
import akka.persistence.typed.DeleteMessagesFailed
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventRejectedException
import akka.persistence.typed.SideEffect
@ -97,8 +97,8 @@ private[akka] object Running {
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case SnapshotterResponse(r) => onSnapshotterResponse(r)
case JournalResponse(r) => onJournalResponse(r)
case JournalResponse(r) => onDeleteEventsJournalResponse(r)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r)
case _ => Behaviors.unhandled
}
@ -113,53 +113,6 @@ private[akka] object Running {
applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
/** Handle journal responses for non-persist events workloads. */
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
val signal = response match {
case DeleteMessagesSuccess(toSequenceNr) =>
setup.log.debug("Persistent messages to [{}] deleted successfully.", toSequenceNr)
internalDeleteSnapshots(toSequenceNr)
Some(DeleteMessagesCompleted(toSequenceNr))
case DeleteMessagesFailure(e, toSequenceNr) =>
Some(DeleteMessagesFailed(toSequenceNr, e))
case _ =>
None
}
signal match {
case Some(sig) =>
setup.onSignal(sig)
this
case None =>
Behaviors.unhandled // unexpected journal response
}
}
/** Handle snapshot responses for non-persist events workloads. */
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
val signal = response match {
case DeleteSnapshotsSuccess(criteria) =>
Some(DeleteSnapshotsCompleted(DeletionTarget.Criteria(criteria)))
case DeleteSnapshotsFailure(criteria, error) =>
Some(DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), error))
case DeleteSnapshotSuccess(meta) =>
Some(DeleteSnapshotsCompleted(DeletionTarget.Individual(meta)))
case DeleteSnapshotFailure(meta, error) =>
Some(DeleteSnapshotsFailed(DeletionTarget.Individual(meta), error))
case _ =>
None
}
signal match {
case Some(sig) =>
setup.onSignal(sig)
this
case None =>
Behaviors.unhandled // unexpected snapshot response
}
}
@tailrec def applyEffects(
msg: Any,
state: RunningState[S],
@ -270,11 +223,9 @@ private[akka] object Running {
msg match {
case JournalResponse(r) => onJournalResponse(r)
case in: IncomingCommand[C @unchecked] => onCommand(in)
case SnapshotterResponse(r) =>
setup.log.warning("Unexpected SnapshotterResponse {}", r)
Behaviors.unhandled
case RecoveryTickEvent(_) => Behaviors.unhandled
case RecoveryPermitGranted => Behaviors.unhandled
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r)
case RecoveryTickEvent(_) => Behaviors.unhandled
case RecoveryPermitGranted => Behaviors.unhandled
}
}
@ -332,8 +283,7 @@ private[akka] object Running {
this // it will be stopped by the first WriteMessageFailure message; not applying side effects
case _ =>
// ignore all other messages, since they relate to recovery handling which we're not dealing with in Running phase
Behaviors.unhandled
onDeleteEventsJournalResponse(response)
}
}
@ -369,9 +319,9 @@ private[akka] object Running {
// # 24698 The deletion of old events are automatic, snapshots are triggered by the SaveSnapshotSuccess.
setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta)
if (setup.retention.deleteEventsOnSnapshot)
internalDeleteEvents(e, state) // if successful, DeleteMessagesSuccess then internalDeleteSnapshots
internalDeleteEvents(e, state)
else
internalDeleteSnapshots(meta.sequenceNr)
internalDeleteSnapshots(setup.retention.toSequenceNumber(meta.sequenceNr))
Some(SnapshotCompleted(meta))
@ -380,17 +330,20 @@ private[akka] object Running {
Some(SnapshotFailed(meta, error))
case _ =>
onDeleteSnapshotResponse(response)
None
}
setup.log.debug("Received snapshot event [{}], returning signal [{}].", response, signal)
signal.foreach(setup.onSignal _)
setup.log.debug("Received snapshot response [{}], emitting signal [{}].", response, signal)
signal.foreach(setup.onSignal)
}
Behaviors
.receiveMessage[InternalProtocol] {
case cmd: IncomingCommand[C] @unchecked =>
onCommand(cmd)
case JournalResponse(r) =>
onDeleteEventsJournalResponse(r)
case SnapshotterResponse(r) =>
onSnapshotterResponse(r)
tryUnstashOne(applySideEffects(sideEffects, state))
@ -445,4 +398,59 @@ private[akka] object Running {
}
}
/**
* Handle journal responses for non-persist events workloads.
* These are performed in the background and may happen in all phases.
*/
def onDeleteEventsJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
val signal = response match {
case DeleteMessagesSuccess(toSequenceNr) =>
setup.log.debug("Persistent events to sequenceNr [{}] deleted successfully.", toSequenceNr)
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
internalDeleteSnapshots(toSequenceNr - 1)
Some(DeleteEventsCompleted(toSequenceNr))
case DeleteMessagesFailure(e, toSequenceNr) =>
Some(DeleteEventsFailed(toSequenceNr, e))
case _ =>
None
}
signal match {
case Some(sig) =>
setup.onSignal(sig)
Behaviors.same
case None =>
Behaviors.unhandled // unexpected journal response
}
}
/**
* Handle snapshot responses for non-persist events workloads.
* These are performed in the background and may happen in all phases.
*/
def onDeleteSnapshotResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
val signal = response match {
case DeleteSnapshotsSuccess(criteria) =>
Some(DeleteSnapshotsCompleted(DeletionTarget.Criteria(criteria)))
case DeleteSnapshotsFailure(criteria, error) =>
Some(DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), error))
case DeleteSnapshotSuccess(meta) =>
Some(DeleteSnapshotsCompleted(DeletionTarget.Individual(meta)))
case DeleteSnapshotFailure(meta, error) =>
Some(DeleteSnapshotsFailed(DeletionTarget.Individual(meta), error))
case _ =>
None
}
signal match {
case Some(sig) =>
setup.onSignal(sig)
Behaviors.same
case None =>
Behaviors.unhandled // unexpected snapshot response
}
}
}

View file

@ -35,7 +35,7 @@ import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteMessagesCompleted
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
@ -148,7 +148,7 @@ object EventSourcedBehaviorSpec {
persistenceId,
loggingActor = TestProbe[String].ref,
probe = TestProbe[(State, Event)].ref,
snapshotProbe = TestProbe[Try[Done]].ref,
snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref,
retentionProbe = TestProbe[Try[EventSourcedSignal]].ref)
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(
@ -158,14 +158,14 @@ object EventSourcedBehaviorSpec {
persistenceId,
loggingActor = logging,
probe = TestProbe[(State, Event)].ref,
TestProbe[Try[Done]].ref,
TestProbe[Try[SnapshotMetadata]].ref,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[Done]])(
snapshotProbe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref)
@ -176,10 +176,13 @@ object EventSourcedBehaviorSpec {
persistenceId,
TestProbe[String].ref,
probe,
TestProbe[Try[Done]].ref,
TestProbe[Try[SnapshotMetadata]].ref,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(
def counterWithSnapshotProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
@ -192,7 +195,7 @@ object EventSourcedBehaviorSpec {
def counterWithSnapshotAndRetentionProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probeS: ActorRef[Try[Done]],
probeS: ActorRef[Try[SnapshotMetadata]],
probeR: ActorRef[Try[EventSourcedSignal]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
@ -208,7 +211,7 @@ object EventSourcedBehaviorSpec {
persistenceId: PersistenceId,
loggingActor: ActorRef[String],
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[Done]],
snapshotProbe: ActorRef[Try[SnapshotMetadata]],
retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
persistenceId,
@ -311,8 +314,8 @@ object EventSourcedBehaviorSpec {
State(state.value + delta, state.history :+ state.value)
}).receiveSignal {
case RecoveryCompleted(_) ()
case SnapshotCompleted(_)
snapshotProbe ! Success(Done)
case SnapshotCompleted(metadata)
snapshotProbe ! Success(metadata)
case SnapshotFailed(_, failure)
snapshotProbe ! Failure(failure)
case e: EventSourcedSignal =>
@ -336,6 +339,9 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
private def unexpected(signal: EventSourcedSignal): Unit =
fail(s"Unexpected signal [$signal].")
"A typed persistent actor" must {
"persist an event" in {
@ -487,7 +493,7 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
"snapshot via predicate" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val alwaysSnapshot: Behavior[Command] =
Behaviors.setup { ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) =>
@ -499,7 +505,7 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
val replyProbe = TestProbe[State]()
c ! Increment
snapshotProbe.expectMessage(Success(Done))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! LogThenStop
@ -517,7 +523,7 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
"check all events for snapshot in PersistAll" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val snapshotAtTwo = Behaviors.setup[Command](ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) =>
s.value == 2
@ -530,7 +536,8 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
snapshotProbe.expectMessage(Success(Done))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
c ! LogThenStop
watchProbe.expectMessage("Terminated")
@ -556,13 +563,13 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
// no snapshot should have happened
val probeC2 = TestProbe[(State, Event)]()
val snapshotProbe = TestProbe[Try[Done]]()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val c2 = spawn(
Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref).snapshotEvery(2)))
probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1)))
val watchProbeC2 = watcher(c2)
c2 ! Increment
snapshotProbe.expectMessage(Try(Done))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
c2 ! LogThenStop
watchProbeC2.expectMessage("Terminated")
@ -576,7 +583,7 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
"snapshot every N sequence nrs when persisting multiple events" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val c =
spawn(Behaviors.setup[Command](ctx => counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotEvery(2)))
val watchProbe = watcher(c)
@ -585,7 +592,8 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
snapshotProbe.expectMessage(Try(Done))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
c ! LogThenStop
watchProbe.expectMessage("Terminated")
@ -750,10 +758,9 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
}
"delete snapshots automatically, based on criteria" in {
val unexpected = (signal: EventSourcedSignal) => fail(s"Unexpected signal [$signal].")
val snapshotEvery = 3
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
@ -761,74 +768,118 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotEvery(2)
.withRetention(
RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = false))))
.snapshotEvery(snapshotEvery)
.withRetention(RetentionCriteria(snapshotEveryNEvents = snapshotEvery, keepNSnapshots = 2))))
persistentActor ! IncrementWithPersistAll(3)
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
snapshotProbe.expectMessage(Try(Done))
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 3
minSequenceNr shouldEqual 0
case signal => unexpected(signal)
}
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 6
minSequenceNr shouldEqual 0
case signal => unexpected(signal)
}
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 9
minSequenceNr shouldEqual 3
case signal => unexpected(signal)
}
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 12
minSequenceNr shouldEqual 6
case signal => unexpected(signal)
}
retentionProbe.expectNoMessage()
}
"optionally delete both old events and snapshots" in {
val snapshotEvery = 3
val pid = nextPid
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotEvery(snapshotEvery)
.withRetention(RetentionCriteria(
snapshotEveryNEvents = snapshotEvery,
keepNSnapshots = 2,
deleteEventsOnSnapshot = true))))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
// Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one".
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
maxSequenceNr shouldEqual 2
minSequenceNr shouldEqual 0
case signal => unexpected(signal)
}
persistentActor ! IncrementWithPersistAll(3)
snapshotProbe.expectMessage(Try(Done))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 5
minSequenceNr shouldEqual 1
minSequenceNr shouldEqual 0
case signal => unexpected(signal)
}
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(6, Vector(0, 1, 2, 3, 4, 5)))
}
"optionally delete both old messages and snapshots" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotEvery(2)
.withRetention(
RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = true))))
persistentActor ! IncrementWithPersistAll(10)
persistentActor ! GetValue(replyProbe.ref)
val initialState = replyProbe.expectMessageType[State]
initialState.value shouldEqual 10
initialState.history shouldEqual (0 until initialState.value).toVector
snapshotProbe.expectMessage(Try(Done))
val firstDeleteMessages = retentionProbe.expectMessageType[Success[DeleteMessagesCompleted]].value
firstDeleteMessages.toSequenceNr shouldEqual 6 // 10 - 2 * 2
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 5 // 10 / 2
minSequenceNr shouldEqual 1
case signal => fail(s"Unexpected signal [$signal].")
maxSequenceNr shouldEqual 8
minSequenceNr shouldEqual 2
case signal => unexpected(signal)
}
persistentActor ! IncrementWithPersistAll(10)
snapshotProbe.expectMessage(Try(Done))
val secondDeleteMessages = retentionProbe.expectMessageType[Success[DeleteMessagesCompleted]].value
secondDeleteMessages.toSequenceNr shouldEqual 16 // 20 - 2 * 2
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match {
case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) =>
maxSequenceNr shouldEqual 11
minSequenceNr shouldEqual 5
case signal => unexpected(signal)
}
persistentActor ! GetValue(replyProbe.ref)
val state = replyProbe.expectMessageType[State]
state.value shouldEqual 20
state.history shouldEqual (0 until state.value).toVector
retentionProbe.expectNoMessage()
}
"fail fast if persistenceId is null" in {

View file

@ -1440,7 +1440,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
}
val probes = Vector.fill(10)(TestProbe())
(probes.zip(commands)).foreach {
probes.zip(commands).foreach {
case (p, c) =>
persistentActor.tell(c, p.ref)
}
@ -1530,7 +1530,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectMsg("a-2")
expectMsg("d-3")
expectMsg("d-4")
expectNoMsg(100.millis)
expectNoMessage(100.millis)
}
test(deferringAsyncWithPersistActor)
@ -1543,7 +1543,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectMsg("pa-a-2")
expectMsg("d-a-3")
expectMsg("d-a-4")
expectNoMsg(100.millis)
expectNoMessage(100.millis)
}
test(deferringAsyncWithAsyncPersistActor)
@ -1569,7 +1569,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
p2.expectMsg("pa-b-5")
p2.expectMsg("d-b-6")
expectNoMsg(100.millis)
expectNoMessage(100.millis)
}
test(deferringAsyncMixedCallsPPADDPADPersistActor)
@ -1581,7 +1581,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectMsg("d-1")
expectMsg("d-2")
expectMsg("d-3")
expectNoMsg(100.millis)
expectNoMessage(100.millis)
}
test(deferringAsyncWithNoPersistCallsPersistActor)
@ -1602,7 +1602,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
p2.expectMsg("pa-b-2")
p2.expectMsg("d-b-3")
p2.expectMsg("d-b-4")
expectNoMsg(100.millis)
expectNoMessage(100.millis)
}
test(deferringAsyncWithAsyncPersistActor)

View file

@ -124,7 +124,7 @@ class SnapshotFailureRobustnessSpec
TestEvent.Mute(EventFilter[java.io.NotSerializableException](start = "Error loading snapshot")))
system.eventStream.subscribe(testActor, classOf[Logging.Error])
try {
val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
expectMsgType[Logging.Error].message.toString should startWith("Error loading snapshot")
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 1, timestamp), state) =>
@ -133,7 +133,7 @@ class SnapshotFailureRobustnessSpec
}
expectMsg("kablama-2")
expectMsg(RecoveryCompleted)
expectNoMsg(1 second)
expectNoMessage(1 second)
} finally {
system.eventStream.unsubscribe(testActor, classOf[Logging.Error])
system.eventStream.publish(TestEvent.UnMute(EventFilter.error(start = "Error loading snapshot [")))
@ -182,7 +182,7 @@ class SnapshotFailureRobustnessSpec
expectMsg(1)
p ! DeleteSnapshot(1)
expectMsgPF() {
case DeleteSnapshotFailure(SnapshotMetadata(`persistenceId`, 1, timestamp), cause) =>
case DeleteSnapshotFailure(SnapshotMetadata(`persistenceId`, 1, _), cause) =>
// ok, expected failure
cause.getMessage should include("Failed to delete")
}
@ -199,7 +199,7 @@ class SnapshotFailureRobustnessSpec
val criteria = SnapshotSelectionCriteria(maxSequenceNr = 10)
p ! DeleteSnapshots(criteria)
expectMsgPF() {
case DeleteSnapshotsFailure(criteria, cause) =>
case DeleteSnapshotsFailure(_, cause) =>
// ok, expected failure
cause.getMessage should include("Failed to delete")
}