diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index c1386ef524..e4e429f636 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -369,7 +369,7 @@ private[akka] object Running { // # 24698 The deletion of old events are automatic, snapshots are triggered by the SaveSnapshotSuccess. setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta) if (setup.retention.deleteEventsOnSnapshot) - internalDeleteEvents(e, state) // if successful, DeleteMessagesSuccess then internalDeleteSnapshots + internalDeleteEvents(e, state) else internalDeleteSnapshots(meta.sequenceNr) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index c629d272aa..87c53d3769 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -752,6 +752,7 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh "delete snapshots automatically, based on criteria" in { val unexpected = (signal: EventSourcedSignal) => fail(s"Unexpected signal [$signal].") + val snapshotEvery = 2 val pid = nextPid val snapshotProbe = TestProbe[Try[Done]]() val retentionProbe = TestProbe[Try[EventSourcedSignal]]() @@ -761,35 +762,35 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh Behaviors.setup[Command]( ctx ⇒ counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) - .snapshotEvery(2) - .withRetention( - RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = false)))) + .snapshotEvery(snapshotEvery) + .withRetention(RetentionCriteria(snapshotEveryNEvents = snapshotEvery, keepNSnapshots = 2)))) - persistentActor ! IncrementWithPersistAll(3) + persistentActor ! IncrementWithPersistAll(10) persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(3, Vector(0, 1, 2))) + replyProbe.expectMessage(State(10, (0 until 10).toVector)) snapshotProbe.expectMessage(Try(Done)) retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match { case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) => - maxSequenceNr shouldEqual 2 - minSequenceNr shouldEqual 0 + maxSequenceNr shouldEqual 9 + minSequenceNr shouldEqual 5 case signal => unexpected(signal) } - persistentActor ! IncrementWithPersistAll(3) + persistentActor ! IncrementWithPersistAll(10) snapshotProbe.expectMessage(Try(Done)) retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match { case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) => - maxSequenceNr shouldEqual 5 - minSequenceNr shouldEqual 1 + maxSequenceNr shouldEqual 19 + minSequenceNr shouldEqual 15 case signal => unexpected(signal) } persistentActor ! GetValue(replyProbe.ref) - replyProbe.expectMessage(State(6, Vector(0, 1, 2, 3, 4, 5))) + replyProbe.expectMessage(State(20, (0 until 20).toVector)) } "optionally delete both old messages and snapshots" in { + val snapshotEvery = 2 val pid = nextPid val snapshotProbe = TestProbe[Try[Done]]() val retentionProbe = TestProbe[Try[EventSourcedSignal]]() @@ -799,9 +800,9 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh Behaviors.setup[Command]( ctx ⇒ counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) - .snapshotEvery(2) + .snapshotEvery(snapshotEvery) .withRetention( - RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = true)))) + RetentionCriteria(snapshotEveryNEvents = snapshotEvery, keepNSnapshots = 2, deleteEventsOnSnapshot = true)))) persistentActor ! IncrementWithPersistAll(10) persistentActor ! GetValue(replyProbe.ref) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 493cd70355..4bb253517e 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -1440,7 +1440,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi } val probes = Vector.fill(10)(TestProbe()) - (probes.zip(commands)).foreach { + probes.zip(commands).foreach { case (p, c) => persistentActor.tell(c, p.ref) } @@ -1530,7 +1530,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("a-2") expectMsg("d-3") expectMsg("d-4") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } test(deferringAsyncWithPersistActor) @@ -1543,7 +1543,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("pa-a-2") expectMsg("d-a-3") expectMsg("d-a-4") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } test(deferringAsyncWithAsyncPersistActor) @@ -1569,7 +1569,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi p2.expectMsg("pa-b-5") p2.expectMsg("d-b-6") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } test(deferringAsyncMixedCallsPPADDPADPersistActor) @@ -1581,7 +1581,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("d-1") expectMsg("d-2") expectMsg("d-3") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } test(deferringAsyncWithNoPersistCallsPersistActor) @@ -1602,7 +1602,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi p2.expectMsg("pa-b-2") p2.expectMsg("d-b-3") p2.expectMsg("d-b-4") - expectNoMsg(100.millis) + expectNoMessage(100.millis) } test(deferringAsyncWithAsyncPersistActor) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index fb7557691c..c98158e5d4 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -124,7 +124,7 @@ class SnapshotFailureRobustnessSpec TestEvent.Mute(EventFilter[java.io.NotSerializableException](start = "Error loading snapshot"))) system.eventStream.subscribe(testActor, classOf[Logging.Error]) try { - val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor)) expectMsgType[Logging.Error].message.toString should startWith("Error loading snapshot") expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 1, timestamp), state) => @@ -133,7 +133,7 @@ class SnapshotFailureRobustnessSpec } expectMsg("kablama-2") expectMsg(RecoveryCompleted) - expectNoMsg(1 second) + expectNoMessage(1 second) } finally { system.eventStream.unsubscribe(testActor, classOf[Logging.Error]) system.eventStream.publish(TestEvent.UnMute(EventFilter.error(start = "Error loading snapshot ["))) @@ -182,7 +182,7 @@ class SnapshotFailureRobustnessSpec expectMsg(1) p ! DeleteSnapshot(1) expectMsgPF() { - case DeleteSnapshotFailure(SnapshotMetadata(`persistenceId`, 1, timestamp), cause) => + case DeleteSnapshotFailure(SnapshotMetadata(`persistenceId`, 1, _), cause) => // ok, expected failure cause.getMessage should include("Failed to delete") } @@ -199,7 +199,7 @@ class SnapshotFailureRobustnessSpec val criteria = SnapshotSelectionCriteria(maxSequenceNr = 10) p ! DeleteSnapshots(criteria) expectMsgPF() { - case DeleteSnapshotsFailure(criteria, cause) => + case DeleteSnapshotsFailure(_, cause) => // ok, expected failure cause.getMessage should include("Failed to delete") }