From 37f6a6581cbe4d1ebfa4a478dd9e25c6d86f8ac6 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Sat, 7 Feb 2015 14:43:44 +0100
Subject: [PATCH] =clu #16726 Down member automatically when restarted

* When new uid is seen in join attempt we can down existing member
  and thereby new restarted node will be able to join in later
  retried join attempt without relying on auto-down.
---
 .../scala/akka/cluster/ClusterDaemon.scala    | 61 +++++++++----------
 .../src/main/scala/akka/cluster/Gossip.scala  |  2 +-
 .../cluster/RestartFirstSeedNodeSpec.scala    | 15 ++---
 3 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 7bb523024f..c105013456 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -448,31 +448,36 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
 
       // check by address without uid to make sure that node with same host:port is not allowed
       // to join until previous node with that host:port has been removed from the cluster
-      val alreadyMember = localMembers.exists(_.address == node.address)
-      val isUnreachable = !latestGossip.overview.reachability.isReachable(node)
+      localMembers.find(_.address == node.address) match {
+        case Some(m) if m.uniqueAddress == node ⇒
+          // node retried join attempt, probably due to lost Welcome message
+          logInfo("Existing member [{}] is joining again.", m)
+          if (node != selfUniqueAddress)
+            sender() ! Welcome(selfUniqueAddress, latestGossip)
+        case Some(m) ⇒
+          // node restarted, same host:port as existing member, but with different uid
+          // safe to down and later remove existing member
+          // new node will retry join
+          logInfo("New incarnation of existing member [{}] is trying to join. " +
+            "Existing will be removed from the cluster and then new member will be allowed to join.", m)
+          if (m.status != Down && m.status != Leaving && m.status != Exiting)
+            downing(m.address)
+        case None ⇒
+          // remove the node from the failure detector
+          failureDetector.remove(node.address)
 
-      if (alreadyMember)
-        logInfo("Existing member [{}] is trying to join, ignoring", node)
-      else if (isUnreachable)
-        logInfo("Unreachable member [{}] is trying to join, ignoring", node)
-      else {
+          // add joining node as Joining
+          // add self in case someone else joins before self has joined (Set discards duplicates)
+          val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
+          val newGossip = latestGossip copy (members = newMembers)
 
-        // remove the node from the failure detector
-        failureDetector.remove(node.address)
+          updateLatestGossip(newGossip)
 
-        // add joining node as Joining
-        // add self in case someone else joins before self has joined (Set discards duplicates)
-        val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
-        val newGossip = latestGossip copy (members = newMembers)
+          logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
+          if (node != selfUniqueAddress)
+            sender() ! Welcome(selfUniqueAddress, latestGossip)
 
-        updateLatestGossip(newGossip)
-
-        logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
-        if (node != selfUniqueAddress) {
-          sender() ! Welcome(selfUniqueAddress, latestGossip)
-        }
-
-        publish(latestGossip)
+          publish(latestGossip)
       }
     }
   }
@@ -776,8 +781,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   /**
   * Leader actions are as follows:
   * 1. Move JOINING => UP -- When a node joins the cluster
-  * 2. Move LEAVING => EXITING -- When all partition handoff has completed
-  * 3. Non-exiting remain -- When all partition handoff has completed
+  * 2. Move LEAVING => EXITING --
+  * 3. Non-exiting remain --
   * 4. Move unreachable EXITING => REMOVED -- When all nodes have seen the EXITING node as unreachable (convergence) -
   *      remove the node from the node ring and seen table
   * 5. Move unreachable DOWN/EXITING => REMOVED -- When all nodes have seen that the node is DOWN/EXITING (convergence) -
@@ -792,11 +797,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     val localOverview = localGossip.overview
     val localSeen = localOverview.seen
 
-    val hasPartionHandoffCompletedSuccessfully: Boolean = {
-      // TODO implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
-      true
-    }
-
     def enoughMembers: Boolean = {
       localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
        case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
@@ -827,9 +827,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
          }
          m.copyUp(upNumber)
 
-        case m if m.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒
-          // Move LEAVING => EXITING (once we have a convergence on LEAVING
-          // *and* if we have a successful partition handoff)
+        case m if m.status == Leaving ⇒
+          // Move LEAVING => EXITING (once we have a convergence on LEAVING)
          m copy (status = Exiting)
      }
    }
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
index 313fad160f..6ba51854d6 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
@@ -20,7 +20,7 @@ private[cluster] object Gossip {
     if (members.isEmpty) empty else empty.copy(members = members)
 
   private val leaderMemberStatus = Set[MemberStatus](Up, Leaving)
-  val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) // FIXME private
+  private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving)
   val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
   val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
 
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
index 0c755b97c7..004e25b95d 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
@@ -25,7 +25,10 @@ object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig {
   val seed3 = role("seed3")
 
   commonConfig(debugConfig(on = false).
-    withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 0s")).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster.auto-down-unreachable-after = off
+      akka.cluster.retry-unsuccessful-join-after = 3s
+      """)).
     withFallback(MultiNodeClusterSpec.clusterConfig))
 }
 
@@ -100,17 +103,15 @@ abstract class RestartFirstSeedNodeSpec
       runOn(seed1) {
         shutdown(seed1System, remainingOrDefault)
       }
-      runOn(seed2, seed3) {
-        awaitMembersUp(2, canNotBePartOfMemberRing = Set(seedNodes.head))
-        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (seedNodes.head))
-      }
       enterBarrier("seed1-shutdown")
 
       // then start restartedSeed1System, which has the same address as seed1System
       runOn(seed1) {
         Cluster(restartedSeed1System).joinSeedNodes(seedNodes)
-        awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(3))
-        awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up)))
+        within(20.seconds) {
+          awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(3))
+          awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up)))
+        }
       }
       runOn(seed2, seed3) {
         awaitMembersUp(3
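
The heart of this patch is the new match on localMembers.find(_.address == node.address) in joining():
same address and same uid means a retried join (just reply with Welcome again), same address but a
different uid means a restarted node (down the old incarnation so the retried join can later succeed),
and an unknown address is a normal join. Below is a minimal, self-contained Scala sketch of that
decision; UniqueAddress, Member and the Decision values are simplified stand-ins for illustration,
not Akka's real types or API.

object JoinDecisionSketch extends App {
  // Simplified stand-ins for akka.cluster.UniqueAddress and Member (illustration only).
  final case class UniqueAddress(address: String, uid: Long)
  final case class Member(uniqueAddress: UniqueAddress) {
    def address: String = uniqueAddress.address
  }

  sealed trait Decision
  case object WelcomeAgain       extends Decision // same address and uid: Welcome was lost, reply again
  case object DownOldIncarnation extends Decision // same address, new uid: restarted node, down old member
  case object AcceptAsJoining    extends Decision // unknown address: normal join

  def decide(members: Set[Member], joining: UniqueAddress): Decision =
    members.find(_.address == joining.address) match {
      case Some(m) if m.uniqueAddress == joining => WelcomeAgain
      case Some(_)                               => DownOldIncarnation
      case None                                  => AcceptAsJoining
    }

  val existing = Set(Member(UniqueAddress("a:2551", uid = 1L)))
  println(decide(existing, UniqueAddress("a:2551", uid = 1L))) // WelcomeAgain
  println(decide(existing, UniqueAddress("a:2551", uid = 2L))) // DownOldIncarnation (the restart case)
  println(decide(existing, UniqueAddress("b:2551", uid = 7L))) // AcceptAsJoining
}

Because the old incarnation is downed automatically in the second case, the restarted seed node in
RestartFirstSeedNodeSpec can rejoin via the normal retry (retry-unsuccessful-join-after) without
auto-down being enabled, which is why the test config switches auto-down-unreachable-after to off.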