From 055e0c5406a596219d7a29d75cadffc0f74dd4ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 7 Oct 2021 14:35:20 +0200 Subject: [PATCH] fix: Race condition in sharding SupervisionSpec #30633 (#30751) --- .../akka/cluster/sharding/SupervisionSpec.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala index 2c73ea7d6e..7e99ce149d 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala @@ -27,7 +27,7 @@ object SupervisionSpec { """) case class Msg(id: Long, msg: Any) - case class Response(self: ActorRef) + case class Response(self: ActorRef, parentSupervisor: ActorRef) case object StopMessage val idExtractor: ShardRegion.ExtractEntityId = { @@ -59,7 +59,7 @@ object SupervisionSpec { // note that this means the StopMessage will go to dead letters context.stop(self) case "hello" => - sender() ! Response(self) + sender() ! Response(self, context.parent) case StopMessage => // note that we never see this because we stop early log.info("Received stop from region") @@ -97,13 +97,15 @@ class DeprecatedSupervisionSpec extends AkkaSpec(SupervisionSpec.config) with Im region ! Msg(10, "hello") val response = expectMsgType[Response](5.seconds) - watch(response.self) + watch(response.parentSupervisor) // We need the shard to have observed the passivation for this test but // we don't know that this means the passivation reached the shard yet unless we observe it EventFilter.debug("passy: Passivation started for [10]", occurrences = 1).intercept { region ! Msg(10, "passivate") - expectTerminated(response.self) + // if we'd only wait for the child to stop there is a race where the message is delivered to the child + // before it abruptly stops with the message in its inbox + expectTerminated(response.parentSupervisor) } // This would fail before as sharded actor would be stuck passivating @@ -141,7 +143,7 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend region ! Msg(10, "hello") val response = expectMsgType[Response](5.seconds) - watch(response.self) + watch(response.parentSupervisor) // 1. as soon as the PassivatingActor receives "passivate" the child sends Passivate(StopMessage) to its // backoff supervisor parent and then stops itself @@ -154,7 +156,9 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend // stop message `StopMessage` comes in from the shard it will stop itself // 4. when the supervisor stops the shard should start it anew and deliver the buffered messages region ! Msg(10, "passivate") - expectTerminated(response.self) + // if we'd only wait for the child to stop there is a race where the message is delivered to the child + // before it abruptly stops with the message in its inbox + expectTerminated(response.parentSupervisor) // Another race: now the shard either saw the entity terminating already and will // restart it as soon as it gets a message for it or has not yet seen it and will buffer the message