harden ClusterShardingPersistenceSpec by using onPostStop, #26230

This commit is contained in:
Patrik Nordwall 2019-02-18 17:23:24 +01:00
parent d82115988b
commit abfb4497e7

View file

@ -61,7 +61,7 @@ object ClusterShardingPersistenceSpec {
val typeKey = EntityTypeKey[Command]("test")
val recoveryCompletedProbes = new ConcurrentHashMap[String, ActorRef[String]]
val lifecycleProbes = new ConcurrentHashMap[String, ActorRef[String]]
// Need this to be able to send the PoisonPill from the outside, simulating rebalance before recovery and such.
// Promise completed by the actor when it's started.
@ -128,11 +128,17 @@ object ClusterShardingPersistenceSpec {
eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt)
.onRecoveryCompleted { state
ctx.log.debug("onRecoveryCompleted: [{}]", state)
recoveryCompletedProbes.get(entityId) match {
case null ctx.log.debug("no recoveryCompletedProbe for [{}]", entityId)
lifecycleProbes.get(entityId) match {
case null ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId)
case p p ! s"recoveryCompleted:$state"
}
}
.onPostStop(()
lifecycleProbes.get(entityId) match {
case null ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId)
case p p ! "stopped"
}
)
}
}
@ -201,16 +207,17 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
"handle PoisonPill after persist effect" in {
val entityId = nextEntityId()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val p1 = TestProbe[Done]()
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
(1 to 10).foreach { n
ref ! PassivateAndPersist(n.toString)(p1.ref)
// recoveryCompleted each time, verifies that it was actually stopped
recoveryProbe.expectMessage(max = 10.seconds, "recoveryCompleted:" + (1 until n).map(_.toString).mkString("|"))
lifecycleProbe.expectMessage(max = 10.seconds, "recoveryCompleted:" + (1 until n).map(_.toString).mkString("|"))
p1.expectMessage(Done)
lifecycleProbe.expectMessage("stopped")
}
val p2 = TestProbe[String]()
@ -224,14 +231,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
entityActorRefs.put(entityId, actorRefPromise)
val addProbe = TestProbe[Done]()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
recoveryProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("recoveryCompleted:")
// now we know that it's in EventSourcedRunning with no stashed commands
val actorRef = actorRefPromise.future.futureValue
@ -254,13 +261,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
addProbe.expectMessage(Done)
addProbe.expectMessage(Done)
lifecycleProbe.expectMessage("stopped")
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
val p2 = TestProbe[String]()
entityRef ! AddWithConfirmation("f")(addProbe.ref)
entityRef ! Get(p2.ref)
recoveryProbe.expectMessage("recoveryCompleted:a|b|c")
lifecycleProbe.expectMessage("recoveryCompleted:a|b|c")
p2.expectMessage(entityId + ":a|b|c|f")
}
@ -271,14 +279,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
val addProbe = TestProbe[Done]()
val echoProbe = TestProbe[String]()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
recoveryProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("recoveryCompleted:")
// now we know that it's in EventSourcedRunning with no stashed commands
val actorRef = actorRefPromise.future.futureValue
@ -304,6 +312,7 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
echoProbe.expectMessage("echo-1")
addProbe.expectMessage(Done)
addProbe.expectMessage(Done)
lifecycleProbe.expectMessage("stopped")
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
@ -312,7 +321,7 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
echoProbe.expectMessage("echo-4")
entityRef ! AddWithConfirmation("f")(addProbe.ref)
entityRef ! Get(p2.ref)
recoveryProbe.expectMessage("recoveryCompleted:a|b|c")
lifecycleProbe.expectMessage("recoveryCompleted:a|b|c")
p2.expectMessage(entityId + ":a|b|c|f")
}
@ -322,14 +331,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
entityActorRefs.put(entityId, actorRefPromise)
val addProbe = TestProbe[Done]()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
recoveryProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("recoveryCompleted:")
// now we know that it's in EventSourcedRunning with no stashed commands
val actorRef = actorRefPromise.future.futureValue
@ -343,12 +352,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
// now we have enqueued the message sequence and start processing them
latch.countDown()
lifecycleProbe.expectMessage("stopped")
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
val p2 = TestProbe[String]()
entityRef ! AddWithConfirmation("c")(addProbe.ref)
entityRef ! Get(p2.ref)
recoveryProbe.expectMessage("recoveryCompleted:a")
lifecycleProbe.expectMessage("recoveryCompleted:a")
p2.expectMessage(entityId + ":a|c")
}
@ -357,8 +368,8 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
val actorRefPromise = Promise[ActorRef[Any]]()
entityActorRefs.put(entityId, actorRefPromise)
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val ignoreFirstEchoProbe = TestProbe[String]()
@ -377,13 +388,14 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
}
val actorRef = poisonSent.futureValue
recoveryProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("stopped")
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
entityRef ! Echo("echo-2", echoProbe.ref)
echoProbe.expectMessage("echo-2")
recoveryProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("recoveryCompleted:")
}
"handle PoisonPill before recovery completed with stashed commands" in {
@ -391,8 +403,8 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
val actorRefPromise = Promise[ActorRef[Any]]()
entityActorRefs.put(entityId, actorRefPromise)
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val addProbe = TestProbe[Done]()
@ -422,22 +434,26 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
}
val actorRef = poisonSent.futureValue
recoveryProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("recoveryCompleted:")
echoProbe.expectMessage("echo-2")
echoProbe.expectMessage("echo-3")
addProbe.expectMessage(Done)
addProbe.expectMessage(Done)
lifecycleProbe.expectMessage("stopped")
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
entityRef ! Echo("echo-5", echoProbe.ref)
echoProbe.expectMessage("echo-5")
recoveryProbe.expectMessage("recoveryCompleted:a|b")
lifecycleProbe.expectMessage("recoveryCompleted:a|b")
}
"handle PoisonPill before UnstashAll from user stash" in {
val entityId = nextEntityId()
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val p1 = TestProbe[Done]()
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
ref ! Add("1")
@ -448,6 +464,8 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
ref ! PassivateAndPersist("5")(p1.ref)
p1.receiveMessage()
lifecycleProbe.expectMessage("recoveryCompleted:")
lifecycleProbe.expectMessage("stopped")
ref ! Add("6")
// user was stash discarded, i.e. 3 and 4 not handled
@ -460,12 +478,12 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
"handle PoisonPill after UnstashAll from user stash" in {
val entityId = nextEntityId()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
ref ! Add("1")
recoveryProbe.expectMessage(max = 10.seconds, "recoveryCompleted:")
lifecycleProbe.expectMessage(max = 10.seconds, "recoveryCompleted:")
ref ! BeginStashingAddCommands
ref ! Add("2")
ref ! Add("3")
@ -473,10 +491,11 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
ref ! UnstashAllAndPassivate
val probe = TestProbe[String]()
recoveryProbe.awaitAssert {
val expected = (1 to 4).map(_.toString).mkString("|")
lifecycleProbe.awaitAssert {
ref ! Get(probe.ref)
recoveryProbe.expectMessage(max = 1.second, "recoveryCompleted:" + (1 to 4).map(_.toString).mkString("|"))
probe.expectMessage(entityId + ":" + (1 to 4).map(_.toString).mkString("|"))
lifecycleProbe.expectMessage(max = 1.second, "recoveryCompleted:" + expected)
probe.expectMessage(entityId + ":" + expected)
}
}