limit cross dc gossip #23282

This commit is contained in:
Johan Andrén 2017-07-07 13:19:10 +01:00 committed by GitHub
parent b568975acc
commit c0d439eac3
14 changed files with 549 additions and 174 deletions

View file

@ -211,6 +211,15 @@ akka {
# if your cluster nodes are configured with at-least 2 different `akka.cluster.data-center` values.
multi-data-center {
# Try to limit the number of connections between data centers. Used for gossip and heartbeating.
# This will not limit connections created for the messaging of the application.
# If the cluster does not span multiple data centers, this value has no effect.
cross-data-center-connections = 5
# The n oldest nodes in a data center will choose to gossip to another data center with
# this probability. Must be a value between 0.0 and 1.0 where 0.0 means never, 1.0 means always.
cross-data-center-gossip-probability = 0.2
failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
@ -232,11 +241,6 @@ akka {
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 1 s
# Maximum number of oldest members in a data center that will monitor other (oldest nodes in other) data centers.
# This is done to lessen the cross data center communication, as only those top-n-oldest nodes
# need to maintain connections to the other data-centers.
nr-of-monitoring-members = 5
}
}

View file

@ -108,7 +108,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
val crossDcFailureDetector: FailureDetectorRegistry[Address] = {
val createFailureDetector = ()
FailureDetectorLoader.load(settings.CrossDcFailureDetectorSettings.ImplementationClass, settings.CrossDcFailureDetectorSettings.config, system)
FailureDetectorLoader.load(
settings.MultiDataCenter.CrossDcFailureDetectorSettings.ImplementationClass,
settings.MultiDataCenter.CrossDcFailureDetectorSettings.config, system)
new DefaultFailureDetectorRegistry(createFailureDetector)
}

View file

@ -4,7 +4,7 @@
package akka.cluster
import language.existentials
import scala.collection.immutable
import scala.collection.{ SortedSet, breakOut, immutable, mutable }
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
@ -15,8 +15,6 @@ import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.cluster.ClusterSettings.DataCenter
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import scala.collection.breakOut
import akka.remote.QuarantinedEvent
import java.util.ArrayList
import java.util.Collections
@ -25,9 +23,11 @@ import akka.pattern.ask
import akka.util.Timeout
import akka.Done
import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.DataCenter
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Random
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -301,10 +301,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
protected def selfUniqueAddress = cluster.selfUniqueAddress
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
val gossipTargetSelector = new GossipTargetSelector(
ReduceGossipDifferentViewProbability,
cluster.settings.MultiDataCenter.CrossDcGossipProbability)
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
var membershipState = MembershipState(Gossip.empty, cluster.selfUniqueAddress, cluster.settings.DataCenter)
var membershipState = MembershipState(
Gossip.empty,
cluster.selfUniqueAddress,
cluster.settings.DataCenter,
cluster.settings.MultiDataCenter.CrossDcConnections)
def latestGossip: Gossip = membershipState.latestGossip
val statsEnabled = PublishStatsInterval.isFinite
@ -925,86 +933,23 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
*/
def gossipRandomN(n: Int): Unit = {
if (!isSingletonCluster && n > 0) {
val localGossip = latestGossip
// using ArrayList to be able to shuffle
val possibleTargets = new ArrayList[UniqueAddress](localGossip.members.size)
localGossip.members.foreach { m
if (validNodeForGossip(m.uniqueAddress))
possibleTargets.add(m.uniqueAddress)
}
val randomTargets =
if (possibleTargets.size <= n)
possibleTargets
else {
Collections.shuffle(possibleTargets, ThreadLocalRandom.current())
possibleTargets.subList(0, n)
}
val iter = randomTargets.iterator
while (iter.hasNext)
gossipTo(iter.next())
gossipTargetSelector.randomNodesForFullGossip(membershipState, n).foreach(gossipTo)
}
}
/**
* Initiates a new round of gossip.
*/
def gossip(): Unit = {
def gossip(): Unit =
if (!isSingletonCluster) {
val localGossip = latestGossip
val preferredGossipTargets: Vector[UniqueAddress] =
if (ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability) {
// If it's time to try to gossip to some nodes with a different view
// gossip to a random alive member with preference to a member with older gossip version
localGossip.members.collect {
case m if !localGossip.seenByNode(m.uniqueAddress) && validNodeForGossip(m.uniqueAddress)
m.uniqueAddress
}(breakOut)
} else Vector.empty
if (preferredGossipTargets.nonEmpty) {
val peer = selectRandomNode(preferredGossipTargets)
// send full gossip because it has different view
peer foreach gossipTo
} else {
// Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
val peer = selectRandomNode(localGossip.members.toIndexedSeq.collect {
case m if validNodeForGossip(m.uniqueAddress) m.uniqueAddress
})
peer foreach { node
if (localGossip.seenByNode(node)) gossipStatusTo(node)
else gossipTo(node)
}
}
}
}
/**
* For large clusters we should avoid shooting down individual
* nodes. Therefore the probability is reduced for large clusters.
*/
def adjustedGossipDifferentViewProbability: Double = {
val size = latestGossip.members.size
val low = ReduceGossipDifferentViewProbability
val high = low * 3
// start reduction when cluster is larger than configured ReduceGossipDifferentViewProbability
if (size <= low)
GossipDifferentViewProbability
else {
// don't go lower than 1/10 of the configured GossipDifferentViewProbability
val minP = GossipDifferentViewProbability / 10
if (size >= high)
minP
else {
// linear reduction of the probability with increasing number of nodes
// from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes
// to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes
// i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes
val k = (minP - GossipDifferentViewProbability) / (high - low)
GossipDifferentViewProbability + (size - low) * k
}
gossipTargetSelector.gossipTarget(membershipState) match {
case Some(peer)
if (!membershipState.isInSameDc(peer) || latestGossip.seenByNode(peer))
// avoid transferring the full state if possible
gossipStatusTo(peer)
else
gossipTo(peer)
case None // nothing to see here
}
}
@ -1244,10 +1189,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}
def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] =
if (nodes.isEmpty) None
else Some(nodes(ThreadLocalRandom.current nextInt nodes.size))
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
// needed for tests
@ -1261,24 +1202,21 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Gossips latest gossip to a node.
*/
def gossipTo(node: UniqueAddress): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
clusterCore(node.address) ! GossipEnvelope(selfUniqueAddress, node, latestGossip)
def gossipTo(node: UniqueAddress, destination: ActorRef): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
destination ! GossipEnvelope(selfUniqueAddress, node, latestGossip)
def gossipStatusTo(node: UniqueAddress, destination: ActorRef): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
destination ! GossipStatus(selfUniqueAddress, latestGossip.version)
def gossipStatusTo(node: UniqueAddress): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
def validNodeForGossip(node: UniqueAddress): Boolean =
node != selfUniqueAddress && membershipState.isReachableExcludingDownedObservers(node)
def updateLatestGossip(gossip: Gossip): Unit = {
// Updating the vclock version for the changes
val versionedGossip = gossip :+ vclockNode

View file

@ -378,7 +378,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
val cluster = Cluster(context.system)
val selfUniqueAddress = cluster.selfUniqueAddress
val emptyMembershipState = MembershipState(Gossip.empty, cluster.selfUniqueAddress, cluster.settings.DataCenter)
val emptyMembershipState = MembershipState(
Gossip.empty,
cluster.selfUniqueAddress,
cluster.settings.DataCenter,
cluster.settings.MultiDataCenter.CrossDcConnections)
var membershipState: MembershipState = emptyMembershipState
def selfDc = cluster.settings.DataCenter

View file

@ -10,7 +10,7 @@ import com.typesafe.config.ConfigObject
import scala.concurrent.duration.Duration
import akka.actor.Address
import akka.actor.AddressFromURIString
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.annotation.InternalApi
import akka.dispatch.Dispatchers
import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase }
@ -34,7 +34,6 @@ object ClusterSettings {
}
final class ClusterSettings(val config: Config, val systemName: String) {
import ClusterSettings._
import ClusterSettings._
private val cc = config.getConfig("akka.cluster")
@ -51,6 +50,28 @@ final class ClusterSettings(val config: Config, val systemName: String) {
FailureDetectorConfig.getInt("monitored-by-nr-of-members")
} requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0")
final class CrossDcFailureDetectorSettings(val config: Config) {
val ImplementationClass: String = config.getString("implementation-class")
val HeartbeatInterval: FiniteDuration = {
config.getMillisDuration("heartbeat-interval")
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
val HeartbeatExpectedResponseAfter: FiniteDuration = {
config.getMillisDuration("expected-response-after")
} requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0")
def NrOfMonitoringActors: Int = MultiDataCenter.CrossDcConnections
}
object MultiDataCenter {
val CrossDcConnections: Int = cc.getInt("multi-data-center.cross-data-center-connections")
.requiring(_ > 0, "cross-data-center-connections must be > 0")
val CrossDcGossipProbability: Double = cc.getDouble("multi-data-center.cross-data-center-gossip-probability")
.requiring(d d >= 0.0D && d <= 1.0D, "cross-data-center-gossip-probability must be >= 0.0 and <= 1.0")
val CrossDcFailureDetectorSettings: CrossDcFailureDetectorSettings =
new CrossDcFailureDetectorSettings(cc.getConfig("multi-data-center.failure-detector"))
}
val SeedNodes: immutable.IndexedSeq[Address] =
immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) addr }.toVector
val SeedNodeTimeout: FiniteDuration = cc.getMillisDuration("seed-node-timeout")
@ -114,24 +135,10 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val QuarantineRemovedNodeAfter: FiniteDuration =
cc.getMillisDuration("quarantine-removed-node-after") requiring (_ > Duration.Zero, "quarantine-removed-node-after must be > 0")
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
val AllowWeaklyUpMembers: Boolean = cc.getBoolean("allow-weakly-up-members")
val DataCenter: DataCenter = cc.getString("data-center")
final class CrossDcFailureDetectorSettings(val config: Config) {
val ImplementationClass: String = config.getString("implementation-class")
val HeartbeatInterval: FiniteDuration = {
config.getMillisDuration("heartbeat-interval")
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
val HeartbeatExpectedResponseAfter: FiniteDuration = {
config.getMillisDuration("expected-response-after")
} requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0")
val NrOfMonitoringActors: Int = {
config.getInt("nr-of-monitoring-members")
} requiring (_ > 0, "failure-detector.nr-of-monitoring-members must be > 0")
}
val CrossDcFailureDetectorSettings = new CrossDcFailureDetectorSettings(cc.getConfig("multi-data-center.failure-detector"))
val Roles: Set[String] = {
val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring (
_.forall(!_.startsWith(DcRolePrefix)),
@ -162,8 +169,8 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
object Debug {
val VerboseHeartbeatLogging = cc.getBoolean("debug.verbose-heartbeat-logging")
val VerboseGossipLogging = cc.getBoolean("debug.verbose-gossip-logging")
val VerboseHeartbeatLogging: Boolean = cc.getBoolean("debug.verbose-heartbeat-logging")
val VerboseGossipLogging: Boolean = cc.getBoolean("debug.verbose-gossip-logging")
}
}

View file

@ -47,7 +47,9 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
val isExternalClusterMember: Member Boolean =
member member.dataCenter != cluster.selfDataCenter
val crossDcSettings: cluster.settings.CrossDcFailureDetectorSettings = cluster.settings.CrossDcFailureDetectorSettings
val crossDcSettings: cluster.settings.CrossDcFailureDetectorSettings =
cluster.settings.MultiDataCenter.CrossDcFailureDetectorSettings
val crossDcFailureDetector = cluster.crossDcFailureDetector
val selfHeartbeat = ClusterHeartbeatSender.Heartbeat(selfAddress)
@ -299,7 +301,7 @@ private[cluster] object CrossDcHeartbeatingState {
crossDcFailureDetector,
nrOfMonitoredNodesPerDc,
state = {
// TODO unduplicate this with other places where we do this
// TODO unduplicate this with the logic in MembershipState.ageSortedTopOldestMembersPerDc
val groupedByDc = members.groupBy(_.dataCenter)
if (members.ordering == Member.ageOrdering) {

View file

@ -87,6 +87,13 @@ private[cluster] final case class Gossip(
@transient private lazy val membersMap: Map[UniqueAddress, Member] =
members.map(m m.uniqueAddress m)(collection.breakOut)
@transient lazy val isMultiDc =
if (members.size <= 1) false
else {
val dc1 = members.head.dataCenter
members.exists(_.dataCenter != dc1)
}
/**
* Increments the version for this 'Node'.
*/

View file

@ -3,12 +3,19 @@
*/
package akka.cluster
import java.util.{ ArrayList, Collections }
import java.util.concurrent.ThreadLocalRandom
import scala.collection.immutable
import scala.collection.SortedSet
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.MemberStatus._
import akka.annotation.InternalApi
import scala.annotation.tailrec
import scala.collection.breakOut
import scala.util.Random
/**
* INTERNAL API
*/
@ -23,9 +30,16 @@ import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class MembershipState(latestGossip: Gossip, selfUniqueAddress: UniqueAddress, selfDc: DataCenter) {
@InternalApi private[akka] final case class MembershipState(
latestGossip: Gossip,
selfUniqueAddress: UniqueAddress,
selfDc: DataCenter,
crossDcConnections: Int) {
import MembershipState._
lazy val selfMember = latestGossip.member(selfUniqueAddress)
def members: immutable.SortedSet[Member] = latestGossip.members
def overview: GossipOverview = latestGossip.overview
@ -76,6 +90,20 @@ import akka.annotation.InternalApi
overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.dataCenter != selfDc m.uniqueAddress })
}
/**
* @return Up to `crossDcConnections` oldest members for each DC
*/
lazy val ageSortedTopOldestMembersPerDc: Map[DataCenter, SortedSet[Member]] =
// TODO make this recursive and bail early when size reached to make it fast for large clusters
latestGossip.members.foldLeft(Map.empty[DataCenter, SortedSet[Member]]) { (acc, member)
acc.get(member.dataCenter) match {
case Some(set)
if (set.size < crossDcConnections) acc + (member.dataCenter (set + member))
else acc
case None acc + (member.dataCenter (SortedSet.empty(Member.ageOrdering) + member))
}
}
/**
* @return true if toAddress should be reachable from the fromDc in general, within a data center
* this means only caring about data center local observations, across data centers it
@ -119,4 +147,173 @@ import akka.annotation.InternalApi
.map(_.uniqueAddress)
}
def isInSameDc(node: UniqueAddress): Boolean =
node == selfUniqueAddress || latestGossip.member(node).dataCenter == selfDc
def validNodeForGossip(node: UniqueAddress): Boolean =
node != selfUniqueAddress &&
((isInSameDc(node) && isReachableExcludingDownedObservers(node)) ||
// if cross DC we need to check pairwise unreachable observation
overview.reachability.isReachable(selfUniqueAddress, node))
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class GossipTargetSelector(
reduceGossipDifferentViewProbability: Double,
crossDcGossipProbability: Double) {
final def gossipTarget(state: MembershipState): Option[UniqueAddress] = {
selectRandomNode(gossipTargets(state))
}
final def gossipTargets(state: MembershipState): Vector[UniqueAddress] =
if (state.latestGossip.isMultiDc) multiDcGossipTargets(state)
else localDcGossipTargets(state)
/**
* Select `n` random nodes to gossip to (used to quickly inform the rest of the cluster when leaving for example)
*/
def randomNodesForFullGossip(state: MembershipState, n: Int): Vector[UniqueAddress] =
if (state.latestGossip.isMultiDc && state.ageSortedTopOldestMembersPerDc(state.selfDc).contains(state.selfMember)) {
// this node is one of the N oldest in the cluster, gossip to one cross-dc but mostly locally
val randomLocalNodes = Random.shuffle(state.members.toVector.collect {
case m if m.dataCenter == state.selfDc && state.validNodeForGossip(m.uniqueAddress) m.uniqueAddress
})
@tailrec
def selectOtherDcNode(randomizedDcs: List[DataCenter]): Option[UniqueAddress] =
randomizedDcs match {
case Nil None // couldn't find a single cross-dc-node to talk to
case dc :: tail
state.ageSortedTopOldestMembersPerDc(dc).collectFirst {
case m if state.validNodeForGossip(m.uniqueAddress) m.uniqueAddress
} match {
case Some(addr) Some(addr)
case None selectOtherDcNode(tail)
}
}
val otherDcs = Random.shuffle((state.ageSortedTopOldestMembersPerDc.keySet - state.selfDc).toList)
selectOtherDcNode(otherDcs) match {
case Some(node) randomLocalNodes.take(n - 1) :+ node
case None randomLocalNodes.take(n)
}
} else {
// single dc or not among the N oldest - select local nodes
val selectedNodes = state.members.toVector.collect {
case m if m.dataCenter == state.selfDc && state.validNodeForGossip(m.uniqueAddress) m.uniqueAddress
}
if (selectedNodes.size <= n) selectedNodes
else Random.shuffle(selectedNodes).take(n)
}
/**
* Chooses a set of possible gossip targets that is in the same dc. If the cluster is not multi dc this
* means it is a choice among all nodes of the cluster.
*/
protected def localDcGossipTargets(state: MembershipState): Vector[UniqueAddress] = {
val latestGossip = state.latestGossip
val firstSelection: Vector[UniqueAddress] =
if (preferNodesWithDifferentView(state)) {
// If it's time to try to gossip to some nodes with a different view
// gossip to a random alive same dc member with preference to a member with older gossip version
latestGossip.members.collect {
case m if m.dataCenter == state.selfDc && !latestGossip.seenByNode(m.uniqueAddress) && state.validNodeForGossip(m.uniqueAddress)
m.uniqueAddress
}(breakOut)
} else Vector.empty
// Fall back to localGossip
if (firstSelection.isEmpty) {
latestGossip.members.toVector.collect {
case m if m.dataCenter == state.selfDc && state.validNodeForGossip(m.uniqueAddress) m.uniqueAddress
}
} else firstSelection
}
/**
* Choose cross-dc nodes if this one of the N oldest nodes, and if not fall back to gosip locally in the dc
*/
protected def multiDcGossipTargets(state: MembershipState): Vector[UniqueAddress] = {
val latestGossip = state.latestGossip
// only a fraction of the time across data centers
if (selectDcLocalNodes()) localDcGossipTargets(state)
else {
val nodesPerDc = state.ageSortedTopOldestMembersPerDc
// only do cross DC gossip if this node is among the N oldest
if (!nodesPerDc(state.selfDc).contains(state.selfMember)) localDcGossipTargets(state)
else {
@tailrec
def findFirstDcWithValidNodes(left: List[DataCenter]): Vector[UniqueAddress] =
left match {
case dc :: tail
val validNodes = nodesPerDc(dc).collect {
case member if state.validNodeForGossip(member.uniqueAddress)
member.uniqueAddress
}
if (validNodes.nonEmpty) validNodes.toVector
else findFirstDcWithValidNodes(tail) // no valid nodes in dc, try next
case Nil
Vector.empty
}
// chose another DC at random
val otherDcsInRandomOrder = dcsInRandomOrder((nodesPerDc - state.selfDc).keys.toList)
val nodes = findFirstDcWithValidNodes(otherDcsInRandomOrder)
if (nodes.nonEmpty) nodes
// no other dc with reachable nodes, fall back to local gossip
else localDcGossipTargets(state)
}
}
}
/**
* For large clusters we should avoid shooting down individual
* nodes. Therefore the probability is reduced for large clusters.
*/
protected def adjustedGossipDifferentViewProbability(clusterSize: Int): Double = {
val low = reduceGossipDifferentViewProbability
val high = low * 3
// start reduction when cluster is larger than configured ReduceGossipDifferentViewProbability
if (clusterSize <= low)
reduceGossipDifferentViewProbability
else {
// don't go lower than 1/10 of the configured GossipDifferentViewProbability
val minP = reduceGossipDifferentViewProbability / 10
if (clusterSize >= high)
minP
else {
// linear reduction of the probability with increasing number of nodes
// from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes
// to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes
// i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes
val k = (minP - reduceGossipDifferentViewProbability) / (high - low)
reduceGossipDifferentViewProbability + (clusterSize - low) * k
}
}
}
protected def selectDcLocalNodes(): Boolean = ThreadLocalRandom.current.nextDouble() > crossDcGossipProbability
protected def preferNodesWithDifferentView(state: MembershipState): Boolean =
ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size)
protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] =
Random.shuffle(dcs)
protected def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] =
if (nodes.isEmpty) None
else Some(nodes(ThreadLocalRandom.current.nextInt(nodes.size)))
}

View file

@ -10,41 +10,53 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object MultiDcMultiJvmSpec extends MultiNodeConfig {
class MultiDcSpecConfig(crossDcConnections: Int = 5) extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(MultiNodeClusterSpec.clusterConfig)
commonConfig(ConfigFactory.parseString(
s"""
akka.loglevel = INFO
akka.cluster.multi-data-center.cross-data-center-connections = $crossDcConnections
""").withFallback(MultiNodeClusterSpec.clusterConfig))
nodeConfig(first, second)(ConfigFactory.parseString(
"""
akka.cluster.data-center = "dc1"
akka.loglevel = INFO
"""))
nodeConfig(third, fourth, fifth)(ConfigFactory.parseString(
"""
akka.cluster.data-center = "dc2"
akka.loglevel = INFO
"""))
testTransport(on = true)
}
class MultiDcMultiJvmNode1 extends MultiDcSpec
class MultiDcMultiJvmNode2 extends MultiDcSpec
class MultiDcMultiJvmNode3 extends MultiDcSpec
class MultiDcMultiJvmNode4 extends MultiDcSpec
class MultiDcMultiJvmNode5 extends MultiDcSpec
object MultiDcNormalConfig extends MultiDcSpecConfig()
abstract class MultiDcSpec
extends MultiNodeSpec(MultiDcMultiJvmSpec)
class MultiDcMultiJvmNode1 extends MultiDcSpec(MultiDcNormalConfig)
class MultiDcMultiJvmNode2 extends MultiDcSpec(MultiDcNormalConfig)
class MultiDcMultiJvmNode3 extends MultiDcSpec(MultiDcNormalConfig)
class MultiDcMultiJvmNode4 extends MultiDcSpec(MultiDcNormalConfig)
class MultiDcMultiJvmNode5 extends MultiDcSpec(MultiDcNormalConfig)
object MultiDcFewCrossDcConnectionsConfig extends MultiDcSpecConfig(1)
class MultiDcFewCrossDcMultiJvmNode1 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig)
class MultiDcFewCrossDcMultiJvmNode2 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig)
class MultiDcFewCrossDcMultiJvmNode3 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig)
class MultiDcFewCrossDcMultiJvmNode4 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig)
class MultiDcFewCrossDcMultiJvmNode5 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig)
abstract class MultiDcSpec(config: MultiDcSpecConfig)
extends MultiNodeSpec(config)
with MultiNodeClusterSpec {
import MultiDcMultiJvmSpec._
import config._
"A cluster with multiple data centers" must {
"be able to form" in {
@ -87,27 +99,30 @@ abstract class MultiDcSpec
runOn(first) {
testConductor.blackhole(first, third, Direction.Both).await
}
runOn(first, second, third, fourth) {
awaitAssert(clusterView.unreachableMembers should not be empty)
}
enterBarrier("inter-data-center unreachability")
runOn(fifth) {
cluster.join(third)
}
runOn(third, fourth, fifth) {
// should be able to join and become up since the
// unreachable is between dc1 and dc2,
within(10.seconds) {
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (5))
}
}
runOn(first) {
testConductor.passThrough(first, third, Direction.Both).await
}
// should be able to join and become up since the
// unreachable is between dc1 and dc2,
within(10.seconds) {
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (5))
}
runOn(first) {
testConductor.passThrough(first, third, Direction.Both).await
}
runOn(first, second, third, fourth) {
awaitAssert(clusterView.unreachableMembers should not be empty)
}
enterBarrier("inter-data-center unreachability end")
}
@ -115,9 +130,6 @@ abstract class MultiDcSpec
runOn(first) {
testConductor.blackhole(first, second, Direction.Both).await
}
runOn(first, second, third, fourth) {
awaitAssert(clusterView.unreachableMembers should not be empty)
}
enterBarrier("other-data-center-internal-unreachable")
runOn(third) {

View file

@ -58,14 +58,6 @@ abstract class MultiDcSplitBrainSpec
testConductor.blackhole(dc1Node, dc2Node, Direction.Both).await
}
}
runOn(dc1: _*) {
awaitAssert(clusterView.unreachableMembers.map(_.address) should contain allElementsOf (dc2.map(address)))
}
runOn(dc2: _*) {
awaitAssert(clusterView.unreachableMembers.map(_.address) should contain allElementsOf (dc1.map(address)))
}
}
def unsplitDataCenters(dc1: Seq[RoleName], dc2: Seq[RoleName]): Unit = {
@ -78,9 +70,6 @@ abstract class MultiDcSplitBrainSpec
}
}
runOn(dc1 ++ dc2: _*) {
awaitAssert(clusterView.unreachableMembers.map(_.address) should be(empty))
}
}
"A cluster with multiple data centers" must {

View file

@ -19,7 +19,7 @@ import akka.testkit.ImplicitSender
import akka.actor.ActorRef
import akka.remote.RARP
import akka.testkit.TestProbe
import akka.cluster.ClusterSettings.DefaultDataCenter
import akka.cluster.ClusterSettings.{ DataCenter, DefaultDataCenter }
object ClusterDomainEventPublisherSpec {
val config = """
@ -50,27 +50,30 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
val a51Up = TestMember(Address(protocol, "sys", "a", 2551), Up)
val dUp = TestMember(Address(protocol, "sys", "d", 2552), Up, Set("GRP"))
val emptyMembershipState = MembershipState(Gossip.empty, aUp.uniqueAddress, DefaultDataCenter)
private def state(gossip: Gossip, self: UniqueAddress, dc: DataCenter) =
MembershipState(gossip, self, DefaultDataCenter, crossDcConnections = 5)
val emptyMembershipState = state(Gossip.empty, aUp.uniqueAddress, DefaultDataCenter)
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.uniqueAddress)
val state0 = MembershipState(g0, aUp.uniqueAddress, DefaultDataCenter)
val state0 = state(g0, aUp.uniqueAddress, DefaultDataCenter)
val g1 = Gossip(members = SortedSet(aUp, cJoining)).seen(aUp.uniqueAddress).seen(cJoining.uniqueAddress)
val state1 = MembershipState(g1, aUp.uniqueAddress, DefaultDataCenter)
val state1 = state(g1, aUp.uniqueAddress, DefaultDataCenter)
val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.uniqueAddress)
val state2 = MembershipState(g2, aUp.uniqueAddress, DefaultDataCenter)
val state2 = state(g2, aUp.uniqueAddress, DefaultDataCenter)
val g3 = g2.seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress)
val state3 = MembershipState(g3, aUp.uniqueAddress, DefaultDataCenter)
val state3 = state(g3, aUp.uniqueAddress, DefaultDataCenter)
val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress)
val state4 = MembershipState(g4, aUp.uniqueAddress, DefaultDataCenter)
val state4 = state(g4, aUp.uniqueAddress, DefaultDataCenter)
val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress)
val state5 = MembershipState(g5, aUp.uniqueAddress, DefaultDataCenter)
val state5 = state(g5, aUp.uniqueAddress, DefaultDataCenter)
val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress)
val state6 = MembershipState(g6, aUp.uniqueAddress, DefaultDataCenter)
val state6 = state(g6, aUp.uniqueAddress, DefaultDataCenter)
val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress)
val state7 = MembershipState(g7, aUp.uniqueAddress, DefaultDataCenter)
val state7 = state(g7, aUp.uniqueAddress, DefaultDataCenter)
val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability =
Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress)
val state8 = MembershipState(g8, aUp.uniqueAddress, DefaultDataCenter)
val state8 = state(g8, aUp.uniqueAddress, DefaultDataCenter)
// created in beforeEach
var memberSubscriber: TestProbe = _
@ -143,11 +146,11 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[RoleLeaderChanged]))
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(MembershipState(Gossip(members = SortedSet(cJoining, dUp)), dUp.uniqueAddress, DefaultDataCenter))
publisher ! PublishChanges(state(Gossip(members = SortedSet(cJoining, dUp)), dUp.uniqueAddress, DefaultDataCenter))
subscriber.expectMsgAllOf(
RoleLeaderChanged("GRP", Some(dUp.address)),
RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(dUp.address)))
publisher ! PublishChanges(MembershipState(Gossip(members = SortedSet(cUp, dUp)), dUp.uniqueAddress, DefaultDataCenter))
publisher ! PublishChanges(state(Gossip(members = SortedSet(cUp, dUp)), dUp.uniqueAddress, DefaultDataCenter))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address)))
}

View file

@ -7,6 +7,7 @@ package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.Matchers
import akka.actor.Address
import scala.collection.immutable.SortedSet
class ClusterDomainEventSpec extends WordSpec with Matchers {
@ -39,7 +40,10 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
((gossip, Set.empty[UniqueAddress]) /: gossip.members) { case ((gs, as), m) (gs.seen(m.uniqueAddress), as + m.uniqueAddress) }
private def state(g: Gossip): MembershipState =
MembershipState(g, selfDummyAddress, ClusterSettings.DefaultDataCenter)
state(g, selfDummyAddress)
private def state(g: Gossip, self: UniqueAddress): MembershipState =
MembershipState(g, self, ClusterSettings.DefaultDataCenter, crossDcConnections = 5)
"Domain events" must {
@ -80,8 +84,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
// never include self member in unreachable
diffUnreachable(
MembershipState(g1, bDown.uniqueAddress, ClusterSettings.DefaultDataCenter),
MembershipState(g2, bDown.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq())
state(g1, bDown.uniqueAddress),
state(g2, bDown.uniqueAddress)) should ===(Seq())
diffSeen(state(g1), state(g2)) should ===(Seq.empty)
}
@ -99,13 +103,13 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
diffUnreachable(state(g1), state(g2)) should ===(Seq(UnreachableMember(cUp)))
// never include self member in unreachable
diffUnreachable(
MembershipState(g1, cUp.uniqueAddress, ClusterSettings.DefaultDataCenter),
MembershipState(g2, cUp.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq())
state(g1, cUp.uniqueAddress),
state(g2, cUp.uniqueAddress)) should ===(Seq())
diffReachable(state(g1), state(g2)) should ===(Seq(ReachableMember(bUp)))
// never include self member in reachable
diffReachable(
MembershipState(g1, bUp.uniqueAddress, ClusterSettings.DefaultDataCenter),
MembershipState(g2, bUp.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq())
state(g1, bUp.uniqueAddress),
state(g2, bUp.uniqueAddress)) should ===(Seq())
}
"be produced for removed members" in {

View file

@ -35,7 +35,7 @@ class GossipSpec extends WordSpec with Matchers {
val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, dataCenter = dc2d1.dataCenter)
private def state(g: Gossip, selfMember: Member = a1): MembershipState =
MembershipState(g, selfMember.uniqueAddress, selfMember.dataCenter)
MembershipState(g, selfMember.uniqueAddress, selfMember.dataCenter, crossDcConnections = 5)
"A Gossip" must {

View file

@ -0,0 +1,206 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.actor.Address
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.MemberStatus.Up
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.immutable.SortedSet
class GossipTargetSelectorSpec extends WordSpec with Matchers {
val aDc1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, dataCenter = "dc1")
val bDc1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, dataCenter = "dc1")
val cDc1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, Set.empty, dataCenter = "dc1")
val eDc2 = TestMember(Address("akka.tcp", "sys", "e", 2552), Up, Set.empty, dataCenter = "dc2")
val fDc2 = TestMember(Address("akka.tcp", "sys", "f", 2552), Up, Set.empty, dataCenter = "dc2")
val gDc3 = TestMember(Address("akka.tcp", "sys", "g", 2552), Up, Set.empty, dataCenter = "dc3")
val hDc3 = TestMember(Address("akka.tcp", "sys", "h", 2552), Up, Set.empty, dataCenter = "dc3")
val defaultSelector = new GossipTargetSelector(
reduceGossipDifferentViewProbability = 400,
crossDcGossipProbability = 0.2
)
"The gossip target selection" should {
"select local nodes in a multi dc setting when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = true
}
val state = MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5)
val gossipTo = alwaysLocalSelector.gossipTargets(state)
// only one other local node
gossipTo should ===(Vector[UniqueAddress](bDc1))
}
"select cross dc nodes when chance says so" in {
val alwaysCrossDcSelector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
}
val state = MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5)
val gossipTo = alwaysCrossDcSelector.gossipTargets(state)
// only one other local node
gossipTo should (contain(eDc2.uniqueAddress) or contain(fDc2.uniqueAddress))
}
"select local nodes that hasn't seen the gossip when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = true
}
val state = MembershipState(
Gossip(SortedSet(aDc1, bDc1, cDc1)).seen(bDc1),
aDc1,
aDc1.dataCenter,
crossDcConnections = 5
)
val gossipTo = alwaysLocalSelector.gossipTargets(state)
// a1 is self, b1 has seen so only option is c1
gossipTo should ===(Vector[UniqueAddress](cDc1))
}
"select among all local nodes regardless if they saw the gossip already when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false
}
val state = MembershipState(
Gossip(SortedSet(aDc1, bDc1, cDc1)).seen(bDc1),
aDc1,
aDc1.dataCenter,
crossDcConnections = 5
)
val gossipTo = alwaysLocalSelector.gossipTargets(state)
// a1 is self, b1 is the only that has seen
gossipTo should ===(Vector[UniqueAddress](bDc1, cDc1))
}
"not choose unreachable nodes" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false
}
val state = MembershipState(
Gossip(
members = SortedSet(aDc1, bDc1, cDc1),
overview = GossipOverview(
reachability = Reachability.empty.unreachable(aDc1, bDc1))),
aDc1,
aDc1.dataCenter,
crossDcConnections = 5)
val gossipTo = alwaysLocalSelector.gossipTargets(state)
// a1 cannot reach b1 so only option is c1
gossipTo should ===(Vector[UniqueAddress](cDc1))
}
"continue with the next dc when doing cross dc and no node where suitable" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}
val state = MembershipState(
Gossip(
members = SortedSet(aDc1, bDc1, eDc2, fDc2, gDc3, hDc3),
overview = GossipOverview(
reachability = Reachability.empty
.unreachable(aDc1, eDc2)
.unreachable(aDc1, fDc2))),
aDc1,
aDc1.dataCenter,
crossDcConnections = 5)
val gossipTo = selector.gossipTargets(state)
gossipTo should ===(Vector[UniqueAddress](gDc3, hDc3))
}
"not care about seen/unseen for cross dc" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}
val state = MembershipState(
Gossip(
members = SortedSet(aDc1, bDc1, eDc2, fDc2, gDc3, hDc3)
).seen(fDc2).seen(hDc3),
aDc1,
aDc1.dataCenter,
crossDcConnections = 5)
val gossipTo = selector.gossipTargets(state)
gossipTo should ===(Vector[UniqueAddress](eDc2, fDc2))
}
"limit the numbers of chosen cross dc nodes to the crossDcConnections setting" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}
val state = MembershipState(
Gossip(
members = SortedSet(aDc1, bDc1, eDc2, fDc2, gDc3, hDc3)),
aDc1,
aDc1.dataCenter,
crossDcConnections = 1)
val gossipTo = selector.gossipTargets(state)
gossipTo should ===(Vector[UniqueAddress](eDc2))
}
"select N random local nodes when single dc" in {
val state = MembershipState(
Gossip(
members = SortedSet(aDc1, bDc1, cDc1)),
aDc1,
aDc1.dataCenter,
crossDcConnections = 1) // means only a e and g are oldest
val randomNodes = defaultSelector.randomNodesForFullGossip(state, 3)
randomNodes.toSet should ===(Set[UniqueAddress](bDc1, cDc1))
}
"select N random local nodes when not self among oldest" in {
val state = MembershipState(
Gossip(
members = SortedSet(aDc1, bDc1, cDc1, eDc2, fDc2, gDc3, hDc3)),
bDc1,
bDc1.dataCenter,
crossDcConnections = 1) // means only a, e and g are oldest
val randomNodes = defaultSelector.randomNodesForFullGossip(state, 3)
randomNodes.toSet should ===(Set[UniqueAddress](aDc1, cDc1))
}
"select N-1 random local nodes plus one cross dc oldest node when self among oldest" in {
val state = MembershipState(
Gossip(
members = SortedSet(aDc1, bDc1, cDc1, eDc2, fDc2)),
aDc1,
aDc1.dataCenter,
crossDcConnections = 1) // means only a and e are oldest
val randomNodes = defaultSelector.randomNodesForFullGossip(state, 3)
randomNodes.toSet should ===(Set[UniqueAddress](bDc1, cDc1, eDc2))
}
}
// made the test so much easier to read
import scala.language.implicitConversions
private implicit def memberToUniqueAddress(m: Member): UniqueAddress = m.uniqueAddress
}