Connect the dots for cross-dc reachability, #23377

* the crossDcFailureDetector was not connected to the reachability table
* additional test that listens for {Reachable/Unreachable}DataCenter events in the split spec
* added the missing Java API getUnreachableDataCenters in CurrentClusterState
Patrik Nordwall 2017-08-22 15:02:27 +02:00
parent eefe6474c3
commit e3aada5016
3 changed files with 91 additions and 27 deletions
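
For context, the change is user-visible through the cluster event stream and CurrentClusterState. A minimal subscriber sketch (the actor is hypothetical; the subscribe call and the event types are Akka's ClusterEvent API) showing what this commit connects end to end:

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class DcReachabilityListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // subscribing delivers a CurrentClusterState snapshot first, then reachability events
  override def preStart(): Unit =
    cluster.subscribe(self, classOf[DataCenterReachabilityEvent])
  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive = {
    case state: CurrentClusterState ⇒
      log.info("Unreachable data centers: {}", state.unreachableDataCenters)
    case UnreachableDataCenter(dc) ⇒
      log.warning("Data center [{}] observed as unreachable", dc)
    case ReachableDataCenter(dc) ⇒
      log.info("Data center [{}] observed as reachable again", dc)
  }
}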


@@ -281,7 +281,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   import MembershipState._

   val cluster = Cluster(context.system)
-  import cluster.{ selfAddress, selfRoles, scheduler, failureDetector }
+  import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector }
   import cluster.settings._
   import cluster.InfoLogger._
@@ -606,6 +606,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
       case None ⇒
         // remove the node from the failure detector
         failureDetector.remove(joiningNode.address)
+        crossDcFailureDetector.remove(joiningNode.address)

         // add joining node as Joining
         // add self in case someone else joins before self has joined (Set discards duplicates)
@@ -859,7 +860,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
         // for all new joining nodes we remove them from the failure detector
         latestGossip.members foreach {
-          node ⇒ if (node.status == Joining && !localGossip.members(node)) failureDetector.remove(node.address)
+          node ⇒
+            if (node.status == Joining && !localGossip.members(node)) {
+              failureDetector.remove(node.address)
+              crossDcFailureDetector.remove(node.address)
+            }
         }

         log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
@@ -1133,15 +1138,20 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     val localOverview = localGossip.overview
     val localMembers = localGossip.members

+    def isAvailable(member: Member): Boolean = {
+      if (member.dataCenter == SelfDataCenter) failureDetector.isAvailable(member.address)
+      else crossDcFailureDetector.isAvailable(member.address)
+    }
+
     val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒
       member.uniqueAddress == selfUniqueAddress ||
         localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Unreachable ||
         localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Terminated ||
-        failureDetector.isAvailable(member.address)
+        isAvailable(member)
     }

     val newlyDetectedReachableMembers = localOverview.reachability.allUnreachableFrom(selfUniqueAddress) collect {
-      case node if node != selfUniqueAddress && failureDetector.isAvailable(node.address) ⇒
+      case node if node != selfUniqueAddress && isAvailable(localGossip.member(node)) ⇒
        localGossip.member(node)
    }
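
The pattern above generalizes: an availability check must ask the detector that actually receives heartbeats from the member's data center, and removals must clean up both detectors. A standalone sketch of that dispatch (hypothetical class, not Akka's code; FailureDetectorRegistry is the real akka.remote interface):

import akka.actor.Address
import akka.remote.FailureDetectorRegistry

final class PerDcAvailability(
  selfDataCenter:         String,
  failureDetector:        FailureDetectorRegistry[Address],
  crossDcFailureDetector: FailureDetectorRegistry[Address]) {

  // mirrors isAvailable above: same-dc members are judged by the regular
  // detector, remote-dc members by the cross-dc detector
  def isAvailable(memberDataCenter: String, address: Address): Boolean =
    if (memberDataCenter == selfDataCenter) failureDetector.isAvailable(address)
    else crossDcFailureDetector.isAvailable(address)

  // mirrors the join/gossip hunks above: forgetting either removal leaves
  // stale detector state behind when a node rejoins
  def remove(address: Address): Unit = {
    failureDetector.remove(address)
    crossDcFailureDetector.remove(address)
  }
}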


@@ -73,8 +73,7 @@ object ClusterEvent {
       cs.unreachable,
       cs.seenBy,
       cs.leader,
-      cs.roleLeaderMap
-    ))
+      cs.roleLeaderMap))
   }
@@ -117,6 +116,12 @@ object ClusterEvent {
     def getUnreachable: java.util.Set[Member] =
       scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava

+    /**
+     * Java API: All unreachable data centers in the cluster.
+     */
+    def getUnreachableDataCenters: java.util.Set[String] =
+      scala.collection.JavaConverters.setAsJavaSetConverter(unreachableDataCenters).asJava
+
     /**
      * Java API: get current seen-by set.
      */
@@ -556,8 +561,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
         leader = membershipState.leader.map(_.address),
         roleLeaderMap = membershipState.latestGossip.allRoles.map(r ⇒
           r → membershipState.roleLeader(r).map(_.address))(collection.breakOut),
-        unreachableDataCenters
-      )
+        unreachableDataCenters)
       receiver ! state
   }
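
With the publisher now passing unreachableDataCenters into the state it sends to subscribers, the Scala and Java accessors expose the same set. A small usage sketch (object and system name are hypothetical):

import akka.actor.ActorSystem
import akka.cluster.Cluster

object UnreachableDcAccess {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("example")
    val state = Cluster(system).state

    val scalaView: Set[String] = state.unreachableDataCenters             // Scala API
    val javaView: java.util.Set[String] = state.getUnreachableDataCenters // new Java API

    println(s"unreachable data centers: $scalaView / $javaView")
    system.terminate()
  }
}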


@@ -3,9 +3,11 @@
  */
 package akka.cluster

+import akka.cluster.ClusterEvent.{ CurrentClusterState, DataCenterReachabilityEvent, ReachableDataCenter, UnreachableDataCenter }
 import akka.remote.testconductor.RoleName
 import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
 import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory

 import scala.concurrent.duration._
@@ -19,7 +21,12 @@ object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
   commonConfig(ConfigFactory.parseString(
     """
     akka.loglevel = INFO
-    akka.cluster.run-coordinated-shutdown-when-down = off
+    akka.cluster.multi-data-center {
+      failure-detector {
+        acceptable-heartbeat-pause = 4s
+        heartbeat-interval = 1s
+      }
+    }
     """).withFallback(MultiNodeClusterSpec.clusterConfig))

 nodeConfig(first, second)(ConfigFactory.parseString(
@@ -48,28 +55,75 @@ abstract class MultiDcSplitBrainSpec
   val dc1 = List(first, second)
   val dc2 = List(third, fourth)

-  def splitDataCenters(dc1: Seq[RoleName], dc2: Seq[RoleName]): Unit = {
+  var barrierCounter = 0
+
+  def splitDataCenters(notMembers: Set[RoleName]): Unit = {
+    val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
+    val probe = TestProbe()
+    runOn(memberNodes: _*) {
+      cluster.subscribe(probe.ref, classOf[DataCenterReachabilityEvent])
+      probe.expectMsgType[CurrentClusterState]
+    }
+    enterBarrier(s"split-$barrierCounter")
+    barrierCounter += 1
     runOn(first) {
-      for {
-        dc1Node ← dc1
-        dc2Node ← dc2
-      } {
+      for (dc1Node ← dc1; dc2Node ← dc2) {
         testConductor.blackhole(dc1Node, dc2Node, Direction.Both).await
       }
     }
+    enterBarrier(s"after-split-$barrierCounter")
+    barrierCounter += 1
+    runOn(memberNodes: _*) {
+      probe.expectMsgType[UnreachableDataCenter](15.seconds)
+      cluster.unsubscribe(probe.ref)
+      runOn(dc1: _*) {
+        awaitAssert {
+          cluster.state.unreachableDataCenters should ===(Set("dc2"))
+        }
+      }
+      runOn(dc2: _*) {
+        awaitAssert {
+          cluster.state.unreachableDataCenters should ===(Set("dc1"))
+        }
+      }
+      cluster.state.unreachable should ===(Set.empty)
+    }
+    enterBarrier(s"after-split-verified-$barrierCounter")
+    barrierCounter += 1
   }

-  def unsplitDataCenters(dc1: Seq[RoleName], dc2: Seq[RoleName]): Unit = {
+  def unsplitDataCenters(notMembers: Set[RoleName]): Unit = {
+    val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
+    val probe = TestProbe()
+    runOn(memberNodes: _*) {
+      cluster.subscribe(probe.ref, classOf[DataCenterReachabilityEvent])
+      probe.expectMsgType[CurrentClusterState]
+    }
+    enterBarrier(s"unsplit-$barrierCounter")
+    barrierCounter += 1
     runOn(first) {
-      for {
-        dc1Node ← dc1
-        dc2Node ← dc2
-      } {
+      for (dc1Node ← dc1; dc2Node ← dc2) {
         testConductor.passThrough(dc1Node, dc2Node, Direction.Both).await
       }
     }
+    enterBarrier(s"after-unsplit-$barrierCounter")
+    barrierCounter += 1
+    runOn(memberNodes: _*) {
+      probe.expectMsgType[ReachableDataCenter](15.seconds)
+      cluster.unsubscribe(probe.ref)
+      awaitAssert {
+        cluster.state.unreachableDataCenters should ===(Set.empty)
+      }
+    }
+    enterBarrier(s"after-unsplit-verified-$barrierCounter")
+    barrierCounter += 1
   }
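
Both helpers synchronize every phase with a barrier, and because they can run more than once per test, barrierCounter keeps the barrier names unique across calls. The same pattern in isolation (hypothetical trait, not part of this commit):

import akka.remote.testkit.MultiNodeSpec

trait NumberedBarriers { self: MultiNodeSpec ⇒
  private var counter = 0

  // MultiNodeSpec barriers are matched by name across nodes, so a helper
  // that is invoked repeatedly must mint a fresh name on every call
  def nextBarrier(prefix: String): Unit = {
    enterBarrier(s"$prefix-$counter")
    counter += 1
  }
}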
"A cluster with multiple data centers" must {
@@ -79,8 +133,7 @@ abstract class MultiDcSplitBrainSpec
     "be able to have a data center member join while there is inter data center split" in within(20.seconds) {
       // introduce a split between data centers
-      splitDataCenters(dc1 = List(first, second), dc2 = List(third))
-      enterBarrier("data-center-split-1")
+      splitDataCenters(notMembers = Set(fourth))

       runOn(fourth) {
         cluster.join(third)
@@ -96,8 +149,7 @@ abstract class MultiDcSplitBrainSpec
       }
       enterBarrier("dc2-join-completed")

-      unsplitDataCenters(dc1 = List(first, second), dc2 = List(third))
-      enterBarrier("data-center-unsplit-1")
+      unsplitDataCenters(notMembers = Set.empty)

       runOn(dc1: _*) {
         awaitAssert(clusterView.members.collect {
@@ -109,8 +161,7 @@ abstract class MultiDcSplitBrainSpec
     }

     "be able to have data center member leave while there is inter data center split" in within(20.seconds) {
-      splitDataCenters(dc1, dc2)
-      enterBarrier("data-center-split-2")
+      splitDataCenters(notMembers = Set.empty)

       runOn(fourth) {
         cluster.leave(fourth)
@@ -121,8 +172,7 @@ abstract class MultiDcSplitBrainSpec
       }
       enterBarrier("node-4-left")

-      unsplitDataCenters(dc1, List(third))
-      enterBarrier("data-center-unsplit-2")
+      unsplitDataCenters(notMembers = Set(fourth))

       runOn(first, second) {
         awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty))