diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index b16fe4328e..dd6fc1e34d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -281,7 +281,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   import MembershipState._
 
   val cluster = Cluster(context.system)
-  import cluster.{ selfAddress, selfRoles, scheduler, failureDetector }
+  import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector }
   import cluster.settings._
   import cluster.InfoLogger._
@@ -606,6 +606,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
       case None ⇒
         // remove the node from the failure detector
         failureDetector.remove(joiningNode.address)
+        crossDcFailureDetector.remove(joiningNode.address)
 
         // add joining node as Joining
         // add self in case someone else joins before self has joined (Set discards duplicates)
@@ -859,7 +860,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
 
       // for all new joining nodes we remove them from the failure detector
      latestGossip.members foreach {
-        node ⇒ if (node.status == Joining && !localGossip.members(node)) failureDetector.remove(node.address)
+        node ⇒
+          if (node.status == Joining && !localGossip.members(node)) {
+            failureDetector.remove(node.address)
+            crossDcFailureDetector.remove(node.address)
+          }
       }
 
       log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
@@ -1133,15 +1138,20 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     val localOverview = localGossip.overview
     val localMembers = localGossip.members
 
+    def isAvailable(member: Member): Boolean = {
+      if (member.dataCenter == SelfDataCenter) failureDetector.isAvailable(member.address)
+      else crossDcFailureDetector.isAvailable(member.address)
+    }
+
     val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒
       member.uniqueAddress == selfUniqueAddress ||
         localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Unreachable ||
         localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Terminated ||
-        failureDetector.isAvailable(member.address)
+        isAvailable(member)
     }
 
     val newlyDetectedReachableMembers = localOverview.reachability.allUnreachableFrom(selfUniqueAddress) collect {
-      case node if node != selfUniqueAddress && failureDetector.isAvailable(node.address) ⇒
+      case node if node != selfUniqueAddress && isAvailable(localGossip.member(node)) ⇒
         localGossip.member(node)
     }
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index 18ad6853c6..e4e769c79c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -73,8 +73,7 @@ object ClusterEvent {
       cs.unreachable,
       cs.seenBy,
       cs.leader,
-      cs.roleLeaderMap
-    ))
+      cs.roleLeaderMap))
   }
 
@@ -117,6 +116,12 @@ object ClusterEvent {
     def getUnreachable: java.util.Set[Member] =
       scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava
 
+    /**
+     * Java API: Current unreachable data centers.
+     */
+    def getUnreachableDataCenters: java.util.Set[String] =
+      scala.collection.JavaConverters.setAsJavaSetConverter(unreachableDataCenters).asJava
+
     /**
      * Java API: get current "seen-by" set.
      */
@@ -556,8 +561,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
       leader = membershipState.leader.map(_.address),
       roleLeaderMap = membershipState.latestGossip.allRoles.map(r ⇒
         r → membershipState.roleLeader(r).map(_.address))(collection.breakOut),
-      unreachableDataCenters
-    )
+      unreachableDataCenters)
     receiver ! state
   }
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala
index 458d5ab03d..036d997209 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala
@@ -3,9 +3,11 @@
  */
 package akka.cluster
 
+import akka.cluster.ClusterEvent.{ CurrentClusterState, DataCenterReachabilityEvent, ReachableDataCenter, UnreachableDataCenter }
 import akka.remote.testconductor.RoleName
 import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
 import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
 
 import scala.concurrent.duration._
@@ -19,7 +21,12 @@ object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
   commonConfig(ConfigFactory.parseString(
     """
      akka.loglevel = INFO
-      akka.cluster.run-coordinated-shutdown-when-down = off
+      akka.cluster.multi-data-center {
+        failure-detector {
+          acceptable-heartbeat-pause = 4s
+          heartbeat-interval = 1s
+        }
+      }
     """).withFallback(MultiNodeClusterSpec.clusterConfig))
 
   nodeConfig(first, second)(ConfigFactory.parseString(
@@ -48,28 +55,75 @@ abstract class MultiDcSplitBrainSpec
   val dc1 = List(first, second)
   val dc2 = List(third, fourth)
 
+  var barrierCounter = 0
+
+  def splitDataCenters(notMembers: Set[RoleName]): Unit = {
+    val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
+    val probe = TestProbe()
+    runOn(memberNodes: _*) {
+      cluster.subscribe(probe.ref, classOf[DataCenterReachabilityEvent])
+      probe.expectMsgType[CurrentClusterState]
+    }
+    enterBarrier(s"split-$barrierCounter")
+    barrierCounter += 1
 
-  def splitDataCenters(dc1: Seq[RoleName], dc2: Seq[RoleName]): Unit = {
     runOn(first) {
-      for {
-        dc1Node ← dc1
-        dc2Node ← dc2
-      } {
+      for (dc1Node ← dc1; dc2Node ← dc2) {
         testConductor.blackhole(dc1Node, dc2Node, Direction.Both).await
       }
     }
+
+    enterBarrier(s"after-split-$barrierCounter")
+    barrierCounter += 1
+
+    runOn(memberNodes: _*) {
+      probe.expectMsgType[UnreachableDataCenter](15.seconds)
+      cluster.unsubscribe(probe.ref)
+      runOn(dc1: _*) {
+        awaitAssert {
+          cluster.state.unreachableDataCenters should ===(Set("dc2"))
+        }
+      }
+      runOn(dc2: _*) {
+        awaitAssert {
+          cluster.state.unreachableDataCenters should ===(Set("dc1"))
+        }
+      }
+      cluster.state.unreachable should ===(Set.empty)
+    }
+    enterBarrier(s"after-split-verified-$barrierCounter")
+    barrierCounter += 1
   }
 
-  def unsplitDataCenters(dc1: Seq[RoleName], dc2: Seq[RoleName]): Unit = {
+  def unsplitDataCenters(notMembers: Set[RoleName]): Unit = {
+    val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
+    val probe = TestProbe()
+    runOn(memberNodes: _*) {
+      cluster.subscribe(probe.ref, classOf[DataCenterReachabilityEvent])
+      probe.expectMsgType[CurrentClusterState]
+    }
+    enterBarrier(s"unsplit-$barrierCounter")
+    barrierCounter += 1
+
     runOn(first) {
-      for {
-        dc1Node ← dc1
-        dc2Node ← dc2
-      } {
+      for (dc1Node ← dc1; dc2Node ← dc2) {
         testConductor.passThrough(dc1Node, dc2Node, Direction.Both).await
       }
     }
+    enterBarrier(s"after-unsplit-$barrierCounter")
+    barrierCounter += 1
+
+    runOn(memberNodes: _*) {
+      probe.expectMsgType[ReachableDataCenter](15.seconds)
+      cluster.unsubscribe(probe.ref)
+      awaitAssert {
+        cluster.state.unreachableDataCenters should ===(Set.empty)
+      }
+    }
+    enterBarrier(s"after-unsplit-verified-$barrierCounter")
+    barrierCounter += 1
   }
 
   "A cluster with multiple data centers" must {
@@ -79,8 +133,7 @@
 
     "be able to have a data center member join while there is inter data center split" in within(20.seconds) {
       // introduce a split between data centers
-      splitDataCenters(dc1 = List(first, second), dc2 = List(third))
-      enterBarrier("data-center-split-1")
+      splitDataCenters(notMembers = Set(fourth))
 
       runOn(fourth) {
         cluster.join(third)
@@ -96,8 +149,7 @@
       }
       enterBarrier("dc2-join-completed")
 
-      unsplitDataCenters(dc1 = List(first, second), dc2 = List(third))
-      enterBarrier("data-center-unsplit-1")
+      unsplitDataCenters(notMembers = Set.empty)
 
       runOn(dc1: _*) {
         awaitAssert(clusterView.members.collect {
@@ -109,8 +161,7 @@
     }
 
     "be able to have data center member leave while there is inter data center split" in within(20.seconds) {
-      splitDataCenters(dc1, dc2)
-      enterBarrier("data-center-split-2")
+      splitDataCenters(notMembers = Set.empty)
 
       runOn(fourth) {
         cluster.leave(fourth)
@@ -121,8 +172,7 @@
       }
       enterBarrier("node-4-left")
 
-      unsplitDataCenters(dc1, List(third))
-      enterBarrier("data-center-unsplit-2")
+      unsplitDataCenters(notMembers = Set(fourth))
 
       runOn(first, second) {
         awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty))
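
Usage note: the `DataCenterReachabilityEvent` subscription and the `unreachableDataCenters` field exercised by the test above are also usable from application code. A minimal sketch, assuming only the APIs touched by this patch (`DataCenterReachabilityEvent`, `UnreachableDataCenter`, `ReachableDataCenter`, `CurrentClusterState.unreachableDataCenters`); the listener actor itself and its log messages are illustrative, not part of the patch:

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, DataCenterReachabilityEvent, ReachableDataCenter, UnreachableDataCenter }

// Illustrative listener that logs cross data center reachability changes.
class DataCenterReachabilityListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // Subscribing delivers a CurrentClusterState snapshot first,
  // followed by DataCenterReachabilityEvent subtypes as they occur.
  override def preStart(): Unit =
    cluster.subscribe(self, classOf[DataCenterReachabilityEvent])

  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive: Receive = {
    case state: CurrentClusterState ⇒
      log.info("Initially unreachable data centers: {}", state.unreachableDataCenters)
    case UnreachableDataCenter(dc) ⇒
      log.warning("Data center [{}] observed as unreachable", dc)
    case ReachableDataCenter(dc) ⇒
      log.info("Data center [{}] observed as reachable again", dc)
  }
}

This mirrors what the test helpers do with a TestProbe: subscribe, swallow the initial CurrentClusterState, then react to UnreachableDataCenter/ReachableDataCenter, which the cross-DC failure detector added in ClusterDaemon.scala drives.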