diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 85b157fd01..180c0ad593 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -1062,6 +1062,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with member.dataCenter == selfDc && member.status == Exiting } + val removedOtherDc = + if (latestGossip.isMultiDc) { + latestGossip.members.filter { m ⇒ + (m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status)) + } + } else + Set.empty[Member] + val changedMembers = { val enoughMembers: Boolean = isMinNrOfMembersFulfilled def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers @@ -1091,10 +1099,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } val updatedGossip: Gossip = - if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) { + if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty || + removedOtherDc.nonEmpty) { // replace changed members val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed) + .union(removedOtherDc.map(_.uniqueAddress)) val newGossip = latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis()) @@ -1120,6 +1130,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with removedExitingConfirmed.foreach { n ⇒ logInfo("Leader is removing confirmed Exiting node [{}]", n.address) } + removedOtherDc foreach { m ⇒ + logInfo("Leader is removing {} node [{}] in DC [{}]", m.status, m.address, m.dataCenter) + } newGossip } else diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcLastNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcLastNodeSpec.scala new file mode 
/**
 * Copyright (C) 2018 Lightbend Inc.
 */
package akka.cluster

import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Multi-node configuration for a three-node cluster split over two data
 * centers: `first` and `second` run in dc1, while `third` is the single
 * (and therefore last) node of dc2.
 */
object MultiDcLastNodeSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  // Shared cluster settings for all nodes; the loglevel override is kept
  // around (commented out) as a debugging convenience.
  commonConfig(ConfigFactory.parseString(
    s"""
    #akka.loglevel = DEBUG
    """).withFallback(MultiNodeClusterSpec.clusterConfig))

  // Two nodes form dc1...
  nodeConfig(first, second)(ConfigFactory.parseString(
    """
    akka.cluster.multi-data-center.self-data-center = "dc1"
    """))

  // ...and the remaining node is alone in dc2.
  nodeConfig(third)(ConfigFactory.parseString(
    """
    akka.cluster.multi-data-center.self-data-center = "dc2"
    """))

}

// One concrete class per JVM, as required by the multi-jvm test plugin.
class MultiDcLastNodeMultiJvmNode1 extends MultiDcLastNodeSpec
class MultiDcLastNodeMultiJvmNode2 extends MultiDcLastNodeSpec
class MultiDcLastNodeMultiJvmNode3 extends MultiDcLastNodeSpec

/**
 * Verifies that the last node of a data center can leave the cluster even
 * though no leader remains in its own DC: the leader of the other DC must
 * perform the removal.
 */
abstract class MultiDcLastNodeSpec extends MultiNodeSpec(MultiDcLastNodeSpec)
  with MultiNodeClusterSpec {

  import MultiDcLastNodeSpec._

  "A multi-dc cluster with one remaining node in other DC" must {
    "join" in {

      // Every node joins via the first node; the seed joins itself.
      runOn(first) {
        cluster.join(first)
      }
      runOn(second, third) {
        cluster.join(first)
      }
      enterBarrier("join-cluster")

      // Wait until all three members are Up before moving on.
      within(20.seconds) {
        awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size 3)
      }

      enterBarrier("cluster started")
    }

    "be able to leave" in {
      runOn(third) {
        // Leaving behaves the same way as downing for this scenario.
        cluster.leave(address(third))
      }

      // The dc1 nodes must eventually see the dc2 node removed, which can
      // only happen if the dc1 leader prunes members of the other DC.
      runOn(first, second) {
        awaitAssert(clusterView.members.map(_.address) should not contain address(third))
      }
      enterBarrier("cross-data-center-left")
    }
  }
}