diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala index f5f76f1a24..e0f5585056 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala @@ -13,11 +13,11 @@ import java.util.{ List => JList } import scala.annotation.tailrec import akka.util.ccompat.JavaConverters._ + import scala.collection.immutable import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.control.NonFatal - import akka.actor.testkit.typed.FishingOutcome import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.javadsl.{ TestProbe => JavaTestProbe } @@ -26,6 +26,7 @@ import akka.actor.testkit.typed.scaladsl.{ TestProbe => ScalaTestProbe } import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior +import akka.actor.typed.Signal import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi @@ -153,6 +154,9 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) expectMessage(max.asScala, hint, obj) private def expectMessage_internal[T <: M](max: FiniteDuration, obj: T, hint: Option[String] = None): T = { + if (obj.isInstanceOf[Signal]) + throw new IllegalArgumentException( + s"${obj.getClass.getName} is a signal, expecting signals with a TestProbe is not possible") val o = receiveOne_internal(max) val hintOrEmptyString = hint.map(": " + _).getOrElse("") o match { @@ -217,6 +221,10 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) expectMessageClass_internal(max.asScala.dilated, clazz) private def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = { + if (classOf[Signal].isAssignableFrom(c)) { + throw new IllegalArgumentException( + s"${c.getName} is a signal, expecting signals with a TestProbe is not possible") + } val o = receiveOne_internal(max) val bt = BoxedType(c) o match { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 755a4d5786..39b73a2111 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -143,9 +143,10 @@ private[akka] trait JournalInteractions[C, E, S] { if (toSequenceNr > 0) { val self = setup.selfClassic - if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) + if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) { + setup.log.debug("Deleting events up to sequenceNr [{}]", toSequenceNr) setup.journal ! JournalProtocol.DeleteMessagesTo(setup.persistenceId.id, toSequenceNr, self) - else + } else self ! DeleteMessagesFailure( new RuntimeException( s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 3aa6f78552..2e79cf5b77 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -353,12 +353,13 @@ private[akka] object Running { case SaveSnapshotSuccess(meta) => setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta) if (snapshotReason == SnapshotWithRetention) { - // deletion of old events and snspahots are triggered by the SaveSnapshotSuccess + // deletion of old events and snapshots are triggered by the SaveSnapshotSuccess setup.retention match { case DisabledRetentionCriteria => // no further actions case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) => // deleteEventsOnSnapshot == true, deletion of old events val deleteEventsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr) + // snapshot deletion then happens on event deletion success in Running.onDeleteEventsJournalResponse internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr) case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) => // deleteEventsOnSnapshot == false, deletion of old snapshots @@ -377,8 +378,13 @@ private[akka] object Running { None } - setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal) - signal.foreach(setup.onSignal(state.state, _, catchAndLog = false)) + signal match { + case Some(signal) => + setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal) + setup.onSignal(state.state, signal, catchAndLog = false) + case None => + setup.log.debug("Received snapshot response [{}], no signal emitted.", response) + } } Behaviors diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala index 3a8b6bcde9..4f897d5d54 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala @@ -7,34 +7,32 @@ package akka.persistence.typed.scaladsl import java.util.UUID import java.util.concurrent.atomic.AtomicInteger -import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior -import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.DeleteEventsCompleted import akka.persistence.typed.DeleteSnapshotsCompleted +import akka.persistence.typed.DeleteSnapshotsFailed import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventSourcedSignal import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed -import akka.persistence.typed.SnapshotMetadata import akka.persistence.typed.SnapshotSelectionCriteria import akka.serialization.jackson.CborSerializable import akka.util.unused import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import org.scalatest.Matchers import org.scalatest.WordSpecLike +import scala.concurrent.duration._ -object EventSourcedBehaviorRetentionSpec { +object EventSourcedBehaviorRetentionSpec extends Matchers { def conf: Config = ConfigFactory.parseString(s""" akka.loglevel = INFO @@ -51,61 +49,20 @@ object EventSourcedBehaviorRetentionSpec { final case class GetValue(replyTo: ActorRef[State]) extends Command final case object StopIt extends Command + final case class WrappedSignal(signal: EventSourcedSignal) + sealed trait Event extends CborSerializable final case class Incremented(delta: Int) extends Event final case class State(value: Int, history: Vector[Int]) extends CborSerializable - def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command] = - Behaviors.setup(ctx => counter(ctx, persistenceId)) - - def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter( - ctx, - persistenceId, - probe = TestProbe[(State, Event)].ref, - snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref, - retentionProbe = TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithProbe( - ctx: ActorContext[Command], - persistenceId: PersistenceId, - probe: ActorRef[(State, Event)], - snapshotProbe: ActorRef[Try[SnapshotMetadata]])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, probe, TestProbe[Try[SnapshotMetadata]].ref, TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithSnapshotProbe( - ctx: ActorContext[Command], - persistenceId: PersistenceId, - probe: ActorRef[Try[SnapshotMetadata]])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter( - ctx, - persistenceId, - TestProbe[(State, Event)].ref, - snapshotProbe = probe, - TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithSnapshotAndRetentionProbe( - ctx: ActorContext[Command], - persistenceId: PersistenceId, - probeS: ActorRef[Try[SnapshotMetadata]], - probeR: ActorRef[Try[EventSourcedSignal]])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, TestProbe[(State, Event)].ref, snapshotProbe = probeS, retentionProbe = probeR) - def counter( @unused ctx: ActorContext[Command], persistenceId: PersistenceId, - probe: ActorRef[(State, Event)], - snapshotProbe: ActorRef[Try[SnapshotMetadata]], - retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = { + probe: Option[ActorRef[(State, Event)]] = None, + snapshotSignalProbe: Option[ActorRef[WrappedSignal]] = None, + eventSignalProbe: Option[ActorRef[Try[EventSourcedSignal]]] = None) + : EventSourcedBehavior[Command, Event, State] = { EventSourcedBehavior[Command, Event, State]( persistenceId, emptyState = State(0, Vector.empty), @@ -128,18 +85,43 @@ object EventSourcedBehaviorRetentionSpec { eventHandler = (state, evt) => evt match { case Incremented(delta) => - probe ! ((state, evt)) + probe.foreach(_ ! ((state, evt))) State(state.value + delta, state.history :+ state.value) }).receiveSignal { case (_, RecoveryCompleted) => () - case (_, SnapshotCompleted(metadata)) => - snapshotProbe ! Success(metadata) - case (_, SnapshotFailed(_, failure)) => - snapshotProbe ! Failure(failure) + case (_, sc: SnapshotCompleted) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(sc)) + case (_, sf: SnapshotFailed) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(sf)) + case (_, dc: DeleteSnapshotsCompleted) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(dc)) + case (_, dsf: DeleteSnapshotsFailed) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(dsf)) case (_, e: EventSourcedSignal) => - retentionProbe ! Success(e) + eventSignalProbe.foreach(_ ! Success(e)) } } + + implicit class WrappedSignalProbeAssert(probe: TestProbe[WrappedSignal]) { + def expectSnapshotCompleted(sequenceNumber: Int): SnapshotCompleted = { + val wrapped = probe.expectMessageType[WrappedSignal] + wrapped.signal shouldBe a[SnapshotCompleted] + val completed = wrapped.signal.asInstanceOf[SnapshotCompleted] + completed.metadata.sequenceNr should ===(sequenceNumber) + completed + } + + def expectDeleteSnapshotCompleted(maxSequenceNr: Long, minSequenceNr: Long): DeleteSnapshotsCompleted = { + val wrapped = probe.expectMessageType[WrappedSignal] + wrapped.signal shouldBe a[DeleteSnapshotsCompleted] + val signal = wrapped.signal.asInstanceOf[DeleteSnapshotsCompleted] + signal.target should ===( + DeletionTarget.Criteria( + SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr))) + signal + } + } + } class EventSourcedBehaviorRetentionSpec @@ -158,33 +140,35 @@ class EventSourcedBehaviorRetentionSpec val pid = nextPid() val c = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() c ! Increment c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(1, Vector(0))) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) // no snapshot should have happened val probeC2 = TestProbe[(State, Event)]() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val snapshotProbe = createTestProbe[WrappedSignal]() + val c2 = spawn( Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref) + counter(ctx, pid, probe = Some(probeC2.ref), snapshotSignalProbe = Some(snapshotProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1))) - val watchProbeC2 = watcher(c2) + c2 ! Increment - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) + snapshotProbe.expectSnapshotCompleted(2) c2 ! StopIt - watchProbeC2.expectMessage("Terminated") + watchProbe.expectTerminated(c2) val probeC3 = TestProbe[(State, Event)]() val c3 = spawn( Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC3.ref) + counter(ctx, pid, Some(probeC3.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) // this time it should have been snapshotted so no events to replay probeC3.expectNoMessage() @@ -194,27 +178,27 @@ class EventSourcedBehaviorRetentionSpec "snapshot every N sequence nrs when persisting multiple events" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() val c = spawn( Behaviors.setup[Command](ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref) + counter(ctx, pid, None, Some(snapshotSignalProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - val watchProbe = watcher(c) val replyProbe = TestProbe[State]() c ! IncrementWithPersistAll(3) c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) // snapshot at seqNr 3 because of persistAll - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotSignalProbe.expectSnapshotCompleted(3) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) val probeC2 = TestProbe[(State, Event)]() val c2 = spawn( Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC2.ref) + counter(ctx, pid, Some(probeC2.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) probeC2.expectNoMessage() c2 ! GetValue(replyProbe.ref) @@ -223,26 +207,26 @@ class EventSourcedBehaviorRetentionSpec "snapshot via predicate" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]] + val snapshotSignalProbe = TestProbe[WrappedSignal] val alwaysSnapshot: Behavior[Command] = Behaviors.setup { ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)).snapshotWhen { (_, _, _) => true } } val c = spawn(alwaysSnapshot) - val watchProbe = watcher(c) val replyProbe = TestProbe[State]() c ! Increment - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) + snapshotSignalProbe.expectSnapshotCompleted(1) c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(1, Vector(0))) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) val probe = TestProbe[(State, Event)]() - val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probe.ref))) + val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probe.ref)))) // state should be rebuilt from snapshot, no events replayed // Fails as snapshot is async (i think) probe.expectNoMessage() @@ -253,13 +237,13 @@ class EventSourcedBehaviorRetentionSpec "check all events for snapshot in PersistAll" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]] + val snapshotSignalProbe = TestProbe[WrappedSignal] val snapshotAtTwo = Behaviors.setup[Command](ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)).snapshotWhen { (s, _, _) => s.value == 2 }) val c: ActorRef[Command] = spawn(snapshotAtTwo) - val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() c ! IncrementWithPersistAll(3) @@ -267,293 +251,309 @@ class EventSourcedBehaviorRetentionSpec c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) // snapshot at seqNr 3 because of persistAll - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotSignalProbe.expectSnapshotCompleted(3) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) val probeC2 = TestProbe[(State, Event)]() - val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref))) + val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probeC2.ref)))) // middle event triggered all to be snapshot probeC2.expectNoMessage() c2 ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) } - def expectDeleteSnapshotCompleted( - retentionProbe: TestProbe[Try[EventSourcedSignal]], - maxSequenceNr: Long, - minSequenceNr: Long): Unit = { - retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value should ===( - DeleteSnapshotsCompleted(DeletionTarget.Criteria( - SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))) - } - "delete snapshots automatically, based on criteria" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() val replyProbe = TestProbe[State]() val persistentActor = spawn( Behaviors.setup[Command](ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2)))) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(6) + snapshotSignalProbe.expectSnapshotCompleted(9) + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(20, (0 until 20).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) - expectDeleteSnapshotCompleted(retentionProbe, 6, 0) - expectDeleteSnapshotCompleted(retentionProbe, 9, 3) - expectDeleteSnapshotCompleted(retentionProbe, 12, 6) + snapshotSignalProbe.expectSnapshotCompleted(12) + snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0) + snapshotSignalProbe.expectSnapshotCompleted(15) + snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3) + snapshotSignalProbe.expectSnapshotCompleted(18) + snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() } "optionally delete both old events and snapshots" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() val replyProbe = TestProbe[State]() - val persistentActor = spawn( - Behaviors.setup[Command]( - ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( - // tests the Java API as well - RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot))) + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .withRetention( + // tests the Java API as well + RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot))) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(6) + snapshotSignalProbe.expectSnapshotCompleted(9) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 // Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one". // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without // starting at the snapshot at toSequenceNr would be invalid. - expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(20, (0 until 20).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) + // one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering + // if sending many commands in one go is not deterministic + persistentActor ! Increment // 11 + persistentActor ! Increment // 12 + snapshotSignalProbe.expectSnapshotCompleted(12) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - expectDeleteSnapshotCompleted(retentionProbe, 5, 0) + persistentActor ! Increment // 13 + persistentActor ! Increment // 14 + persistentActor ! Increment // 11 + persistentActor ! Increment // 15 + snapshotSignalProbe.expectSnapshotCompleted(15) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9 + snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9 - expectDeleteSnapshotCompleted(retentionProbe, 8, 2) + persistentActor ! Increment // 16 + persistentActor ! Increment // 17 + persistentActor ! Increment // 18 + snapshotSignalProbe.expectSnapshotCompleted(18) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12 - expectDeleteSnapshotCompleted(retentionProbe, 11, 5) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12 + snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5) - retentionProbe.expectNoMessage() + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() } "be possible to combine snapshotWhen and retention criteria" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() val replyProbe = TestProbe[State]() - val persistentActor = spawn( - Behaviors.setup[Command]( - ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) - .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) - .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) (1 to 3).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, (0 until 3).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectSnapshotCompleted(3) + eventProbe.expectNoMessage() (4 to 10).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - expectDeleteSnapshotCompleted(retentionProbe, 5, 0) + snapshotSignalProbe.expectSnapshotCompleted(5) + snapshotSignalProbe.expectSnapshotCompleted(10) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) (11 to 13).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13) + snapshotSignalProbe.expectSnapshotCompleted(13) // no deletes triggered by snapshotWhen - retentionProbe.expectNoMessage() + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } (14 to 16).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(16, (0 until 16).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - expectDeleteSnapshotCompleted(retentionProbe, 10, 5) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectSnapshotCompleted(15) + snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5) + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } } "be possible to combine snapshotWhen and retention criteria withDeleteEventsOnSnapshot" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() val replyProbe = TestProbe[State]() - val persistentActor = spawn( - Behaviors.setup[Command]( - ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) - .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) - .withRetention( - RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) + .withRetention( + RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) (1 to 3).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, (0 until 3).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectSnapshotCompleted(2) // every-2 through criteria + snapshotSignalProbe.expectSnapshotCompleted(3) // snapshotWhen + // no event deletes or snapshot deletes after snapshotWhen + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } - (4 to 10).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 - expectDeleteSnapshotCompleted(retentionProbe, 1, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + // one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering + // if sending many commands in one go is not deterministic + persistentActor ! Increment // 4 + snapshotSignalProbe.expectSnapshotCompleted(4) // every-2 through criteria + persistentActor ! Increment // 5 + persistentActor ! Increment // 6 + snapshotSignalProbe.expectSnapshotCompleted(6) // every-2 through criteria - (11 to 13).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - expectDeleteSnapshotCompleted(retentionProbe, 5, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13) + persistentActor ! Increment // 7 + persistentActor ! Increment // 8 + snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through criteria + // triggers delete up to snapshot no 2 + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 + snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete oldest snapshot + + persistentActor ! Increment // 9 + persistentActor ! Increment // 10 + snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through criteria + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 + + persistentActor ! Increment // 11 + persistentActor ! Increment // 12 + snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through criteria + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + + persistentActor ! Increment // 13 + snapshotSignalProbe.expectSnapshotCompleted(13) // snapshotWhen // no deletes triggered by snapshotWhen - retentionProbe.expectNoMessage() + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } - (14 to 16).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(14) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8 - expectDeleteSnapshotCompleted(retentionProbe, 7, 1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(16) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10 - expectDeleteSnapshotCompleted(retentionProbe, 9, 3) - retentionProbe.expectNoMessage() + persistentActor ! Increment // 14 + snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through criteria + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8 + snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1) + + persistentActor ! Increment // 15 + persistentActor ! Increment // 16 + snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through criteria + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10 + snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3) + + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } } "be possible to snapshot every event" in { // very bad idea to snapshot every event, but technically possible val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() val replyProbe = TestProbe[State]() val persistentActor = spawn( Behaviors.setup[Command](ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3)))) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) - expectDeleteSnapshotCompleted(retentionProbe, 1, 0) + snapshotSignalProbe.expectSnapshotCompleted(1) + snapshotSignalProbe.expectSnapshotCompleted(2) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(4) + snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) - expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + snapshotSignalProbe.expectSnapshotCompleted(5) + snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + snapshotSignalProbe.expectSnapshotCompleted(6) + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7) - expectDeleteSnapshotCompleted(retentionProbe, 4, 1) + snapshotSignalProbe.expectSnapshotCompleted(7) + snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) - expectDeleteSnapshotCompleted(retentionProbe, 5, 2) + snapshotSignalProbe.expectSnapshotCompleted(8) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - expectDeleteSnapshotCompleted(retentionProbe, 6, 3) + snapshotSignalProbe.expectSnapshotCompleted(9) + snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - expectDeleteSnapshotCompleted(retentionProbe, 7, 4) + snapshotSignalProbe.expectSnapshotCompleted(10) + snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4) } "be possible to snapshot every event withDeleteEventsOnSnapshot" in { // very bad idea to snapshot every event, but technically possible val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() - val replyProbe = TestProbe[State]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() - val persistentActor = spawn( - Behaviors.setup[Command](ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .withRetention( RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1 + // one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering + // if sending many commands in one go is not deterministic + (1 to 4).foreach(_ => persistentActor ! Increment) + snapshotSignalProbe.expectSnapshotCompleted(1) + snapshotSignalProbe.expectSnapshotCompleted(2) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(4) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1 - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 - expectDeleteSnapshotCompleted(retentionProbe, 1, 0) + persistentActor ! Increment // 5 + snapshotSignalProbe.expectSnapshotCompleted(5) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 + snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 - expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + persistentActor ! Increment // 6 + snapshotSignalProbe.expectSnapshotCompleted(6) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 + snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + persistentActor ! Increment // 7 + snapshotSignalProbe.expectSnapshotCompleted(7) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5 - expectDeleteSnapshotCompleted(retentionProbe, 4, 1) + persistentActor ! Increment // 8 + snapshotSignalProbe.expectSnapshotCompleted(8) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5 + snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - expectDeleteSnapshotCompleted(retentionProbe, 5, 2) + persistentActor ! Increment // 9 + snapshotSignalProbe.expectSnapshotCompleted(9) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7 - expectDeleteSnapshotCompleted(retentionProbe, 6, 3) + persistentActor ! Increment // 10 + snapshotSignalProbe.expectSnapshotCompleted(10) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7 + snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3) } - def watcher(toWatch: ActorRef[_]): TestProbe[String] = { - val probe = TestProbe[String]() - val w = Behaviors.setup[Any] { ctx => - ctx.watch(toWatch) - Behaviors - .receive[Any] { (_, _) => - Behaviors.same - } - .receiveSignal { - case (_, _: Terminated) => - probe.ref ! "Terminated" - Behaviors.stopped - } - } - spawn(w) - probe - } } }