From f6ebff6e272e4f2f823c211448fa98aaf0028076 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 1 Oct 2019 12:23:13 +0200 Subject: [PATCH] EventsourcedBehaviorRetentionSpec race fix (#27532) * Not working for mysterious reasons * Fix for #27507 race in test Logging at debug to try if there is a race when testing on Jenkins Also, fail fast if expecting a signal from a TestProbe (never happens) Some minor logging improvements for event sourced behavior as well. * Don't send batches of events when snapshot and event deletion is enabled When events are deleted the event sourced actor switches to running until it gets an ack back from the journal about deleting events, then it deletes snapshots if that is enabled. This means events and snapshotting could happen inbetween making the ordering of SnapshotComplete and DeleteSnapshotsComplete signals non deterministic. --- .../typed/internal/TestProbeImpl.scala | 10 +- .../typed/internal/ExternalInteractions.scala | 5 +- .../persistence/typed/internal/Running.scala | 12 +- .../EventSourcedBehaviorRetentionSpec.scala | 512 +++++++++--------- 4 files changed, 277 insertions(+), 262 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala index f5f76f1a24..e0f5585056 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala @@ -13,11 +13,11 @@ import java.util.{ List => JList } import scala.annotation.tailrec import akka.util.ccompat.JavaConverters._ + import scala.collection.immutable import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.control.NonFatal - import akka.actor.testkit.typed.FishingOutcome import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.javadsl.{ TestProbe => JavaTestProbe } @@ -26,6 +26,7 @@ import akka.actor.testkit.typed.scaladsl.{ TestProbe => ScalaTestProbe } import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior +import akka.actor.typed.Signal import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi @@ -153,6 +154,9 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) expectMessage(max.asScala, hint, obj) private def expectMessage_internal[T <: M](max: FiniteDuration, obj: T, hint: Option[String] = None): T = { + if (obj.isInstanceOf[Signal]) + throw new IllegalArgumentException( + s"${obj.getClass.getName} is a signal, expecting signals with a TestProbe is not possible") val o = receiveOne_internal(max) val hintOrEmptyString = hint.map(": " + _).getOrElse("") o match { @@ -217,6 +221,10 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) expectMessageClass_internal(max.asScala.dilated, clazz) private def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = { + if (classOf[Signal].isAssignableFrom(c)) { + throw new IllegalArgumentException( + s"${c.getName} is a signal, expecting signals with a TestProbe is not possible") + } val o = receiveOne_internal(max) val bt = BoxedType(c) o match { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 755a4d5786..39b73a2111 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -143,9 +143,10 @@ private[akka] trait JournalInteractions[C, E, S] { if (toSequenceNr > 0) { val self = setup.selfClassic - if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) + if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) { + setup.log.debug("Deleting events up to sequenceNr [{}]", toSequenceNr) setup.journal ! JournalProtocol.DeleteMessagesTo(setup.persistenceId.id, toSequenceNr, self) - else + } else self ! DeleteMessagesFailure( new RuntimeException( s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 3aa6f78552..2e79cf5b77 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -353,12 +353,13 @@ private[akka] object Running { case SaveSnapshotSuccess(meta) => setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta) if (snapshotReason == SnapshotWithRetention) { - // deletion of old events and snspahots are triggered by the SaveSnapshotSuccess + // deletion of old events and snapshots are triggered by the SaveSnapshotSuccess setup.retention match { case DisabledRetentionCriteria => // no further actions case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) => // deleteEventsOnSnapshot == true, deletion of old events val deleteEventsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr) + // snapshot deletion then happens on event deletion success in Running.onDeleteEventsJournalResponse internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr) case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) => // deleteEventsOnSnapshot == false, deletion of old snapshots @@ -377,8 +378,13 @@ private[akka] object Running { None } - setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal) - signal.foreach(setup.onSignal(state.state, _, catchAndLog = false)) + signal match { + case Some(signal) => + setup.log.debug2("Received snapshot response [{}], emitting signal [{}].", response, signal) + setup.onSignal(state.state, signal, catchAndLog = false) + case None => + setup.log.debug("Received snapshot response [{}], no signal emitted.", response) + } } Behaviors diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala index 3a8b6bcde9..4f897d5d54 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala @@ -7,34 +7,32 @@ package akka.persistence.typed.scaladsl import java.util.UUID import java.util.concurrent.atomic.AtomicInteger -import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior -import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.DeleteEventsCompleted import akka.persistence.typed.DeleteSnapshotsCompleted +import akka.persistence.typed.DeleteSnapshotsFailed import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventSourcedSignal import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed -import akka.persistence.typed.SnapshotMetadata import akka.persistence.typed.SnapshotSelectionCriteria import akka.serialization.jackson.CborSerializable import akka.util.unused import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import org.scalatest.Matchers import org.scalatest.WordSpecLike +import scala.concurrent.duration._ -object EventSourcedBehaviorRetentionSpec { +object EventSourcedBehaviorRetentionSpec extends Matchers { def conf: Config = ConfigFactory.parseString(s""" akka.loglevel = INFO @@ -51,61 +49,20 @@ object EventSourcedBehaviorRetentionSpec { final case class GetValue(replyTo: ActorRef[State]) extends Command final case object StopIt extends Command + final case class WrappedSignal(signal: EventSourcedSignal) + sealed trait Event extends CborSerializable final case class Incremented(delta: Int) extends Event final case class State(value: Int, history: Vector[Int]) extends CborSerializable - def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command] = - Behaviors.setup(ctx => counter(ctx, persistenceId)) - - def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter( - ctx, - persistenceId, - probe = TestProbe[(State, Event)].ref, - snapshotProbe = TestProbe[Try[SnapshotMetadata]].ref, - retentionProbe = TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithProbe( - ctx: ActorContext[Command], - persistenceId: PersistenceId, - probe: ActorRef[(State, Event)], - snapshotProbe: ActorRef[Try[SnapshotMetadata]])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, probe, TestProbe[Try[SnapshotMetadata]].ref, TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithSnapshotProbe( - ctx: ActorContext[Command], - persistenceId: PersistenceId, - probe: ActorRef[Try[SnapshotMetadata]])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter( - ctx, - persistenceId, - TestProbe[(State, Event)].ref, - snapshotProbe = probe, - TestProbe[Try[EventSourcedSignal]].ref) - - def counterWithSnapshotAndRetentionProbe( - ctx: ActorContext[Command], - persistenceId: PersistenceId, - probeS: ActorRef[Try[SnapshotMetadata]], - probeR: ActorRef[Try[EventSourcedSignal]])( - implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, TestProbe[(State, Event)].ref, snapshotProbe = probeS, retentionProbe = probeR) - def counter( @unused ctx: ActorContext[Command], persistenceId: PersistenceId, - probe: ActorRef[(State, Event)], - snapshotProbe: ActorRef[Try[SnapshotMetadata]], - retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = { + probe: Option[ActorRef[(State, Event)]] = None, + snapshotSignalProbe: Option[ActorRef[WrappedSignal]] = None, + eventSignalProbe: Option[ActorRef[Try[EventSourcedSignal]]] = None) + : EventSourcedBehavior[Command, Event, State] = { EventSourcedBehavior[Command, Event, State]( persistenceId, emptyState = State(0, Vector.empty), @@ -128,18 +85,43 @@ object EventSourcedBehaviorRetentionSpec { eventHandler = (state, evt) => evt match { case Incremented(delta) => - probe ! ((state, evt)) + probe.foreach(_ ! ((state, evt))) State(state.value + delta, state.history :+ state.value) }).receiveSignal { case (_, RecoveryCompleted) => () - case (_, SnapshotCompleted(metadata)) => - snapshotProbe ! Success(metadata) - case (_, SnapshotFailed(_, failure)) => - snapshotProbe ! Failure(failure) + case (_, sc: SnapshotCompleted) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(sc)) + case (_, sf: SnapshotFailed) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(sf)) + case (_, dc: DeleteSnapshotsCompleted) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(dc)) + case (_, dsf: DeleteSnapshotsFailed) => + snapshotSignalProbe.foreach(_ ! WrappedSignal(dsf)) case (_, e: EventSourcedSignal) => - retentionProbe ! Success(e) + eventSignalProbe.foreach(_ ! Success(e)) } } + + implicit class WrappedSignalProbeAssert(probe: TestProbe[WrappedSignal]) { + def expectSnapshotCompleted(sequenceNumber: Int): SnapshotCompleted = { + val wrapped = probe.expectMessageType[WrappedSignal] + wrapped.signal shouldBe a[SnapshotCompleted] + val completed = wrapped.signal.asInstanceOf[SnapshotCompleted] + completed.metadata.sequenceNr should ===(sequenceNumber) + completed + } + + def expectDeleteSnapshotCompleted(maxSequenceNr: Long, minSequenceNr: Long): DeleteSnapshotsCompleted = { + val wrapped = probe.expectMessageType[WrappedSignal] + wrapped.signal shouldBe a[DeleteSnapshotsCompleted] + val signal = wrapped.signal.asInstanceOf[DeleteSnapshotsCompleted] + signal.target should ===( + DeletionTarget.Criteria( + SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr))) + signal + } + } + } class EventSourcedBehaviorRetentionSpec @@ -158,33 +140,35 @@ class EventSourcedBehaviorRetentionSpec val pid = nextPid() val c = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() c ! Increment c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(1, Vector(0))) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) // no snapshot should have happened val probeC2 = TestProbe[(State, Event)]() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val snapshotProbe = createTestProbe[WrappedSignal]() + val c2 = spawn( Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC2.ref, snapshotProbe.ref) + counter(ctx, pid, probe = Some(probeC2.ref), snapshotSignalProbe = Some(snapshotProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1))) - val watchProbeC2 = watcher(c2) + c2 ! Increment - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) + snapshotProbe.expectSnapshotCompleted(2) c2 ! StopIt - watchProbeC2.expectMessage("Terminated") + watchProbe.expectTerminated(c2) val probeC3 = TestProbe[(State, Event)]() val c3 = spawn( Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC3.ref) + counter(ctx, pid, Some(probeC3.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) // this time it should have been snapshotted so no events to replay probeC3.expectNoMessage() @@ -194,27 +178,27 @@ class EventSourcedBehaviorRetentionSpec "snapshot every N sequence nrs when persisting multiple events" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() val c = spawn( Behaviors.setup[Command](ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref) + counter(ctx, pid, None, Some(snapshotSignalProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) - val watchProbe = watcher(c) val replyProbe = TestProbe[State]() c ! IncrementWithPersistAll(3) c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) // snapshot at seqNr 3 because of persistAll - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotSignalProbe.expectSnapshotCompleted(3) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) val probeC2 = TestProbe[(State, Event)]() val c2 = spawn( Behaviors.setup[Command](ctx => - counterWithProbe(ctx, pid, probeC2.ref) + counter(ctx, pid, Some(probeC2.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 2)))) probeC2.expectNoMessage() c2 ! GetValue(replyProbe.ref) @@ -223,26 +207,26 @@ class EventSourcedBehaviorRetentionSpec "snapshot via predicate" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]] + val snapshotSignalProbe = TestProbe[WrappedSignal] val alwaysSnapshot: Behavior[Command] = Behaviors.setup { ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (_, _, _) => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)).snapshotWhen { (_, _, _) => true } } val c = spawn(alwaysSnapshot) - val watchProbe = watcher(c) val replyProbe = TestProbe[State]() c ! Increment - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) + snapshotSignalProbe.expectSnapshotCompleted(1) c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(1, Vector(0))) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) val probe = TestProbe[(State, Event)]() - val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probe.ref))) + val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probe.ref)))) // state should be rebuilt from snapshot, no events replayed // Fails as snapshot is async (i think) probe.expectNoMessage() @@ -253,13 +237,13 @@ class EventSourcedBehaviorRetentionSpec "check all events for snapshot in PersistAll" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]] + val snapshotSignalProbe = TestProbe[WrappedSignal] val snapshotAtTwo = Behaviors.setup[Command](ctx => - counterWithSnapshotProbe(ctx, pid, snapshotProbe.ref).snapshotWhen { (s, _, _) => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)).snapshotWhen { (s, _, _) => s.value == 2 }) val c: ActorRef[Command] = spawn(snapshotAtTwo) - val watchProbe = watcher(c) + val replyProbe = TestProbe[State]() c ! IncrementWithPersistAll(3) @@ -267,293 +251,309 @@ class EventSourcedBehaviorRetentionSpec c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) // snapshot at seqNr 3 because of persistAll - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) + snapshotSignalProbe.expectSnapshotCompleted(3) c ! StopIt - watchProbe.expectMessage("Terminated") + val watchProbe = TestProbe() + watchProbe.expectTerminated(c) val probeC2 = TestProbe[(State, Event)]() - val c2 = spawn(Behaviors.setup[Command](ctx => counterWithProbe(ctx, pid, probeC2.ref))) + val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probeC2.ref)))) // middle event triggered all to be snapshot probeC2.expectNoMessage() c2 ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) } - def expectDeleteSnapshotCompleted( - retentionProbe: TestProbe[Try[EventSourcedSignal]], - maxSequenceNr: Long, - minSequenceNr: Long): Unit = { - retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value should ===( - DeleteSnapshotsCompleted(DeletionTarget.Criteria( - SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))) - } - "delete snapshots automatically, based on criteria" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() val replyProbe = TestProbe[State]() val persistentActor = spawn( Behaviors.setup[Command](ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2)))) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(6) + snapshotSignalProbe.expectSnapshotCompleted(9) + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(20, (0 until 20).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) - expectDeleteSnapshotCompleted(retentionProbe, 6, 0) - expectDeleteSnapshotCompleted(retentionProbe, 9, 3) - expectDeleteSnapshotCompleted(retentionProbe, 12, 6) + snapshotSignalProbe.expectSnapshotCompleted(12) + snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0) + snapshotSignalProbe.expectSnapshotCompleted(15) + snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3) + snapshotSignalProbe.expectSnapshotCompleted(18) + snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() } "optionally delete both old events and snapshots" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() val replyProbe = TestProbe[State]() - val persistentActor = spawn( - Behaviors.setup[Command]( - ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( - // tests the Java API as well - RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot))) + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .withRetention( + // tests the Java API as well + RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot))) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(6) + snapshotSignalProbe.expectSnapshotCompleted(9) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 // Note that when triggering deletion of snapshots from deletion of events it is intentionally "off by one". // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without // starting at the snapshot at toSequenceNr would be invalid. - expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(20, (0 until 20).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(18) + // one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering + // if sending many commands in one go is not deterministic + persistentActor ! Increment // 11 + persistentActor ! Increment // 12 + snapshotSignalProbe.expectSnapshotCompleted(12) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - expectDeleteSnapshotCompleted(retentionProbe, 5, 0) + persistentActor ! Increment // 13 + persistentActor ! Increment // 14 + persistentActor ! Increment // 11 + persistentActor ! Increment // 15 + snapshotSignalProbe.expectSnapshotCompleted(15) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9 + snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9 - expectDeleteSnapshotCompleted(retentionProbe, 8, 2) + persistentActor ! Increment // 16 + persistentActor ! Increment // 17 + persistentActor ! Increment // 18 + snapshotSignalProbe.expectSnapshotCompleted(18) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12 - expectDeleteSnapshotCompleted(retentionProbe, 11, 5) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12 + snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5) - retentionProbe.expectNoMessage() + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() } "be possible to combine snapshotWhen and retention criteria" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() val replyProbe = TestProbe[State]() - val persistentActor = spawn( - Behaviors.setup[Command]( - ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) - .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) - .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) + .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) (1 to 3).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, (0 until 3).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectSnapshotCompleted(3) + eventProbe.expectNoMessage() (4 to 10).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - expectDeleteSnapshotCompleted(retentionProbe, 5, 0) + snapshotSignalProbe.expectSnapshotCompleted(5) + snapshotSignalProbe.expectSnapshotCompleted(10) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) (11 to 13).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13) + snapshotSignalProbe.expectSnapshotCompleted(13) // no deletes triggered by snapshotWhen - retentionProbe.expectNoMessage() + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } (14 to 16).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(16, (0 until 16).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(15) - expectDeleteSnapshotCompleted(retentionProbe, 10, 5) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectSnapshotCompleted(15) + snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5) + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } } "be possible to combine snapshotWhen and retention criteria withDeleteEventsOnSnapshot" in { val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() val replyProbe = TestProbe[State]() - val persistentActor = spawn( - Behaviors.setup[Command]( - ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) - .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) - .withRetention( - RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) + .withRetention( + RetentionCriteria.snapshotEvery(numberOfEvents = 2, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) (1 to 3).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, (0 until 3).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - retentionProbe.expectNoMessage() + snapshotSignalProbe.expectSnapshotCompleted(2) // every-2 through criteria + snapshotSignalProbe.expectSnapshotCompleted(3) // snapshotWhen + // no event deletes or snapshot deletes after snapshotWhen + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } - (4 to 10).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 - expectDeleteSnapshotCompleted(retentionProbe, 1, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + // one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering + // if sending many commands in one go is not deterministic + persistentActor ! Increment // 4 + snapshotSignalProbe.expectSnapshotCompleted(4) // every-2 through criteria + persistentActor ! Increment // 5 + persistentActor ! Increment // 6 + snapshotSignalProbe.expectSnapshotCompleted(6) // every-2 through criteria - (11 to 13).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(12) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - expectDeleteSnapshotCompleted(retentionProbe, 5, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(13) + persistentActor ! Increment // 7 + persistentActor ! Increment // 8 + snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through criteria + // triggers delete up to snapshot no 2 + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 + snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete oldest snapshot + + persistentActor ! Increment // 9 + persistentActor ! Increment // 10 + snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through criteria + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 + + persistentActor ! Increment // 11 + persistentActor ! Increment // 12 + snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through criteria + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + + persistentActor ! Increment // 13 + snapshotSignalProbe.expectSnapshotCompleted(13) // snapshotWhen // no deletes triggered by snapshotWhen - retentionProbe.expectNoMessage() + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } - (14 to 16).foreach(_ => persistentActor ! Increment) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(14) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8 - expectDeleteSnapshotCompleted(retentionProbe, 7, 1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(16) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10 - expectDeleteSnapshotCompleted(retentionProbe, 9, 3) - retentionProbe.expectNoMessage() + persistentActor ! Increment // 14 + snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through criteria + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8 + snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1) + + persistentActor ! Increment // 15 + persistentActor ! Increment // 16 + snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through criteria + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10 + snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3) + + eventProbe.within(3.seconds) { + eventProbe.expectNoMessage() + snapshotSignalProbe.expectNoMessage() + } } "be possible to snapshot every event" in { // very bad idea to snapshot every event, but technically possible val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() val replyProbe = TestProbe[State]() val persistentActor = spawn( Behaviors.setup[Command](ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref)) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3)))) (1 to 10).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) - expectDeleteSnapshotCompleted(retentionProbe, 1, 0) + snapshotSignalProbe.expectSnapshotCompleted(1) + snapshotSignalProbe.expectSnapshotCompleted(2) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(4) + snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) - expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + snapshotSignalProbe.expectSnapshotCompleted(5) + snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + snapshotSignalProbe.expectSnapshotCompleted(6) + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7) - expectDeleteSnapshotCompleted(retentionProbe, 4, 1) + snapshotSignalProbe.expectSnapshotCompleted(7) + snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) - expectDeleteSnapshotCompleted(retentionProbe, 5, 2) + snapshotSignalProbe.expectSnapshotCompleted(8) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - expectDeleteSnapshotCompleted(retentionProbe, 6, 3) + snapshotSignalProbe.expectSnapshotCompleted(9) + snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - expectDeleteSnapshotCompleted(retentionProbe, 7, 4) + snapshotSignalProbe.expectSnapshotCompleted(10) + snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4) } "be possible to snapshot every event withDeleteEventsOnSnapshot" in { // very bad idea to snapshot every event, but technically possible val pid = nextPid() - val snapshotProbe = TestProbe[Try[SnapshotMetadata]]() - val retentionProbe = TestProbe[Try[EventSourcedSignal]]() - val replyProbe = TestProbe[State]() + val snapshotSignalProbe = TestProbe[WrappedSignal]() + val eventProbe = TestProbe[Try[EventSourcedSignal]]() - val persistentActor = spawn( - Behaviors.setup[Command](ctx => - counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( + val persistentActor = spawn(Behaviors.setup[Command](ctx => + counter(ctx, pid, snapshotSignalProbe = Some(snapshotSignalProbe.ref), eventSignalProbe = Some(eventProbe.ref)) + .withRetention( RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) - (1 to 10).foreach(_ => persistentActor ! Increment) - persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(10, (0 until 10).toVector)) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(3) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(4) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1 + // one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering + // if sending many commands in one go is not deterministic + (1 to 4).foreach(_ => persistentActor ! Increment) + snapshotSignalProbe.expectSnapshotCompleted(1) + snapshotSignalProbe.expectSnapshotCompleted(2) + snapshotSignalProbe.expectSnapshotCompleted(3) + snapshotSignalProbe.expectSnapshotCompleted(4) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 1 - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(5) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 - expectDeleteSnapshotCompleted(retentionProbe, 1, 0) + persistentActor ! Increment // 5 + snapshotSignalProbe.expectSnapshotCompleted(5) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 + snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(6) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 - expectDeleteSnapshotCompleted(retentionProbe, 2, 0) + persistentActor ! Increment // 6 + snapshotSignalProbe.expectSnapshotCompleted(6) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 + snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(7) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 - expectDeleteSnapshotCompleted(retentionProbe, 3, 0) + persistentActor ! Increment // 7 + snapshotSignalProbe.expectSnapshotCompleted(7) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 + snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(8) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5 - expectDeleteSnapshotCompleted(retentionProbe, 4, 1) + persistentActor ! Increment // 8 + snapshotSignalProbe.expectSnapshotCompleted(8) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5 + snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(9) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - expectDeleteSnapshotCompleted(retentionProbe, 5, 2) + persistentActor ! Increment // 9 + snapshotSignalProbe.expectSnapshotCompleted(9) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 + snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2) - snapshotProbe.expectMessageType[Success[SnapshotMetadata]].value.sequenceNr should ===(10) - retentionProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7 - expectDeleteSnapshotCompleted(retentionProbe, 6, 3) + persistentActor ! Increment // 10 + snapshotSignalProbe.expectSnapshotCompleted(10) + eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7 + snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3) } - def watcher(toWatch: ActorRef[_]): TestProbe[String] = { - val probe = TestProbe[String]() - val w = Behaviors.setup[Any] { ctx => - ctx.watch(toWatch) - Behaviors - .receive[Any] { (_, _) => - Behaviors.same - } - .receiveSignal { - case (_, _: Terminated) => - probe.ref ! "Terminated" - Behaviors.stopped - } - } - spawn(w) - probe - } } }