Allow member to leave a cluster via CoordinatedShutdown.run when MemberStatus is Joining/WeaklyUp/Up. (#24152)
This commit is contained in:
parent
1dffa344c4
commit
37f0da17b7
3 changed files with 30 additions and 3 deletions
|
|
@ -680,7 +680,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
*/
|
||||
def leaving(address: Address): Unit = {
|
||||
// only try to update if the node is available (in the member ring)
|
||||
if (latestGossip.members.exists(m ⇒ m.address == address && m.status == Up)) {
|
||||
if (latestGossip.members.exists(m ⇒ m.address == address && (m.status == Joining || m.status == WeaklyUp || m.status == Up))) {
|
||||
val newMembers = latestGossip.members map { m ⇒ if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING
|
||||
val newGossip = latestGossip copy (members = newMembers)
|
||||
|
||||
|
|
|
|||
|
|
@ -266,8 +266,8 @@ object MemberStatus {
|
|||
*/
|
||||
private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
|
||||
Map(
|
||||
Joining → Set(WeaklyUp, Up, Down, Removed),
|
||||
WeaklyUp → Set(Up, Down, Removed),
|
||||
Joining → Set(WeaklyUp, Up, Leaving, Down, Removed),
|
||||
WeaklyUp → Set(Up, Leaving, Down, Removed),
|
||||
Up → Set(Leaving, Down, Removed),
|
||||
Leaving → Set(Exiting, Down, Removed),
|
||||
Down → Set(Removed),
|
||||
|
|
|
|||
|
|
@ -171,6 +171,33 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
|||
}
|
||||
}
|
||||
|
||||
"leave via CoordinatedShutdown.run when member status is Joining" in {
|
||||
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
|
||||
akka.actor.provider = "cluster"
|
||||
akka.remote.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.cluster.min-nr-of-members = 2
|
||||
"""))
|
||||
try {
|
||||
val probe = TestProbe()(sys2)
|
||||
Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent])
|
||||
probe.expectMsgType[CurrentClusterState]
|
||||
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||
probe.expectMsgType[MemberJoined]
|
||||
|
||||
CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason)
|
||||
probe.expectMsgType[MemberLeft]
|
||||
// MemberExited might not be published before MemberRemoved
|
||||
val removed = probe.fishForMessage() {
|
||||
case _: MemberExited ⇒ false
|
||||
case _: MemberRemoved ⇒ true
|
||||
}.asInstanceOf[MemberRemoved]
|
||||
removed.previousStatus should ===(MemberStatus.Exiting)
|
||||
} finally {
|
||||
shutdown(sys2)
|
||||
}
|
||||
}
|
||||
|
||||
"terminate ActorSystem via leave (CoordinatedShutdown)" in {
|
||||
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
|
||||
akka.actor.provider = "cluster"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue