diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index 2af51db7b6..28cc646fd6 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -407,7 +407,7 @@ final class CoordinatedShutdown private[akka] ( remainingPhases match { case Nil ⇒ Future.successful(Done) case phase :: remaining ⇒ - val phaseResult = (tasks.get(phase) match { + val phaseResult = tasks.get(phase) match { case null ⇒ if (debugEnabled) log.debug("Performing phase [{}] with [0] tasks", phase) Future.successful(Done) @@ -459,7 +459,7 @@ final class CoordinatedShutdown private[akka] ( result } Future.firstCompletedOf(List(result, timeoutFut)) - }) + } if (remaining.isEmpty) phaseResult // avoid flatMap when system terminated in last phase else diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala index d3905f1935..0f65376039 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala @@ -61,7 +61,7 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi Future.successful(Done) } CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ - probe1.ref ! "CS-unbind-3" + probe3.ref ! "CS-unbind-3" Future.successful(Done) } @@ -70,13 +70,14 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi shutdown(sys2) } + // Using region 2 as it is not shutdown in either test def pingEntities(): Unit = { - region3.tell(1, probe3.ref) - probe3.expectMsg(10.seconds, 1) - region3.tell(2, probe3.ref) - probe3.expectMsg(2) - region3.tell(3, probe3.ref) - probe3.expectMsg(3) + region2.tell(1, probe2.ref) + probe2.expectMsg(10.seconds, 1) + region2.tell(2, probe2.ref) + probe2.expectMsg(10.seconds, 2) + region2.tell(3, probe2.ref) + probe2.expectMsg(10.seconds, 3) } "Sharding and CoordinatedShutdown" must { @@ -134,18 +135,19 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi } "run coordinated shutdown when downing" in { - Cluster(sys3).down(Cluster(sys2).selfAddress) - probe2.expectMsg("CS-unbind-2") + // coordinator is on sys2 + Cluster(sys2).down(Cluster(sys3).selfAddress) + probe3.expectMsg("CS-unbind-3") within(10.seconds) { awaitAssert { - Cluster(system).state.members.size should ===(1) + Cluster(sys2).state.members.size should ===(1) } } within(10.seconds) { awaitAssert { - Cluster(sys2).isTerminated should ===(true) - sys2.whenTerminated.isCompleted should ===(true) + Cluster(sys3).isTerminated should ===(true) + sys3.whenTerminated.isCompleted should ===(true) } } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 18ebb899e7..0658b02156 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -303,9 +303,12 @@ object ClusterSingletonManager { } def sendFirstChange(): Unit = { - val event = changes.head - changes = changes.tail - context.parent ! event + // don't send cluster change events if this node is shutting its self down, just wait for SelfExiting + if (!cluster.isTerminated) { + val event = changes.head + changes = changes.tail + context.parent ! event + } } def receive = { @@ -331,7 +334,7 @@ object ClusterSingletonManager { context.unbecome() case MemberUp(m) ⇒ add(m) - deliverChanges + deliverChanges() case MemberRemoved(m, _) ⇒ remove(m) deliverChanges() @@ -357,9 +360,7 @@ object ClusterSingletonManager { case _ ⇒ super.unhandled(msg) } } - } - } } @@ -763,7 +764,7 @@ class ClusterSingletonManager( case (Event(Terminated(ref), HandingOverData(singleton, handOverTo))) if ref == singleton ⇒ handOverDone(handOverTo) - case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender()) ⇒ + case Event(HandOverToMe, HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender()) ⇒ // retry sender() ! HandOverInProgress stay diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 923f64c053..54afe67c67 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.ConfigurationException import akka.actor._ +import akka.annotation.InternalApi import akka.cluster.ClusterSettings.DataCenter import akka.dispatch.MonitorableThreadFactory import akka.event.{ Logging, LoggingAdapter } @@ -409,7 +410,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { * Should not called by the user. The user can issue a LEAVE command which will tell the node * to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`. */ - private[cluster] def shutdown(): Unit = { + @InternalApi private[cluster] def shutdown(): Unit = { if (_isTerminated.compareAndSet(false, true)) { logInfo("Shutting down...")