Merge pull request #16792 from akka/wip-16726-down-restarted-patriknw
=clu #16726 Down member automatically when restarted
Commit 617cd31046
3 changed files with 39 additions and 39 deletions
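The change in one sentence: join requests are now matched against current members by address (host:port) while ignoring the uid, so a joiner that reuses an existing member's address but carries a different uid is recognized as a restarted incarnation, and the old incarnation is downed automatically. Below is a minimal, self-contained Scala sketch of that decision; `JoinDecisionSketch`, the simplified `UniqueAddress` stand-in, and the case object names are invented for illustration, and only the find-by-address / compare-uid shape mirrors the actual change:

  object JoinDecisionSketch extends App {
    // simplified stand-in for akka.cluster.UniqueAddress: an address plus a uid
    // that is regenerated every time the node (ActorSystem) is restarted
    final case class UniqueAddress(address: String, uid: Long)

    sealed trait JoinDecision
    case object WelcomeAgain extends JoinDecision       // same address, same uid: retried join, resend Welcome
    case object DownOldIncarnation extends JoinDecision // same address, different uid: node was restarted
    case object AcceptAsJoining extends JoinDecision    // unknown address: normal join

    // check by address without uid, like the new joining code in ClusterCoreDaemon
    def decide(node: UniqueAddress, members: Set[UniqueAddress]): JoinDecision =
      members.find(_.address == node.address) match {
        case Some(m) if m == node => WelcomeAgain
        case Some(_)              => DownOldIncarnation
        case None                 => AcceptAsJoining
      }

    val members = Set(UniqueAddress("host1:2551", uid = 1L), UniqueAddress("host2:2552", uid = 2L))
    assert(decide(UniqueAddress("host1:2551", uid = 42L), members) == DownOldIncarnation) // restarted node
    assert(decide(UniqueAddress("host1:2551", uid = 1L), members) == WelcomeAgain)        // lost Welcome
    assert(decide(UniqueAddress("host3:2553", uid = 3L), members) == AcceptAsJoining)     // normal join
    println("join decision sketch ok")
  }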
|
|
@@ -448,31 +448,36 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
-      val alreadyMember = localMembers.exists(_.address == node.address)
-      val isUnreachable = !latestGossip.overview.reachability.isReachable(node)
-
-      if (alreadyMember)
-        logInfo("Existing member [{}] is trying to join, ignoring", node)
-      else if (isUnreachable)
-        logInfo("Unreachable member [{}] is trying to join, ignoring", node)
-      else {
-
-        // add joining node as Joining
-        // add self in case someone else joins before self has joined (Set discards duplicates)
-        val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
-        val newGossip = latestGossip copy (members = newMembers)
-
-        // remove the node from the failure detector
-        failureDetector.remove(node.address)
-
-        updateLatestGossip(newGossip)
-
-        logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
-        if (node != selfUniqueAddress)
-          sender() ! Welcome(selfUniqueAddress, latestGossip)
-
-        publish(latestGossip)
-      }
+      // check by address without uid to make sure that node with same host:port is not allowed
+      // to join until previous node with that host:port has been removed from the cluster
+      localMembers.find(_.address == node.address) match {
+        case Some(m) if m.uniqueAddress == node ⇒
+          // node retried join attempt, probably due to lost Welcome message
+          logInfo("Existing member [{}] is joining again.", m)
+          if (node != selfUniqueAddress)
+            sender() ! Welcome(selfUniqueAddress, latestGossip)
+        case Some(m) ⇒
+          // node restarted, same host:port as existing member, but with different uid
+          // safe to down and later remove existing member
+          // new node will retry join
+          logInfo("New incarnation of existing member [{}] is trying to join. " +
+            "Existing will be removed from the cluster and then new member will be allowed to join.", m)
+          if (m.status != Down && m.status != Leaving && m.status != Exiting)
+            downing(m.address)
+        case None ⇒
+          // remove the node from the failure detector
+          failureDetector.remove(node.address)
+
+          // add joining node as Joining
+          // add self in case someone else joins before self has joined (Set discards duplicates)
+          val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
+          val newGossip = latestGossip copy (members = newMembers)
+
+          updateLatestGossip(newGossip)
+
+          logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
+          if (node != selfUniqueAddress) {
+            sender() ! Welcome(selfUniqueAddress, latestGossip)
+          }
+
+          publish(latestGossip)
+      }
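Note the sequencing in the `case Some(m)` branch above: the restarted joiner is not welcomed immediately. The existing incarnation is downed (unless it is already Down, Leaving, or Exiting), the leader then removes it from the ring, and the new incarnation succeeds on a later join retry; that is why the test config below shortens `retry-unsuccessful-join-after`.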
@@ -777,8 +782,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   /**
    * Leader actions are as follows:
    * 1. Move JOINING => UP -- When a node joins the cluster
-   * 2. Move LEAVING => EXITING -- When all partition handoff has completed
-   * 3. Non-exiting remain -- When all partition handoff has completed
+   * 2. Move LEAVING => EXITING --
+   * 3. Non-exiting remain --
    * 4. Move unreachable EXITING => REMOVED -- When all nodes have seen the EXITING node as unreachable (convergence) -
    *                                           remove the node from the node ring and seen table
    * 5. Move unreachable DOWN/EXITING => REMOVED -- When all nodes have seen that the node is DOWN/EXITING (convergence) -
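The list above is the contract for the leader's membership transitions. A toy model of items 1-3, with status names mirroring akka.cluster.MemberStatus; convergence checks and gossip bookkeeping are deliberately omitted:

  object LeaderActionSketch extends App {
    sealed trait Status
    case object Joining extends Status
    case object Up extends Status
    case object Leaving extends Status
    case object Exiting extends Status
    case object Down extends Status

    // what the leader does to each member's status once gossip has converged
    def leaderAction(s: Status): Status = s match {
      case Joining => Up      // 1. Move JOINING => UP
      case Leaving => Exiting // 2. Move LEAVING => EXITING
      case other   => other   // 3. Non-exiting remain
    }

    assert(leaderAction(Joining) == Up)
    assert(leaderAction(Leaving) == Exiting)
    assert(leaderAction(Down) == Down)
    println("leader action sketch ok")
  }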
@@ -793,11 +798,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     val localOverview = localGossip.overview
     val localSeen = localOverview.seen
 
-    val hasPartionHandoffCompletedSuccessfully: Boolean = {
-      // TODO implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
-      true
-    }
-
     def enoughMembers: Boolean = {
       localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
         case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
@@ -828,9 +828,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
         }
         m.copyUp(upNumber)
 
-      case m if m.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒
-        // Move LEAVING => EXITING (once we have a convergence on LEAVING
-        // *and* if we have a successful partition handoff)
+      case m if m.status == Leaving ⇒
+        // Move LEAVING => EXITING (once we have a convergence on LEAVING)
         m copy (status = Exiting)
     }
   }
@@ -20,7 +20,7 @@ private[cluster] object Gossip {
     if (members.isEmpty) empty else empty.copy(members = members)
 
   private val leaderMemberStatus = Set[MemberStatus](Up, Leaving)
-  val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) // FIXME private
+  private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving)
   val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
   val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
@@ -25,7 +25,10 @@ object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig {
   val seed3 = role("seed3")
 
   commonConfig(debugConfig(on = false).
-    withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 0s")).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster.auto-down-unreachable-after = off
+      akka.cluster.retry-unsuccessful-join-after = 3s
+      """)).
     withFallback(MultiNodeClusterSpec.clusterConfig))
 }
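With `auto-down-unreachable-after = off`, the crashed seed1 is no longer downed merely for being unreachable; it stays in the ring until the restarted incarnation tries to join and triggers the new downing path, which is exactly what this spec now exercises. That is also why the next hunk drops the assertions that seed2 and seed3 see a two-member ring after the shutdown, and why `retry-unsuccessful-join-after = 3s` keeps the restarted node re-sending Join while the old incarnation is removed.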
@@ -100,17 +103,15 @@ abstract class RestartFirstSeedNodeSpec
       runOn(seed1) {
         shutdown(seed1System, remainingOrDefault)
       }
-      runOn(seed2, seed3) {
-        awaitMembersUp(2, canNotBePartOfMemberRing = Set(seedNodes.head))
-        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (seedNodes.head))
-      }
       enterBarrier("seed1-shutdown")
 
       // then start restartedSeed1System, which has the same address as seed1System
       runOn(seed1) {
         Cluster(restartedSeed1System).joinSeedNodes(seedNodes)
-        awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(3))
-        awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up)))
+        within(20.seconds) {
+          awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(3))
+          awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up)))
+        }
       }
       runOn(seed2, seed3) {
         awaitMembersUp(3)
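For reference, `restartedSeed1System` in this spec is a second ActorSystem started with the same name and bound to seed1's port, so it gets the same cluster address but a fresh uid. A sketch of that construction, assuming the classic remoting config key of this Akka era; `system` and `seedNodes` are the spec's own members, and the spec's imports (akka.actor.ActorSystem, com.typesafe.config.ConfigFactory) are assumed:

  lazy val restartedSeed1System = ActorSystem(
    system.name, // same system name and same host:port as seed1 => same Address, new uid
    ConfigFactory.parseString("akka.remote.netty.tcp.port = " + seedNodes.head.port.get)
      .withFallback(system.settings.config))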