EventsourcedBehaviorRetentionSpec race fix (#27532)

* Not working for mysterious reasons

* Fix for #27507 race in test

Logging at debug to try if there is a race when testing on Jenkins

Also, fail fast if expecting a signal from a TestProbe (never happens)
Some minor logging improvements for event sourced behavior as well.

* Don't send batches of events when snapshot and event deletion is enabled

When events are deleted the event sourced actor switches to running until it gets
an ack back from the journal about deleting events, then it deletes snapshots if that
is enabled. This means events and snapshotting could happen inbetween making the
ordering of SnapshotComplete and DeleteSnapshotsComplete signals non deterministic.
This commit is contained in:
Johan Andrén 2019-10-01 12:23:13 +02:00 committed by Arnout Engelen
parent e0192470ec
commit f6ebff6e27
4 changed files with 277 additions and 262 deletions

View file

@ -143,9 +143,10 @@ private[akka] trait JournalInteractions[C, E, S] {
if (toSequenceNr > 0) {
val self = setup.selfClassic
if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr)
if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) {
setup.log.debug("Deleting events up to sequenceNr [{}]", toSequenceNr)
setup.journal ! JournalProtocol.DeleteMessagesTo(setup.persistenceId.id, toSequenceNr, self)
else
} else
self ! DeleteMessagesFailure(
new RuntimeException(
s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"),

View file

@ -353,12 +353,13 @@ private[akka] object Running {
case SaveSnapshotSuccess(meta) =>
setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta)
if (snapshotReason == SnapshotWithRetention) {
// deletion of old events and snspahots are triggered by the SaveSnapshotSuccess
// deletion of old events and snapshots are triggered by the SaveSnapshotSuccess
setup.retention match {
case DisabledRetentionCriteria => // no further actions
case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) =>
// deleteEventsOnSnapshot == true, deletion of old events
val deleteEventsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr)
// snapshot deletion then happens on event deletion success in Running.onDeleteEventsJournalResponse
internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr)
case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) =>
// deleteEventsOnSnapshot == false, deletion of old snapshots
@ -377,8 +378,13 @@ private[akka] object Running {
None
}
setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal)
signal.foreach(setup.onSignal(state.state, _, catchAndLog = false))
signal match {
case Some(signal) =>
setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal)
setup.onSignal(state.state, signal, catchAndLog = false)
case None =>
setup.log.debug("Received snapshot response [{}], no signal emitted.", response)
}
}
Behaviors

View file

@ -7,34 +7,32 @@ package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventSourcedSignal
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotMetadata
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.serialization.jackson.CborSerializable
import akka.util.unused
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import scala.concurrent.duration._
object EventSourcedBehaviorRetentionSpec {
object EventSourcedBehaviorRetentionSpec extends Matchers {
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
@ -51,61 +49,20 @@ object EventSourcedBehaviorRetentionSpec {
final case class GetValue(replyTo: ActorRef[State]) extends Command
final case object StopIt extends Command
final case class WrappedSignal(signal: EventSourcedSignal)
sealed trait Event extends CborSerializable
final case class Incremented(delta: Int) extends Event
final case class State(value: Int, history: Vector[Int]) extends CborSerializable
def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command] =
Behaviors.setup(ctx => counter(ctx, persistenceId))
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
probe = TestProbe[(State, Event)].ref,
snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref,
retentionProbe = TestProbe[Try[EventSourcedSignal]].ref)
def counterWithProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, probe, TestProbe[Try[SnapshotMetadata]].ref, TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[Try[SnapshotMetadata]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(
ctx,
persistenceId,
TestProbe[(State, Event)].ref,
snapshotProbe = probe,
TestProbe[Try[EventSourcedSignal]].ref)
def counterWithSnapshotAndRetentionProbe(
ctx: ActorContext[Command],
persistenceId: PersistenceId,
probeS: ActorRef[Try[SnapshotMetadata]],
probeR: ActorRef[Try[EventSourcedSignal]])(
implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[(State, Event)].ref, snapshotProbe = probeS, retentionProbe = probeR)
def counter(
@unused ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[SnapshotMetadata]],
retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = {
probe: Option[ActorRef[(State, Event)]] = None,
snapshotSignalProbe: Option[ActorRef[WrappedSignal]] = None,
eventSignalProbe: Option[ActorRef[Try[EventSourcedSignal]]] = None)
: EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
@ -128,18 +85,43 @@ object EventSourcedBehaviorRetentionSpec {
eventHandler = (state, evt) =>
evt match {
case Incremented(delta) =>
probe ! ((state, evt))
probe.foreach(_ ! ((state, evt)))
State(state.value + delta, state.history :+ state.value)
}).receiveSignal {
case (_, RecoveryCompleted) => ()
case (_, SnapshotCompleted(metadata)) =>
snapshotProbe ! Success(metadata)
case (_, SnapshotFailed(_, failure)) =>
snapshotProbe ! Failure(failure)
case (_, sc: SnapshotCompleted) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(sc))
case (_, sf: SnapshotFailed) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(sf))
case (_, dc: DeleteSnapshotsCompleted) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(dc))
case (_, dsf: DeleteSnapshotsFailed) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(dsf))
case (_, e: EventSourcedSignal) =>
retentionProbe ! Success(e)
eventSignalProbe.foreach(_ ! Success(e))
}
}
implicit class WrappedSignalProbeAssert(probe: TestProbe[WrappedSignal]) {
def expectSnapshotCompleted(sequenceNumber: Int): SnapshotCompleted = {
val wrapped = probe.expectMessageType[WrappedSignal]
wrapped.signal shouldBe a[SnapshotCompleted]
val completed = wrapped.signal.asInstanceOf[SnapshotCompleted]
completed.metadata.sequenceNr should ===(sequenceNumber)
completed
}
def expectDeleteSnapshotCompleted(maxSequenceNr: Long, minSequenceNr: Long): DeleteSnapshotsCompleted = {
val wrapped = probe.expectMessageType[WrappedSignal]
wrapped.signal shouldBe a[DeleteSnapshotsCompleted]
val signal = wrapped.signal.asInstanceOf[DeleteSnapshotsCompleted]
signal.target should ===(
DeletionTarget.Criteria(
SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))
signal
}
}
}
class EventSourcedBehaviorRetentionSpec
@ -158,33 +140,35 @@ class EventSourcedBehaviorRetentionSpec
val pid = nextPid()
val c = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! StopIt
watchProbe.expectMessage("Terminated")
val watchProbe = TestProbe()
watchProbe.expectTerminated(c)
// no snapshot should have happened
val probeC2 = TestProbe[(State, Event)]()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val snapshotProbe = createTestProbe[WrappedSignal]()
val c2 = spawn(
Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref)
counter(ctx, pid, probe = Some(probeC2.ref), snapshotSignalProbe = Some(snapshotProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1)))
val watchProbeC2 = watcher(c2)
c2 ! Increment
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
snapshotProbe.expectSnapshotCompleted(2)
c2 ! StopIt
watchProbeC2.expectMessage("Terminated")
watchProbe.expectTerminated(c2)
val probeC3 = TestProbe[(State, Event)]()
val c3 = spawn(
Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC3.ref)
counter(ctx, pid, Some(probeC3.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
// this time it should have been snapshotted so no events to replay
probeC3.expectNoMessage()
@ -194,27 +178,27 @@ class EventSourcedBehaviorRetentionSpec
"snapshot every N sequence nrs when persisting multiple events" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val c =
spawn(
Behaviors.setup[Command](ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref)
counter(ctx, pid, None, Some(snapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotSignalProbe.expectSnapshotCompleted(3)
c ! StopIt
watchProbe.expectMessage("Terminated")
val watchProbe = TestProbe()
watchProbe.expectTerminated(c)
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(
Behaviors.setup[Command](ctx =>
counterWithProbe(ctx, pid, probeC2.ref)
counter(ctx, pid, Some(probeC2.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2))))
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
@ -223,26 +207,26 @@ class EventSourcedBehaviorRetentionSpec
"snapshot via predicate" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val snapshotSignalProbe = TestProbe[WrappedSignal]
val alwaysSnapshot: Behavior[Command] =
Behaviors.setup { ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)).snapshotWhen { (_, _, _) =>
true
}
}
val c = spawn(alwaysSnapshot)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! Increment
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
snapshotSignalProbe.expectSnapshotCompleted(1)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! StopIt
watchProbe.expectMessage("Terminated")
val watchProbe = TestProbe()
watchProbe.expectTerminated(c)
val probe = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probe.ref)))
val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probe.ref))))
// state should be rebuilt from snapshot, no events replayed
// Fails as snapshot is async (i think)
probe.expectNoMessage()
@ -253,13 +237,13 @@ class EventSourcedBehaviorRetentionSpec
"check all events for snapshot in PersistAll" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]
val snapshotSignalProbe = TestProbe[WrappedSignal]
val snapshotAtTwo = Behaviors.setup[Command](ctx =>
counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)).snapshotWhen { (s, _, _) =>
s.value == 2
})
val c: ActorRef[Command] = spawn(snapshotAtTwo)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
@ -267,293 +251,309 @@ class EventSourcedBehaviorRetentionSpec
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
// snapshot at seqNr 3 because of persistAll
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotSignalProbe.expectSnapshotCompleted(3)
c ! StopIt
watchProbe.expectMessage("Terminated")
val watchProbe = TestProbe()
watchProbe.expectTerminated(c)
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref)))
val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probeC2.ref))))
// middle event triggered all to be snapshot
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
}
def expectDeleteSnapshotCompleted(
retentionProbe: TestProbe[Try[EventSourcedSignal]],
maxSequenceNr: Long,
minSequenceNr: Long): Unit = {
retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value should ===(
DeleteSnapshotsCompleted(DeletionTarget.Criteria(
SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr))))
}
"delete snapshots automatically, based on criteria" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2))))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(6)
snapshotSignalProbe.expectSnapshotCompleted(9)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
expectDeleteSnapshotCompleted(retentionProbe, 6, 0)
expectDeleteSnapshotCompleted(retentionProbe, 9, 3)
expectDeleteSnapshotCompleted(retentionProbe, 12, 6)
snapshotSignalProbe.expectSnapshotCompleted(12)
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0)
snapshotSignalProbe.expectSnapshotCompleted(15)
snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
snapshotSignalProbe.expectSnapshotCompleted(18)
snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6)
retentionProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
}
"optionally delete both old events and snapshots" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx =>
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention(
// tests the Java API as well
RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot)))
val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.withRetention(
// tests the Java API as well
RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot)))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(6)
snapshotSignalProbe.expectSnapshotCompleted(9)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
// Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one".
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
expectDeleteSnapshotCompleted(retentionProbe, 2, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18)
// one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering
// if sending many commands in one go is not deterministic
persistentActor ! Increment // 11
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
expectDeleteSnapshotCompleted(retentionProbe, 5, 0)
persistentActor ! Increment // 13
persistentActor ! Increment // 14
persistentActor ! Increment // 11
persistentActor ! Increment // 15
snapshotSignalProbe.expectSnapshotCompleted(15)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9
snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9
expectDeleteSnapshotCompleted(retentionProbe, 8, 2)
persistentActor ! Increment // 16
persistentActor ! Increment // 17
persistentActor ! Increment // 18
snapshotSignalProbe.expectSnapshotCompleted(18)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12
expectDeleteSnapshotCompleted(retentionProbe, 11, 5)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12
snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5)
retentionProbe.expectNoMessage()
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
}
"be possible to combine snapshotWhen and retention criteria" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx =>
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1))))
val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1))))
(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, (0 until 3).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
retentionProbe.expectNoMessage()
snapshotSignalProbe.expectSnapshotCompleted(3)
eventProbe.expectNoMessage()
(4 to 10).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
expectDeleteSnapshotCompleted(retentionProbe, 5, 0)
snapshotSignalProbe.expectSnapshotCompleted(5)
snapshotSignalProbe.expectSnapshotCompleted(10)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
(11 to 13).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13)
snapshotSignalProbe.expectSnapshotCompleted(13)
// no deletes triggered by snapshotWhen
retentionProbe.expectNoMessage()
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
}
(14 to 16).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(16, (0 until 16).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15)
expectDeleteSnapshotCompleted(retentionProbe, 10, 5)
retentionProbe.expectNoMessage()
snapshotSignalProbe.expectSnapshotCompleted(15)
snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5)
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
}
}
"be possible to combine snapshotWhen and retention criteria withDeleteEventsOnSnapshot" in {
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](
ctx =>
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))
val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13)
.withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))
(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, (0 until 3).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
retentionProbe.expectNoMessage()
snapshotSignalProbe.expectSnapshotCompleted(2) // every-2 through criteria
snapshotSignalProbe.expectSnapshotCompleted(3) // snapshotWhen
// no event deletes or snapshot deletes after snapshotWhen
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
}
(4 to 10).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
expectDeleteSnapshotCompleted(retentionProbe, 1, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
// one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering
// if sending many commands in one go is not deterministic
persistentActor ! Increment // 4
snapshotSignalProbe.expectSnapshotCompleted(4) // every-2 through criteria
persistentActor ! Increment // 5
persistentActor ! Increment // 6
snapshotSignalProbe.expectSnapshotCompleted(6) // every-2 through criteria
(11 to 13).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
expectDeleteSnapshotCompleted(retentionProbe, 5, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13)
persistentActor ! Increment // 7
persistentActor ! Increment // 8
snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through criteria
// triggers delete up to snapshot no 2
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete oldest snapshot
persistentActor ! Increment // 9
persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through criteria
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
persistentActor ! Increment // 11
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through criteria
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
persistentActor ! Increment // 13
snapshotSignalProbe.expectSnapshotCompleted(13) // snapshotWhen
// no deletes triggered by snapshotWhen
retentionProbe.expectNoMessage()
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
}
(14 to 16).foreach(_ => persistentActor ! Increment)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(14)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8
expectDeleteSnapshotCompleted(retentionProbe, 7, 1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(16)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10
expectDeleteSnapshotCompleted(retentionProbe, 9, 3)
retentionProbe.expectNoMessage()
persistentActor ! Increment // 14
snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8
snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1)
persistentActor ! Increment // 15
persistentActor ! Increment // 16
snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10
snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
}
}
"be possible to snapshot every event" in {
// very bad idea to snapshot every event, but technically possible
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val replyProbe = TestProbe[State]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref)
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3))))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4)
expectDeleteSnapshotCompleted(retentionProbe, 1, 0)
snapshotSignalProbe.expectSnapshotCompleted(1)
snapshotSignalProbe.expectSnapshotCompleted(2)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(4)
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5)
expectDeleteSnapshotCompleted(retentionProbe, 2, 0)
snapshotSignalProbe.expectSnapshotCompleted(5)
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
snapshotSignalProbe.expectSnapshotCompleted(6)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7)
expectDeleteSnapshotCompleted(retentionProbe, 4, 1)
snapshotSignalProbe.expectSnapshotCompleted(7)
snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8)
expectDeleteSnapshotCompleted(retentionProbe, 5, 2)
snapshotSignalProbe.expectSnapshotCompleted(8)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
expectDeleteSnapshotCompleted(retentionProbe, 6, 3)
snapshotSignalProbe.expectSnapshotCompleted(9)
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
expectDeleteSnapshotCompleted(retentionProbe, 7, 4)
snapshotSignalProbe.expectSnapshotCompleted(10)
snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
}
"be possible to snapshot every event withDeleteEventsOnSnapshot" in {
// very bad idea to snapshot every event, but technically possible
val pid = nextPid()
val snapshotProbe = TestProbe[Try[SnapshotMetadata]]()
val retentionProbe = TestProbe[Try[EventSourcedSignal]]()
val replyProbe = TestProbe[State]()
val snapshotSignalProbe = TestProbe[WrappedSignal]()
val eventProbe = TestProbe[Try[EventSourcedSignal]]()
val persistentActor = spawn(
Behaviors.setup[Command](ctx =>
counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention(
val persistentActor = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref))
.withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3).withDeleteEventsOnSnapshot)))
(1 to 10).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1
// one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering
// if sending many commands in one go is not deterministic
(1 to 4).foreach(_ => persistentActor ! Increment)
snapshotSignalProbe.expectSnapshotCompleted(1)
snapshotSignalProbe.expectSnapshotCompleted(2)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(4)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
expectDeleteSnapshotCompleted(retentionProbe, 1, 0)
persistentActor ! Increment // 5
snapshotSignalProbe.expectSnapshotCompleted(5)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
expectDeleteSnapshotCompleted(retentionProbe, 2, 0)
persistentActor ! Increment // 6
snapshotSignalProbe.expectSnapshotCompleted(6)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
expectDeleteSnapshotCompleted(retentionProbe, 3, 0)
persistentActor ! Increment // 7
snapshotSignalProbe.expectSnapshotCompleted(7)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5
expectDeleteSnapshotCompleted(retentionProbe, 4, 1)
persistentActor ! Increment // 8
snapshotSignalProbe.expectSnapshotCompleted(8)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5
snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
expectDeleteSnapshotCompleted(retentionProbe, 5, 2)
persistentActor ! Increment // 9
snapshotSignalProbe.expectSnapshotCompleted(9)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10)
retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7
expectDeleteSnapshotCompleted(retentionProbe, 6, 3)
persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
}
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
val probe = TestProbe[String]()
val w = Behaviors.setup[Any] { ctx =>
ctx.watch(toWatch)
Behaviors
.receive[Any] { (_, _) =>
Behaviors.same
}
.receiveSignal {
case (_, _: Terminated) =>
probe.ref ! "Terminated"
Behaviors.stopped
}
}
spawn(w)
probe
}
}
}