From ddada9a8e120be5dc026380935fe0ef09cdf99d7 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 12 Feb 2019 15:05:33 +0100
Subject: [PATCH] Stop singleton and shards when self MemberDowned, #26336 (#26339)

* Stop singleton when self MemberDowned, #26336

  * It's safer to stop singleton instance early in case of downing.
  * Instead of waiting for MemberRemoved and trying to hand over.

* Stop ShardRegion when self MemberDowned, #26336

* Upper bound when waiting for seen in shutdownSelfWhenDown, #26336
---
 .../akka/cluster/sharding/ShardRegion.scala   |   6 +
 .../singleton/ClusterSingletonManager.scala   |  32 ++++-
 .../ClusterSingletonManagerDownedSpec.scala   | 123 ++++++++++++++++++
 .../scala/akka/cluster/ClusterDaemon.scala    |   6 +-
 4 files changed, 165 insertions(+), 2 deletions(-)
 create mode 100644 akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerDownedSpec.scala

diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index 30fd16695d..4989b789ed 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -516,6 +516,12 @@ private[akka] class ShardRegion(
       else if (matchingRole(m))
         changeMembers(membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress))
 
+    case MemberDowned(m) ⇒
+      if (m.uniqueAddress == cluster.selfUniqueAddress) {
+        log.info("Self downed, stopping ShardRegion [{}]", self.path)
+        context.stop(self)
+      }
+
     case _: MemberEvent ⇒ // these are expected, no need to warn about them
 
     case _ ⇒ unhandled(evt)
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
index b03949b203..c52000d55e 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
@@ -511,7 +511,7 @@ class ClusterSingletonManager(
   require(!cluster.isTerminated, "Cluster node must not be terminated")
 
   // subscribe to cluster changes, re-subscribe when restart
-  cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved])
+  cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved], classOf[MemberDowned])
 
   setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)
 
@@ -573,6 +573,10 @@ class ClusterSingletonManager(
         stay using YoungerData(oldestOption)
       }
 
+    case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      logInfo("Self downed, stopping ClusterSingletonManager")
+      stop()
+
     case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
       logInfo("Self removed, stopping ClusterSingletonManager")
       stop()
@@ -612,6 +616,10 @@ class ClusterSingletonManager(
         stay
       }
 
+    case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      logInfo("Self downed, stopping ClusterSingletonManager")
+      stop()
+
     case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
       logInfo("Self removed, stopping ClusterSingletonManager")
       stop()
@@ -722,6 +730,15 @@ class ClusterSingletonManager(
       // complete memberExitingProgress when handOverDone
       sender() ! Done // reply to ask
       stay
+
+    case Event(MemberDowned(m), OldestData(singleton, singletonTerminated)) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      if (singletonTerminated) {
+        logInfo("Self downed, stopping ClusterSingletonManager")
+        stop()
+      } else {
+        logInfo("Self downed, stopping")
+        gotoStopping(singleton)
+      }
   }
 
   when(WasOldest) {
@@ -761,6 +778,15 @@ class ClusterSingletonManager(
       sender() ! Done // reply to ask
       stay
 
+    case Event(MemberDowned(m), WasOldestData(singleton, singletonTerminated, _)) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      if (singletonTerminated) {
+        logInfo("Self downed, stopping ClusterSingletonManager")
+        stop()
+      } else {
+        logInfo("Self downed, stopping")
+        gotoStopping(singleton)
+      }
+
   }
 
   def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverTo: Option[ActorRef]): State = {
@@ -853,6 +879,10 @@ class ClusterSingletonManager(
     case Event(Cleanup, _) ⇒
       cleanupOverdueNotMemberAnyMore()
       stay
+    case Event(MemberDowned(m), _) ⇒
+      if (m.uniqueAddress == cluster.selfUniqueAddress)
+        logInfo("Self downed, waiting for removal")
+      stay
   }
 
   onTransition {
diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerDownedSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerDownedSpec.scala
new file mode 100644
index 0000000000..63ff82c8b2
--- /dev/null
+++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerDownedSpec.scala
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.cluster.singleton
+
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.MemberStatus
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
+import akka.remote.transport.ThrottlerTransportAdapter
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+object ClusterSingletonManagerDownedSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = INFO
+    akka.actor.provider = "cluster"
+    akka.remote.log-remote-lifecycle-events = off
+    """))
+
+  testTransport(on = true)
+
+  case object EchoStarted
+  case object EchoStopped
+
+  /**
+   * The singleton actor
+   */
+  class Echo(testActor: ActorRef) extends Actor {
+    testActor ! EchoStarted
+
+    override def postStop(): Unit = {
+      testActor ! EchoStopped
+    }
+
+    def receive = {
+      case _ ⇒ sender() ! self
+    }
+  }
+}
+
+class ClusterSingletonManagerDownedMultiJvmNode1 extends ClusterSingletonManagerDownedSpec
+class ClusterSingletonManagerDownedMultiJvmNode2 extends ClusterSingletonManagerDownedSpec
+class ClusterSingletonManagerDownedMultiJvmNode3 extends ClusterSingletonManagerDownedSpec
+
+class ClusterSingletonManagerDownedSpec extends MultiNodeSpec(ClusterSingletonManagerDownedSpec) with STMultiNodeSpec with ImplicitSender {
+  import ClusterSingletonManagerDownedSpec._
+
+  override def initialParticipants = roles.size
+
+  private val cluster = Cluster(system)
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      cluster.join(node(to).address)
+      createSingleton()
+    }
+  }
+
+  def createSingleton(): ActorRef = {
+    system.actorOf(
+      ClusterSingletonManager.props(
+        singletonProps = Props(classOf[Echo], testActor),
+        terminationMessage = PoisonPill,
+        settings = ClusterSingletonManagerSettings(system)),
+      name = "echo")
+  }
+
+  "A ClusterSingletonManager downing" must {
+
+    "startup 3 node" in {
+      join(first, first)
+      join(second, first)
+      join(third, first)
+      within(15.seconds) {
+        awaitAssert {
+          cluster.state.members.size should ===(3)
+          cluster.state.members.map(_.status) should ===(Set(MemberStatus.Up))
+        }
+      }
+      runOn(first) {
+        expectMsg(EchoStarted)
+      }
+      enterBarrier("started")
+    }
+
+    "stop instance when member is downed" in {
+      runOn(first) {
+        testConductor.blackhole(first, third, ThrottlerTransportAdapter.Direction.Both).await
+        testConductor.blackhole(second, third, ThrottlerTransportAdapter.Direction.Both).await
+
+        within(15.seconds) {
+          awaitAssert {
+            cluster.state.unreachable.size should ===(1)
+          }
+        }
+      }
+      enterBarrier("blackhole-1")
+
+      runOn(first) {
+        // another blackhole so that second can't mark gossip as seen and thereby deferring shutdown of first
+        testConductor.blackhole(first, second, ThrottlerTransportAdapter.Direction.Both).await
+        cluster.down(node(second).address)
+        cluster.down(cluster.selfAddress)
+        // singleton instance stopped, before failure detection of first-second
+        expectMsg(3.seconds, EchoStopped)
+      }
+
+      enterBarrier("stopped")
+    }
+  }
+}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index ec1fa3b133..9e78fe8952 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -286,6 +286,7 @@ private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: Join
 private[cluster] object ClusterCoreDaemon {
   val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
   val MaxGossipsBeforeShuttingDownMyself = 5
+  val MaxTicksBeforeShuttingDownMyself = 4
 
 }
 
@@ -333,6 +334,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
   var seedNodeProcessCounter = 0 // for unique names
   var joinSeedNodesDeadline: Option[Deadline] = None
   var leaderActionCounter = 0
+  var selfDownCounter = 0
 
   var exitingTasksInProgress = false
   val selfExiting = Promise[Done]()
@@ -1112,7 +1114,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
       // status Down. The down commands should spread before we shutdown.
       val unreachable = membershipState.dcReachability.allUnreachableOrTerminated
       val downed = membershipState.dcMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress }
-      if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
+      if (selfDownCounter >= MaxTicksBeforeShuttingDownMyself || downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
         // the reason for not shutting down immediately is to give the gossip a chance to spread
         // the downing information to other downed nodes, so that they can shutdown themselves
         logInfo("Shutting down myself")
@@ -1120,6 +1122,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
         // if other downed know that this node has seen the version
         gossipRandomN(MaxGossipsBeforeShuttingDownMyself)
         shutdown()
+      } else {
+        selfDownCounter += 1
       }
     }
   }
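
The ShardRegion and ClusterSingletonManager changes above follow the same pattern: subscribe to ClusterEvent.MemberDowned and stop as soon as the downed member is the local node, instead of waiting for MemberRemoved and a hand-over. The sketch below applies that pattern in a plain user-level actor. It is illustrative only and not part of the patch: the actor name StopWhenSelfDowned is hypothetical, and it compares plain addresses (member.address == cluster.selfAddress) rather than the unique addresses compared inside Akka.

import akka.actor.{ Actor, ActorLogging, Props }
import akka.cluster.{ Cluster, ClusterEvent }
import akka.cluster.ClusterEvent.MemberDowned

// Hypothetical example actor, not part of the Akka patch above.
class StopWhenSelfDowned extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  // Same kind of subscription the patch adds in ClusterSingletonManager.
  override def preStart(): Unit =
    cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberDowned])

  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberDowned(m) if m.address == cluster.selfAddress ⇒
      // React to our own node being marked Down right away,
      // mirroring what the patched ShardRegion does.
      log.info("Self downed, stopping [{}]", self.path)
      context.stop(self)
    case MemberDowned(_) ⇒ // downing of other members: nothing to do here
  }
}

object StopWhenSelfDowned {
  val props: Props = Props(new StopWhenSelfDowned)
}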
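The ClusterDaemon change bounds how long a downed node waits for its Down status to be seen before it shuts itself down. Below is a minimal, self-contained sketch of that bounded-wait logic; the class, its method, and the boolean parameter are hypothetical stand-ins for the gossip checks in shutdownSelfWhenDown, and only the default of 4 mirrors MaxTicksBeforeShuttingDownMyself from the patch.

// Hypothetical sketch of the bounded wait; not the actual ClusterDaemon code.
final class SelfDownShutdownDecision(maxTicks: Int = 4) {
  private var ticks = 0

  // Called on every gossip tick while the local node is Down.
  // Returns true when the node should shut itself down: either all downed
  // nodes are unreachable or have seen the gossip, or the upper bound of
  // ticks has been reached so the wait never lasts forever.
  def shouldShutdown(allDownedSeenOrUnreachable: Boolean): Boolean =
    if (ticks >= maxTicks || allDownedSeenOrUnreachable) true
    else {
      ticks += 1
      false
    }
}

In the patch this upper bound keeps a downed node from deferring its shutdown indefinitely when, as in the test's second blackhole, another downed node can no longer mark the gossip as seen.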