diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 3faef8a0e1..f2c57acb8e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -680,7 +680,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with */ def leaving(address: Address): Unit = { // only try to update if the node is available (in the member ring) - if (latestGossip.members.exists(m ⇒ m.address == address && m.status == Up)) { + if (latestGossip.members.exists(m ⇒ m.address == address && (m.status == Joining || m.status == WeaklyUp || m.status == Up))) { val newMembers = latestGossip.members map { m ⇒ if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING val newGossip = latestGossip copy (members = newMembers) diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index c5237fb9f7..a7cdfb5768 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -266,8 +266,8 @@ object MemberStatus { */ private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] = Map( - Joining → Set(WeaklyUp, Up, Down, Removed), - WeaklyUp → Set(Up, Down, Removed), + Joining → Set(WeaklyUp, Up, Leaving, Down, Removed), + WeaklyUp → Set(Up, Leaving, Down, Removed), Up → Set(Leaving, Down, Removed), Leaving → Set(Exiting, Down, Removed), Down → Set(Removed), diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 40b0bfe40c..78c96c2653 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -171,6 +171,33 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { } } + "leave via CoordinatedShutdown.run when member status is Joining" in { + val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.cluster.min-nr-of-members = 2 + """)) + try { + val probe = TestProbe()(sys2) + Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent]) + probe.expectMsgType[CurrentClusterState] + Cluster(sys2).join(Cluster(sys2).selfAddress) + probe.expectMsgType[MemberJoined] + + CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason) + probe.expectMsgType[MemberLeft] + // MemberExited might not be published before MemberRemoved + val removed = probe.fishForMessage() { + case _: MemberExited ⇒ false + case _: MemberRemoved ⇒ true + }.asInstanceOf[MemberRemoved] + removed.previousStatus should ===(MemberStatus.Exiting) + } finally { + shutdown(sys2) + } + } + "terminate ActorSystem via leave (CoordinatedShutdown)" in { val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" akka.actor.provider = "cluster"