From 0380cc517a76b5f134f0aa2281350bfc8fc6efb6 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 5 Jan 2018 08:47:43 +0000 Subject: [PATCH] Cluster singleton manager: don't send member events to FSM during shutdown (#24236) There exists a race where a cluster node that is being downed sees itself as the oldest node (as it has had the other nodes removed) and it takes over the singleton manager, sending the real oldest node into the End state, meaning that cluster singletons never work again. This fix simply prevents Member events being given to the Cluster Manager FSM during a shut down, instead relying on SelfExiting. This also hardens the test by not downing the node that the current sharding coordinator is running on, as well as fixing a bug in the probes. --- .../akka/actor/CoordinatedShutdown.scala | 4 +-- .../CoordinatedShutdownShardingSpec.scala | 26 ++++++++++--------- .../singleton/ClusterSingletonManager.scala | 15 ++++++----- .../src/main/scala/akka/cluster/Cluster.scala | 3 ++- 4 files changed, 26 insertions(+), 22 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index 2af51db7b6..28cc646fd6 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -407,7 +407,7 @@ final class CoordinatedShutdown private[akka] ( remainingPhases match { case Nil ⇒ Future.successful(Done) case phase :: remaining ⇒ - val phaseResult = (tasks.get(phase) match { + val phaseResult = tasks.get(phase) match { case null ⇒ if (debugEnabled) log.debug("Performing phase [{}] with [0] tasks", phase) Future.successful(Done) @@ -459,7 +459,7 @@ final class CoordinatedShutdown private[akka] ( result } Future.firstCompletedOf(List(result, timeoutFut)) - }) + } if (remaining.isEmpty) phaseResult // avoid flatMap when system terminated in last phase else diff --git 
a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala index d3905f1935..0f65376039 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala @@ -61,7 +61,7 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi Future.successful(Done) } CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ - probe1.ref ! "CS-unbind-3" + probe3.ref ! "CS-unbind-3" Future.successful(Done) } @@ -70,13 +70,14 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi shutdown(sys2) } + // Using region 2 as it is not shutdown in either test def pingEntities(): Unit = { - region3.tell(1, probe3.ref) - probe3.expectMsg(10.seconds, 1) - region3.tell(2, probe3.ref) - probe3.expectMsg(2) - region3.tell(3, probe3.ref) - probe3.expectMsg(3) + region2.tell(1, probe2.ref) + probe2.expectMsg(10.seconds, 1) + region2.tell(2, probe2.ref) + probe2.expectMsg(10.seconds, 2) + region2.tell(3, probe2.ref) + probe2.expectMsg(10.seconds, 3) } "Sharding and CoordinatedShutdown" must { @@ -134,18 +135,19 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi } "run coordinated shutdown when downing" in { - Cluster(sys3).down(Cluster(sys2).selfAddress) - probe2.expectMsg("CS-unbind-2") + // coordinator is on sys2 + Cluster(sys2).down(Cluster(sys3).selfAddress) + probe3.expectMsg("CS-unbind-3") within(10.seconds) { awaitAssert { - Cluster(system).state.members.size should ===(1) + Cluster(sys2).state.members.size should ===(1) } } within(10.seconds) { awaitAssert { - Cluster(sys2).isTerminated should ===(true) - sys2.whenTerminated.isCompleted should ===(true) + Cluster(sys3).isTerminated 
should ===(true) + sys3.whenTerminated.isCompleted should ===(true) } } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 18ebb899e7..0658b02156 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -303,9 +303,12 @@ object ClusterSingletonManager { } def sendFirstChange(): Unit = { - val event = changes.head - changes = changes.tail - context.parent ! event + // don't send cluster change events if this node is shutting its self down, just wait for SelfExiting + if (!cluster.isTerminated) { + val event = changes.head + changes = changes.tail + context.parent ! event + } } def receive = { @@ -331,7 +334,7 @@ object ClusterSingletonManager { context.unbecome() case MemberUp(m) ⇒ add(m) - deliverChanges + deliverChanges() case MemberRemoved(m, _) ⇒ remove(m) deliverChanges() @@ -357,9 +360,7 @@ object ClusterSingletonManager { case _ ⇒ super.unhandled(msg) } } - } - } } @@ -763,7 +764,7 @@ class ClusterSingletonManager( case (Event(Terminated(ref), HandingOverData(singleton, handOverTo))) if ref == singleton ⇒ handOverDone(handOverTo) - case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender()) ⇒ + case Event(HandOverToMe, HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender()) ⇒ // retry sender() ! 
HandOverInProgress stay diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 923f64c053..54afe67c67 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.ConfigurationException import akka.actor._ +import akka.annotation.InternalApi import akka.cluster.ClusterSettings.DataCenter import akka.dispatch.MonitorableThreadFactory import akka.event.{ Logging, LoggingAdapter } @@ -409,7 +410,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { * Should not called by the user. The user can issue a LEAVE command which will tell the node * to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`. */ - private[cluster] def shutdown(): Unit = { + @InternalApi private[cluster] def shutdown(): Unit = { if (_isTerminated.compareAndSet(false, true)) { logInfo("Shutting down...")