From fa3da328be1844bb2e3488b6480a7ab136f2048c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 23 Nov 2017 08:48:38 +0100 Subject: [PATCH] Run all CoordinatedShutdown phases also when downing, #24048 --- .../akka/cluster/sharding/ShardRegion.scala | 8 +- .../CoordinatedShutdownShardingSpec.scala | 151 ++++++++++++++++++ .../sharding/GetShardTypeNamesSpec.scala | 2 + .../singleton/ClusterSingletonManager.scala | 29 +++- .../scala/akka/cluster/ClusterDaemon.scala | 7 +- 5 files changed, 183 insertions(+), 14 deletions(-) create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 20051cdba8..4390955882 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -406,8 +406,12 @@ private[akka] class ShardRegion( CoordinatedShutdown(context.system).addTask( CoordinatedShutdown.PhaseClusterShardingShutdownRegion, "region-shutdown") { () ⇒ - self ! GracefulShutdown - gracefulShutdownProgress.future + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) { + Future.successful(Done) + } else { + self ! GracefulShutdown + gracefulShutdownProgress.future + } } // subscribe to MemberEvent, re-subscribe when restart diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala new file mode 100644 index 0000000000..e0605044de --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.cluster.sharding + +import scala.concurrent.Future + +import scala.concurrent.duration._ +import akka.Done +import akka.actor.ActorSystem +import akka.actor.CoordinatedShutdown +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.AkkaSpec +import akka.testkit.TestActors.EchoActor +import akka.testkit.TestProbe + +object CoordinatedShutdownShardingSpec { + val config = + """ + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + """ + + val extractEntityId: ShardRegion.ExtractEntityId = { + case msg: Int ⇒ (msg.toString, msg) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case msg: Int ⇒ (msg % 10).toString + } +} + +class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardingSpec.config) { + import CoordinatedShutdownShardingSpec._ + + val sys1 = ActorSystem(system.name, system.settings.config) + val sys2 = ActorSystem(system.name, system.settings.config) + val sys3 = system + + val region1 = ClusterSharding(sys1).start("type1", Props[EchoActor](), ClusterShardingSettings(sys1), + extractEntityId, extractShardId) + val region2 = ClusterSharding(sys2).start("type1", Props[EchoActor](), ClusterShardingSettings(sys2), + extractEntityId, extractShardId) + val region3 = ClusterSharding(sys3).start("type1", Props[EchoActor](), ClusterShardingSettings(sys3), + extractEntityId, extractShardId) + + val probe1 = TestProbe()(sys1) + val probe2 = TestProbe()(sys2) + val probe3 = TestProbe()(sys3) + + CoordinatedShutdown(sys1).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ + probe1.ref ! "CS-unbind-1" + Future.successful(Done) + } + CoordinatedShutdown(sys2).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ + probe2.ref ! 
"CS-unbind-2" + Future.successful(Done) + } + CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ + probe3.ref ! "CS-unbind-3" + Future.successful(Done) + } + + override def beforeTermination(): Unit = { + shutdown(sys1) + shutdown(sys2) + } + + def pingEntities(): Unit = { + region3.tell(1, probe3.ref) + probe3.expectMsg(10.seconds, 1) + region3.tell(2, probe3.ref) + probe3.expectMsg(2) + region3.tell(3, probe3.ref) + probe3.expectMsg(3) + } + + "Sharding and CoordinatedShutdown" must { + "init cluster" in { + Cluster(sys1).join(Cluster(sys1).selfAddress) // coordinator will initially run on sys1 + awaitAssert(Cluster(sys1).selfMember.status should ===(MemberStatus.Up)) + + Cluster(sys2).join(Cluster(sys1).selfAddress) + within(10.seconds) { + awaitAssert { + Cluster(sys1).state.members.size should ===(2) + Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(sys2).state.members.size should ===(2) + Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + + Cluster(sys3).join(Cluster(sys1).selfAddress) + within(10.seconds) { + awaitAssert { + Cluster(sys1).state.members.size should ===(3) + Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(sys2).state.members.size should ===(3) + Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(sys3).state.members.size should ===(3) + Cluster(sys3).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + + pingEntities() + } + + "run coordinated shutdown when leaving" in { + Cluster(sys3).leave(Cluster(sys1).selfAddress) + probe1.expectMsg("CS-unbind-1") + + within(10.seconds) { + awaitAssert { + Cluster(sys2).state.members.size should ===(2) + Cluster(sys3).state.members.size should ===(2) + } + } + within(10.seconds) { + awaitAssert { + Cluster(sys1).isTerminated should ===(true) + sys1.whenTerminated.isCompleted should ===(true) + } + } + + 
pingEntities() + } + + "run coordinated shutdown when downing" in { + Cluster(sys3).down(Cluster(sys2).selfAddress) + probe2.expectMsg("CS-unbind-2") + + within(10.seconds) { + awaitAssert { + Cluster(system).state.members.size should ===(1) + } + } + within(10.seconds) { + awaitAssert { + Cluster(sys2).isTerminated should ===(true) + sys2.whenTerminated.isCompleted should ===(true) + } + } + + pingEntities() + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala index 10550b1c84..0290daaa08 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala @@ -13,6 +13,8 @@ object GetShardTypeNamesSpec { """ akka.loglevel = INFO akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 """ val extractEntityId: ShardRegion.ExtractEntityId = { diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 292be74935..0271d491dc 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -5,9 +5,10 @@ package akka.cluster.singleton import com.typesafe.config.Config - import scala.concurrent.duration._ import scala.collection.immutable +import scala.concurrent.Future + import akka.actor.Actor import akka.actor.Deploy import akka.actor.ActorSystem @@ -26,8 +27,8 @@ import akka.AkkaException import akka.actor.NoSerializationVerificationNeeded import akka.cluster.UniqueAddress import akka.cluster.ClusterEvent - import scala.concurrent.Promise + import akka.Done import 
akka.actor.CoordinatedShutdown import akka.annotation.DoNotInherit @@ -254,8 +255,12 @@ object ClusterSingletonManager { // should preferably complete before stopping the singleton sharding coordinator on same node. val coordShutdown = CoordinatedShutdown(context.system) coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { () ⇒ - implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) - self.ask(SelfExiting).mapTo[Done] + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) { + Future.successful(Done) + } else { + implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) + self.ask(SelfExiting).mapTo[Done] + } } } override def postStop(): Unit = cluster.unsubscribe(self) @@ -468,11 +473,19 @@ class ClusterSingletonManager( // for CoordinatedShutdown val coordShutdown = CoordinatedShutdown(context.system) val memberExitingProgress = Promise[Done]() - coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting")(() ⇒ - memberExitingProgress.future) + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting") { () ⇒ + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) + Future.successful(Done) + else + memberExitingProgress.future + } coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-2") { () ⇒ - implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) - self.ask(SelfExiting).mapTo[Done] + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) { + Future.successful(Done) + } else { + implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) + self.ask(SelfExiting).mapTo[Done] + } } def logInfo(message: String): Unit = diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala 
b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 8688024bbe..5db9792920 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -176,7 +176,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") { val sys = context.system () ⇒ - if (Cluster(sys).isTerminated) + if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down) Future.successful(Done) else { implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave)) @@ -190,8 +190,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac override def postStop(): Unit = { clusterShutdown.trySuccess(Done) if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) { - // run the last phases e.g. if node was downed (not leaving) - coordShutdown.run(Some(CoordinatedShutdown.PhaseClusterShutdown)) + coordShutdown.run() } } @@ -325,7 +324,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") { val sys = context.system () ⇒ - if (Cluster(sys).isTerminated) + if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down) Future.successful(Done) else { implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExitingDone))