diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 12944b3d04..a3e4502e25 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -448,31 +448,36 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
       // check by address without uid to make sure that node with same host:port is not allowed
       // to join until previous node with that host:port has been removed from the cluster
-      val alreadyMember = localMembers.exists(_.address == node.address)
-      val isUnreachable = !latestGossip.overview.reachability.isReachable(node)
+      localMembers.find(_.address == node.address) match {
+        case Some(m) if m.uniqueAddress == node ⇒
+          // node retried join attempt, probably due to lost Welcome message
+          logInfo("Existing member [{}] is joining again.", m)
+          if (node != selfUniqueAddress)
+            sender() ! Welcome(selfUniqueAddress, latestGossip)
+        case Some(m) ⇒
+          // node restarted, same host:port as existing member, but with different uid
+          // safe to down and later remove existing member
+          // new node will retry join
+          logInfo("New incarnation of existing member [{}] is trying to join. " +
+            "Existing will be removed from the cluster and then new member will be allowed to join.", m)
+          if (m.status != Down && m.status != Leaving && m.status != Exiting)
+            downing(m.address)
+        case None ⇒
+          // remove the node from the failure detector
+          failureDetector.remove(node.address)

-      if (alreadyMember)
-        logInfo("Existing member [{}] is trying to join, ignoring", node)
-      else if (isUnreachable)
-        logInfo("Unreachable member [{}] is trying to join, ignoring", node)
-      else {
+          // add joining node as Joining
+          // add self in case someone else joins before self has joined (Set discards duplicates)
+          val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
+          val newGossip = latestGossip copy (members = newMembers)

-        // remove the node from the failure detector
-        failureDetector.remove(node.address)
+          updateLatestGossip(newGossip)

-        // add joining node as Joining
-        // add self in case someone else joins before self has joined (Set discards duplicates)
-        val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
-        val newGossip = latestGossip copy (members = newMembers)
+          logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
+          if (node != selfUniqueAddress)
+            sender() ! Welcome(selfUniqueAddress, latestGossip)

-        updateLatestGossip(newGossip)
-
-        logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
-        if (node != selfUniqueAddress) {
-          sender() ! Welcome(selfUniqueAddress, latestGossip)
-        }
-
-        publish(latestGossip)
+          publish(latestGossip)
+      }
     }
   }
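The new join handling above branches on the joining node's UniqueAddress: the same address with the same uid means a retried join (most likely a lost Welcome message), the same address with a different uid means a restarted incarnation whose predecessor must be downed and removed before the newcomer may join, and an unknown address is an ordinary join. A minimal, self-contained sketch of that three-way decision, where Address, UniqueAddress and Member are simplified stand-ins rather than the real akka.cluster types:

// Sketch of the join decision above; Address, UniqueAddress and Member
// are simplified stand-ins, not the real akka.cluster types.
object JoinDecisionSketch extends App {
  final case class Address(host: String, port: Int)
  final case class UniqueAddress(address: Address, uid: Int)
  final case class Member(uniqueAddress: UniqueAddress)

  sealed trait Decision
  case object WelcomeAgain extends Decision       // retried join: resend Welcome
  case object DownOldIncarnation extends Decision // same host:port, different uid
  case object AcceptAsJoining extends Decision    // unknown address: normal join

  def decide(members: Set[Member], joining: UniqueAddress): Decision =
    members.find(_.uniqueAddress.address == joining.address) match {
      case Some(m) if m.uniqueAddress == joining ⇒ WelcomeAgain
      case Some(_)                               ⇒ DownOldIncarnation
      case None                                  ⇒ AcceptAsJoining
    }

  val a = Address("host1", 2552)
  val members = Set(Member(UniqueAddress(a, uid = 1)))
  println(decide(members, UniqueAddress(a, uid = 1)))   // WelcomeAgain
  println(decide(members, UniqueAddress(a, uid = 2)))   // DownOldIncarnation
  println(decide(Set.empty, UniqueAddress(a, uid = 3))) // AcceptAsJoining
}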
@@ -777,8 +782,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   /**
    * Leader actions are as follows:
    * 1. Move JOINING => UP                       -- When a node joins the cluster
-   * 2. Move LEAVING => EXITING                  -- When all partition handoff has completed
-   * 3. Non-exiting remain                       -- When all partition handoff has completed
+   * 2. Move LEAVING => EXITING                  --
+   * 3. Non-exiting remain                       --
    * 4. Move unreachable EXITING => REMOVED      -- When all nodes have seen the EXITING node as unreachable (convergence) -
    *                                                remove the node from the node ring and seen table
    * 5. Move unreachable DOWN/EXITING => REMOVED -- When all nodes have seen that the node is DOWN/EXITING (convergence) -
@@ -793,11 +798,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     val localOverview = localGossip.overview
     val localSeen = localOverview.seen

-    val hasPartionHandoffCompletedSuccessfully: Boolean = {
-      // TODO implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
-      true
-    }
-
     def enoughMembers: Boolean = {
       localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
         case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
@@ -828,9 +828,8 @@
           }
           m.copyUp(upNumber)

-        case m if m.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒
-          // Move LEAVING => EXITING (once we have a convergence on LEAVING
-          // *and* if we have a successful partition handoff)
+        case m if m.status == Leaving ⇒
+          // Move LEAVING => EXITING (once we have a convergence on LEAVING)
           m copy (status = Exiting)
       }
     }
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
index 313fad160f..6ba51854d6 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
@@ -20,7 +20,7 @@ private[cluster] object Gossip {
     if (members.isEmpty) empty else empty.copy(members = members)

   private val leaderMemberStatus = Set[MemberStatus](Up, Leaving)
-  val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) // FIXME private
+  private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving)
   val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
   val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
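With the hasPartionHandoffCompletedSuccessfully placeholder gone, the leader promotes members purely by status once gossip convergence has been reached: JOINING members move to UP and LEAVING members move straight to EXITING. A rough standalone sketch of that transition pass, using simplified stand-in types rather than the real MemberStatus and Member, and assuming the caller has already established convergence:

// Rough sketch of the leader's status transitions; MemberStatus and Member
// are simplified stand-ins, and convergence is assumed checked by the caller.
object LeaderActionsSketch extends App {
  sealed trait MemberStatus
  case object Joining extends MemberStatus
  case object Up extends MemberStatus
  case object Leaving extends MemberStatus
  case object Exiting extends MemberStatus

  final case class Member(address: String, status: MemberStatus)

  def leaderActions(members: Set[Member]): Set[Member] =
    members.map {
      case m @ Member(_, Joining) ⇒ m.copy(status = Up)      // 1. Move JOINING => UP
      case m @ Member(_, Leaving) ⇒ m.copy(status = Exiting) // 2. Move LEAVING => EXITING (convergence alone)
      case m                      ⇒ m                        // 3. Non-exiting remain
    }

  val before = Set(Member("a", Joining), Member("b", Up), Member("c", Leaving))
  println(leaderActions(before))
  // Set(Member(a,Up), Member(b,Up), Member(c,Exiting))
}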
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
index 0c755b97c7..004e25b95d 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
@@ -25,7 +25,10 @@ object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig {
   val seed3 = role("seed3")

   commonConfig(debugConfig(on = false).
-    withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 0s")).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster.auto-down-unreachable-after = off
+      akka.cluster.retry-unsuccessful-join-after = 3s
+      """)).
     withFallback(MultiNodeClusterSpec.clusterConfig))
 }
@@ -100,17 +103,15 @@ abstract class RestartFirstSeedNodeSpec
     runOn(seed1) {
       shutdown(seed1System, remainingOrDefault)
     }
-    runOn(seed2, seed3) {
-      awaitMembersUp(2, canNotBePartOfMemberRing = Set(seedNodes.head))
-      awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (seedNodes.head))
-    }
     enterBarrier("seed1-shutdown")

     // then start restartedSeed1System, which has the same address as seed1System
     runOn(seed1) {
       Cluster(restartedSeed1System).joinSeedNodes(seedNodes)
-      awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(3))
-      awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up)))
+      within(20.seconds) {
+        awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(3))
+        awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up)))
+      }
     }
     runOn(seed2, seed3) {
       awaitMembersUp(3)
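The updated spec exercises the new join path end to end: the restarted seed1 reuses the old host:port but starts with a fresh uid, so its first join attempt makes the leader down the old incarnation, and the join is then retried every retry-unsuccessful-join-after interval until the old member has been removed. A condensed sketch of that restart flow outside the multi-JVM harness; the system name, ports and seed addresses are illustrative placeholders:

// Condensed sketch of restarting a seed node on the same host:port.
// System name, ports and seed addresses are illustrative placeholders.
import akka.actor.{ ActorSystem, AddressFromURIString }
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object RestartSeedSketch extends App {
  val config = ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.netty.tcp.hostname = "127.0.0.1"
    akka.remote.netty.tcp.port = 2551  # same port as the previous incarnation
    akka.cluster.retry-unsuccessful-join-after = 3s
    """)

  val seedNodes = List(
    AddressFromURIString("akka.tcp://ClusterSystem@127.0.0.1:2551"),
    AddressFromURIString("akka.tcp://ClusterSystem@127.0.0.1:2552"),
    AddressFromURIString("akka.tcp://ClusterSystem@127.0.0.1:2553"))

  // The restarted system gets a new uid; joinSeedNodes keeps retrying until
  // the old incarnation has been downed and removed from the member ring.
  val restartedSystem = ActorSystem("ClusterSystem", config)
  Cluster(restartedSystem).joinSeedNodes(seedNodes)
}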