Remove Exiting/Down node from other DC, #24171
* When leaving/downing the last node in a DC it would not be removed in another DC, since that was only done by the leader in the owning DC (and that is gone). * It should be ok to eagerly remove such nodes also by leaders in other DCs. * Note that gossip is already sent out so for the last node that will be spread to other DC, unless there is a network partition. For that we can't do anything. It will be replaced if joining again.
This commit is contained in:
parent
fb72274b71
commit
2733a26540
2 changed files with 86 additions and 1 deletions
|
|
@ -1062,6 +1062,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
member.dataCenter == selfDc && member.status == Exiting
|
member.dataCenter == selfDc && member.status == Exiting
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val removedOtherDc =
|
||||||
|
if (latestGossip.isMultiDc) {
|
||||||
|
latestGossip.members.filter { m ⇒
|
||||||
|
(m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status))
|
||||||
|
}
|
||||||
|
} else
|
||||||
|
Set.empty[Member]
|
||||||
|
|
||||||
val changedMembers = {
|
val changedMembers = {
|
||||||
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
|
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
|
||||||
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
|
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
|
||||||
|
|
@ -1091,10 +1099,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
}
|
}
|
||||||
|
|
||||||
val updatedGossip: Gossip =
|
val updatedGossip: Gossip =
|
||||||
if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
|
if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty ||
|
||||||
|
removedOtherDc.nonEmpty) {
|
||||||
|
|
||||||
// replace changed members
|
// replace changed members
|
||||||
val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
|
val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
|
||||||
|
.union(removedOtherDc.map(_.uniqueAddress))
|
||||||
val newGossip =
|
val newGossip =
|
||||||
latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis())
|
latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis())
|
||||||
|
|
||||||
|
|
@ -1120,6 +1130,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
removedExitingConfirmed.foreach { n ⇒
|
removedExitingConfirmed.foreach { n ⇒
|
||||||
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
|
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
|
||||||
}
|
}
|
||||||
|
removedOtherDc foreach { m ⇒
|
||||||
|
logInfo("Leader is removing {} node [{}] in DC [{}]", m.status, m.address, m.dataCenter)
|
||||||
|
}
|
||||||
|
|
||||||
newGossip
|
newGossip
|
||||||
} else
|
} else
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
object MultiDcLastNodeSpec extends MultiNodeConfig {
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
val third = role("third")
|
||||||
|
|
||||||
|
commonConfig(ConfigFactory.parseString(
|
||||||
|
s"""
|
||||||
|
#akka.loglevel = DEBUG
|
||||||
|
""").withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
|
|
||||||
|
nodeConfig(first, second)(ConfigFactory.parseString(
|
||||||
|
"""
|
||||||
|
akka.cluster.multi-data-center.self-data-center = "dc1"
|
||||||
|
"""))
|
||||||
|
|
||||||
|
nodeConfig(third)(ConfigFactory.parseString(
|
||||||
|
"""
|
||||||
|
akka.cluster.multi-data-center.self-data-center = "dc2"
|
||||||
|
"""))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class MultiDcLastNodeMultiJvmNode1 extends MultiDcLastNodeSpec
|
||||||
|
class MultiDcLastNodeMultiJvmNode2 extends MultiDcLastNodeSpec
|
||||||
|
class MultiDcLastNodeMultiJvmNode3 extends MultiDcLastNodeSpec
|
||||||
|
|
||||||
|
abstract class MultiDcLastNodeSpec extends MultiNodeSpec(MultiDcLastNodeSpec)
|
||||||
|
with MultiNodeClusterSpec {
|
||||||
|
|
||||||
|
import MultiDcLastNodeSpec._
|
||||||
|
|
||||||
|
"A multi-dc cluster with one remaining node in other DC" must {
|
||||||
|
"join" in {
|
||||||
|
|
||||||
|
runOn(first) {
|
||||||
|
cluster.join(first)
|
||||||
|
}
|
||||||
|
runOn(second, third) {
|
||||||
|
cluster.join(first)
|
||||||
|
}
|
||||||
|
enterBarrier("join-cluster")
|
||||||
|
|
||||||
|
within(20.seconds) {
|
||||||
|
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size 3)
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("cluster started")
|
||||||
|
}
|
||||||
|
|
||||||
|
"be able to leave" in {
|
||||||
|
runOn(third) {
|
||||||
|
// this works in same way for down
|
||||||
|
cluster.leave(address(third))
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(first, second) {
|
||||||
|
awaitAssert(clusterView.members.map(_.address) should not contain address(third))
|
||||||
|
}
|
||||||
|
enterBarrier("cross-data-center-left")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue