From abfb4497e70b851570aebb6512c2dbcb7de78ef7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 18 Feb 2019 17:23:24 +0100 Subject: [PATCH] harden ClusterShardingPersistenceSpec by using onPostStop, #26230 --- .../ClusterShardingPersistenceSpec.scala | 85 ++++++++++++------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index 86bb9afb8c..6212249855 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -61,7 +61,7 @@ object ClusterShardingPersistenceSpec { val typeKey = EntityTypeKey[Command]("test") - val recoveryCompletedProbes = new ConcurrentHashMap[String, ActorRef[String]] + val lifecycleProbes = new ConcurrentHashMap[String, ActorRef[String]] // Need this to be able to send the PoisonPill from the outside, simulating rebalance before recovery and such. // Promise completed by the actor when it's started. @@ -128,11 +128,17 @@ object ClusterShardingPersistenceSpec { eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) .onRecoveryCompleted { state ⇒ ctx.log.debug("onRecoveryCompleted: [{}]", state) - recoveryCompletedProbes.get(entityId) match { - case null ⇒ ctx.log.debug("no recoveryCompletedProbe for [{}]", entityId) + lifecycleProbes.get(entityId) match { + case null ⇒ ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId) case p ⇒ p ! s"recoveryCompleted:$state" } } + .onPostStop(() ⇒ + lifecycleProbes.get(entityId) match { + case null ⇒ ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId) + case p ⇒ p ! "stopped" + } + ) } } @@ -201,16 +207,17 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh "handle PoisonPill after persist effect" in { val entityId = nextEntityId() - val recoveryProbe = TestProbe[String]() - recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) val p1 = TestProbe[Done]() val ref = ClusterSharding(system).entityRefFor(typeKey, entityId) + (1 to 10).foreach { n ⇒ ref ! PassivateAndPersist(n.toString)(p1.ref) - // recoveryCompleted each time, verifies that it was actually stopped - recoveryProbe.expectMessage(max = 10.seconds, "recoveryCompleted:" + (1 until n).map(_.toString).mkString("|")) + lifecycleProbe.expectMessage(max = 10.seconds, "recoveryCompleted:" + (1 until n).map(_.toString).mkString("|")) p1.expectMessage(Done) + lifecycleProbe.expectMessage("stopped") } val p2 = TestProbe[String]() @@ -224,14 +231,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh entityActorRefs.put(entityId, actorRefPromise) val addProbe = TestProbe[Done]() - val recoveryProbe = TestProbe[String]() - recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) // this will wakeup the entity, and complete the entityActorRefPromise entityRef ! AddWithConfirmation("a")(addProbe.ref) addProbe.expectMessage(Done) - recoveryProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("recoveryCompleted:") // now we know that it's in EventSourcedRunning with no stashed commands val actorRef = actorRefPromise.future.futureValue @@ -254,13 +261,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh addProbe.expectMessage(Done) addProbe.expectMessage(Done) + lifecycleProbe.expectMessage("stopped") // wake up again awaitEntityTerminatedAndRemoved(actorRef, entityId) val p2 = TestProbe[String]() entityRef ! AddWithConfirmation("f")(addProbe.ref) entityRef ! Get(p2.ref) - recoveryProbe.expectMessage("recoveryCompleted:a|b|c") + lifecycleProbe.expectMessage("recoveryCompleted:a|b|c") p2.expectMessage(entityId + ":a|b|c|f") } @@ -271,14 +279,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh val addProbe = TestProbe[Done]() val echoProbe = TestProbe[String]() - val recoveryProbe = TestProbe[String]() - recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) // this will wakeup the entity, and complete the entityActorRefPromise entityRef ! AddWithConfirmation("a")(addProbe.ref) addProbe.expectMessage(Done) - recoveryProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("recoveryCompleted:") // now we know that it's in EventSourcedRunning with no stashed commands val actorRef = actorRefPromise.future.futureValue @@ -304,6 +312,7 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh echoProbe.expectMessage("echo-1") addProbe.expectMessage(Done) addProbe.expectMessage(Done) + lifecycleProbe.expectMessage("stopped") // wake up again awaitEntityTerminatedAndRemoved(actorRef, entityId) @@ -312,7 +321,7 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh echoProbe.expectMessage("echo-4") entityRef ! AddWithConfirmation("f")(addProbe.ref) entityRef ! Get(p2.ref) - recoveryProbe.expectMessage("recoveryCompleted:a|b|c") + lifecycleProbe.expectMessage("recoveryCompleted:a|b|c") p2.expectMessage(entityId + ":a|b|c|f") } @@ -322,14 +331,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh entityActorRefs.put(entityId, actorRefPromise) val addProbe = TestProbe[Done]() - val recoveryProbe = TestProbe[String]() - recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) // this will wakeup the entity, and complete the entityActorRefPromise entityRef ! AddWithConfirmation("a")(addProbe.ref) addProbe.expectMessage(Done) - recoveryProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("recoveryCompleted:") // now we know that it's in EventSourcedRunning with no stashed commands val actorRef = actorRefPromise.future.futureValue @@ -343,12 +352,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh // now we have enqueued the message sequence and start processing them latch.countDown() + lifecycleProbe.expectMessage("stopped") + // wake up again awaitEntityTerminatedAndRemoved(actorRef, entityId) val p2 = TestProbe[String]() entityRef ! AddWithConfirmation("c")(addProbe.ref) entityRef ! Get(p2.ref) - recoveryProbe.expectMessage("recoveryCompleted:a") + lifecycleProbe.expectMessage("recoveryCompleted:a") p2.expectMessage(entityId + ":a|c") } @@ -357,8 +368,8 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh val actorRefPromise = Promise[ActorRef[Any]]() entityActorRefs.put(entityId, actorRefPromise) - val recoveryProbe = TestProbe[String]() - recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) val ignoreFirstEchoProbe = TestProbe[String]() @@ -377,13 +388,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh } val actorRef = poisonSent.futureValue - recoveryProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("stopped") // wake up again awaitEntityTerminatedAndRemoved(actorRef, entityId) entityRef ! Echo("echo-2", echoProbe.ref) echoProbe.expectMessage("echo-2") - recoveryProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("recoveryCompleted:") } "handle PoisonPill before recovery completed with stashed commands" in { @@ -391,8 +403,8 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh val actorRefPromise = Promise[ActorRef[Any]]() entityActorRefs.put(entityId, actorRefPromise) - val recoveryProbe = TestProbe[String]() - recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) val addProbe = TestProbe[Done]() @@ -422,22 +434,26 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh } val actorRef = poisonSent.futureValue - recoveryProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("recoveryCompleted:") echoProbe.expectMessage("echo-2") echoProbe.expectMessage("echo-3") addProbe.expectMessage(Done) addProbe.expectMessage(Done) + lifecycleProbe.expectMessage("stopped") // wake up again awaitEntityTerminatedAndRemoved(actorRef, entityId) entityRef ! Echo("echo-5", echoProbe.ref) echoProbe.expectMessage("echo-5") - recoveryProbe.expectMessage("recoveryCompleted:a|b") + lifecycleProbe.expectMessage("recoveryCompleted:a|b") } "handle PoisonPill before UnstashAll from user stash" in { val entityId = nextEntityId() + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) + val p1 = TestProbe[Done]() val ref = ClusterSharding(system).entityRefFor(typeKey, entityId) ref ! Add("1") @@ -448,6 +464,8 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh ref ! PassivateAndPersist("5")(p1.ref) p1.receiveMessage() + lifecycleProbe.expectMessage("recoveryCompleted:") + lifecycleProbe.expectMessage("stopped") ref ! Add("6") // user was stash discarded, i.e. 3 and 4 not handled @@ -460,12 +478,12 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh "handle PoisonPill after UnstashAll from user stash" in { val entityId = nextEntityId() - val recoveryProbe = TestProbe[String]() - recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + val lifecycleProbe = TestProbe[String]() + lifecycleProbes.put(entityId, lifecycleProbe.ref) val ref = ClusterSharding(system).entityRefFor(typeKey, entityId) ref ! Add("1") - recoveryProbe.expectMessage(max = 10.seconds, "recoveryCompleted:") + lifecycleProbe.expectMessage(max = 10.seconds, "recoveryCompleted:") ref ! BeginStashingAddCommands ref ! Add("2") ref ! Add("3") @@ -473,10 +491,11 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh ref ! UnstashAllAndPassivate val probe = TestProbe[String]() - recoveryProbe.awaitAssert { + val expected = (1 to 4).map(_.toString).mkString("|") + lifecycleProbe.awaitAssert { ref ! Get(probe.ref) - recoveryProbe.expectMessage(max = 1.second, "recoveryCompleted:" + (1 to 4).map(_.toString).mkString("|")) - probe.expectMessage(entityId + ":" + (1 to 4).map(_.toString).mkString("|")) + lifecycleProbe.expectMessage(max = 1.second, "recoveryCompleted:" + expected) + probe.expectMessage(entityId + ":" + expected) } }