diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index dc2847b258..ef0a32fd1b 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -210,7 +210,16 @@ akka { # in which case these settings would not be used at all - they only apply, # if your cluster nodes are configured with at-least 2 different `akka.cluster.data-center` values. multi-data-center { - + + # Try to limit the number of connections between data centers. Used for gossip and heartbeating. + # This will not limit connections created for the messaging of the application. + # If the cluster does not span multiple data centers, this value has no effect. + cross-data-center-connections = 5 + + # The n oldest nodes in a data center will choose to gossip to another data center with + # this probability. Must be a value between 0.0 and 1.0 where 0.0 means never, 1.0 means always. + cross-data-center-gossip-probability = 0.2 + failure-detector { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have @@ -232,11 +241,6 @@ akka { # will start after this period, even though no heartbeat message has # been received. expected-response-after = 1 s - - # Maximum number of oldest members in a data center that will monitor other (oldest nodes in other) data centers. - # This is done to lessen the cross data center communication, as only those top-n-oldest nodes - # need to maintain connections to the other data-centers. 
- nr-of-monitoring-members = 5 } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index d4b7ddcb4d..dde1cf19ba 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -108,7 +108,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { val crossDcFailureDetector: FailureDetectorRegistry[Address] = { val createFailureDetector = () ⇒ - FailureDetectorLoader.load(settings.CrossDcFailureDetectorSettings.ImplementationClass, settings.CrossDcFailureDetectorSettings.config, system) + FailureDetectorLoader.load( + settings.MultiDataCenter.CrossDcFailureDetectorSettings.ImplementationClass, + settings.MultiDataCenter.CrossDcFailureDetectorSettings.config, system) new DefaultFailureDetectorRegistry(createFailureDetector) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 80391ec612..c038c5ecb5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -4,7 +4,7 @@ package akka.cluster import language.existentials -import scala.collection.immutable +import scala.collection.{ SortedSet, breakOut, immutable, mutable } import scala.concurrent.duration._ import java.util.concurrent.ThreadLocalRandom @@ -15,8 +15,6 @@ import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.cluster.ClusterSettings.DataCenter import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } - -import scala.collection.breakOut import akka.remote.QuarantinedEvent import java.util.ArrayList import java.util.Collections @@ -25,9 +23,11 @@ import akka.pattern.ask import akka.util.Timeout import akka.Done import akka.annotation.InternalApi +import akka.cluster.ClusterSettings.DataCenter import scala.concurrent.Future import 
scala.concurrent.Promise +import scala.util.Random /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -301,10 +301,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with protected def selfUniqueAddress = cluster.selfUniqueAddress val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress)) + val gossipTargetSelector = new GossipTargetSelector( + ReduceGossipDifferentViewProbability, + cluster.settings.MultiDataCenter.CrossDcGossipProbability) // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet - var membershipState = MembershipState(Gossip.empty, cluster.selfUniqueAddress, cluster.settings.DataCenter) + var membershipState = MembershipState( + Gossip.empty, + cluster.selfUniqueAddress, + cluster.settings.DataCenter, + cluster.settings.MultiDataCenter.CrossDcConnections) + def latestGossip: Gossip = membershipState.latestGossip val statsEnabled = PublishStatsInterval.isFinite @@ -925,88 +933,25 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with */ def gossipRandomN(n: Int): Unit = { if (!isSingletonCluster && n > 0) { - val localGossip = latestGossip - // using ArrayList to be able to shuffle - val possibleTargets = new ArrayList[UniqueAddress](localGossip.members.size) - localGossip.members.foreach { m ⇒ - if (validNodeForGossip(m.uniqueAddress)) - possibleTargets.add(m.uniqueAddress) - } - val randomTargets = - if (possibleTargets.size <= n) - possibleTargets - else { - Collections.shuffle(possibleTargets, ThreadLocalRandom.current()) - possibleTargets.subList(0, n) - } - - val iter = randomTargets.iterator - while (iter.hasNext) - gossipTo(iter.next()) + gossipTargetSelector.randomNodesForFullGossip(membershipState, n).foreach(gossipTo) } } /** * Initiates a new round of gossip. 
*/ - def gossip(): Unit = { - + def gossip(): Unit = if (!isSingletonCluster) { - val localGossip = latestGossip - - val preferredGossipTargets: Vector[UniqueAddress] = - if (ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability) { - // If it's time to try to gossip to some nodes with a different view - // gossip to a random alive member with preference to a member with older gossip version - localGossip.members.collect { - case m if !localGossip.seenByNode(m.uniqueAddress) && validNodeForGossip(m.uniqueAddress) ⇒ - m.uniqueAddress - }(breakOut) - } else Vector.empty - - if (preferredGossipTargets.nonEmpty) { - val peer = selectRandomNode(preferredGossipTargets) - // send full gossip because it has different view - peer foreach gossipTo - } else { - // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved) - val peer = selectRandomNode(localGossip.members.toIndexedSeq.collect { - case m if validNodeForGossip(m.uniqueAddress) ⇒ m.uniqueAddress - }) - peer foreach { node ⇒ - if (localGossip.seenByNode(node)) gossipStatusTo(node) - else gossipTo(node) - } + gossipTargetSelector.gossipTarget(membershipState) match { + case Some(peer) ⇒ + if (!membershipState.isInSameDc(peer) || latestGossip.seenByNode(peer)) + // avoid transferring the full state if possible + gossipStatusTo(peer) + else + gossipTo(peer) + case None ⇒ // nothing to see here } } - } - - /** - * For large clusters we should avoid shooting down individual - * nodes. Therefore the probability is reduced for large clusters. 
- */ - def adjustedGossipDifferentViewProbability: Double = { - val size = latestGossip.members.size - val low = ReduceGossipDifferentViewProbability - val high = low * 3 - // start reduction when cluster is larger than configured ReduceGossipDifferentViewProbability - if (size <= low) - GossipDifferentViewProbability - else { - // don't go lower than 1/10 of the configured GossipDifferentViewProbability - val minP = GossipDifferentViewProbability / 10 - if (size >= high) - minP - else { - // linear reduction of the probability with increasing number of nodes - // from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes - // to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes - // i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes - val k = (minP - GossipDifferentViewProbability) / (high - low) - GossipDifferentViewProbability + (size - low) * k - } - } - } /** * Runs periodic leader actions, such as member status transitions, assigning partitions etc. @@ -1244,10 +1189,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } - def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] = - if (nodes.isEmpty) None - else Some(nodes(ThreadLocalRandom.current nextInt nodes.size)) - def isSingletonCluster: Boolean = latestGossip.isSingletonCluster // needed for tests @@ -1261,24 +1202,21 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with * Gossips latest gossip to a node. */ def gossipTo(node: UniqueAddress): Unit = - if (validNodeForGossip(node)) + if (membershipState.validNodeForGossip(node)) clusterCore(node.address) ! GossipEnvelope(selfUniqueAddress, node, latestGossip) def gossipTo(node: UniqueAddress, destination: ActorRef): Unit = - if (validNodeForGossip(node)) + if (membershipState.validNodeForGossip(node)) destination ! 
GossipEnvelope(selfUniqueAddress, node, latestGossip) def gossipStatusTo(node: UniqueAddress, destination: ActorRef): Unit = - if (validNodeForGossip(node)) + if (membershipState.validNodeForGossip(node)) destination ! GossipStatus(selfUniqueAddress, latestGossip.version) def gossipStatusTo(node: UniqueAddress): Unit = - if (validNodeForGossip(node)) + if (membershipState.validNodeForGossip(node)) clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version) - def validNodeForGossip(node: UniqueAddress): Boolean = - node != selfUniqueAddress && membershipState.isReachableExcludingDownedObservers(node) - def updateLatestGossip(gossip: Gossip): Unit = { // Updating the vclock version for the changes val versionedGossip = gossip :+ vclockNode diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index b0b72633a1..16c9b21c7e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -378,7 +378,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val cluster = Cluster(context.system) val selfUniqueAddress = cluster.selfUniqueAddress - val emptyMembershipState = MembershipState(Gossip.empty, cluster.selfUniqueAddress, cluster.settings.DataCenter) + val emptyMembershipState = MembershipState( + Gossip.empty, + cluster.selfUniqueAddress, + cluster.settings.DataCenter, + cluster.settings.MultiDataCenter.CrossDcConnections) var membershipState: MembershipState = emptyMembershipState def selfDc = cluster.settings.DataCenter diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index df37649d2b..63a4cec36f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -10,7 +10,7 @@ import 
com.typesafe.config.ConfigObject import scala.concurrent.duration.Duration import akka.actor.Address import akka.actor.AddressFromURIString -import akka.annotation.{ DoNotInherit, InternalApi } +import akka.annotation.InternalApi import akka.dispatch.Dispatchers import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase } @@ -34,7 +34,6 @@ object ClusterSettings { } final class ClusterSettings(val config: Config, val systemName: String) { - import ClusterSettings._ import ClusterSettings._ private val cc = config.getConfig("akka.cluster") @@ -51,6 +50,28 @@ final class ClusterSettings(val config: Config, val systemName: String) { FailureDetectorConfig.getInt("monitored-by-nr-of-members") } requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0") + final class CrossDcFailureDetectorSettings(val config: Config) { + val ImplementationClass: String = config.getString("implementation-class") + val HeartbeatInterval: FiniteDuration = { + config.getMillisDuration("heartbeat-interval") + } requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0") + val HeartbeatExpectedResponseAfter: FiniteDuration = { + config.getMillisDuration("expected-response-after") + } requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0") + def NrOfMonitoringActors: Int = MultiDataCenter.CrossDcConnections + } + + object MultiDataCenter { + val CrossDcConnections: Int = cc.getInt("multi-data-center.cross-data-center-connections") + .requiring(_ > 0, "cross-data-center-connections must be > 0") + + val CrossDcGossipProbability: Double = cc.getDouble("multi-data-center.cross-data-center-gossip-probability") + .requiring(d ⇒ d >= 0.0D && d <= 1.0D, "cross-data-center-gossip-probability must be >= 0.0 and <= 1.0") + + val CrossDcFailureDetectorSettings: CrossDcFailureDetectorSettings = + new CrossDcFailureDetectorSettings(cc.getConfig("multi-data-center.failure-detector")) + } + val SeedNodes: immutable.IndexedSeq[Address] = 
immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector val SeedNodeTimeout: FiniteDuration = cc.getMillisDuration("seed-node-timeout") @@ -114,24 +135,10 @@ final class ClusterSettings(val config: Config, val systemName: String) { val QuarantineRemovedNodeAfter: FiniteDuration = cc.getMillisDuration("quarantine-removed-node-after") requiring (_ > Duration.Zero, "quarantine-removed-node-after must be > 0") - val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members") + val AllowWeaklyUpMembers: Boolean = cc.getBoolean("allow-weakly-up-members") val DataCenter: DataCenter = cc.getString("data-center") - final class CrossDcFailureDetectorSettings(val config: Config) { - val ImplementationClass: String = config.getString("implementation-class") - val HeartbeatInterval: FiniteDuration = { - config.getMillisDuration("heartbeat-interval") - } requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0") - val HeartbeatExpectedResponseAfter: FiniteDuration = { - config.getMillisDuration("expected-response-after") - } requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0") - val NrOfMonitoringActors: Int = { - config.getInt("nr-of-monitoring-members") - } requiring (_ > 0, "failure-detector.nr-of-monitoring-members must be > 0") - } - val CrossDcFailureDetectorSettings = new CrossDcFailureDetectorSettings(cc.getConfig("multi-data-center.failure-detector")) - val Roles: Set[String] = { val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring ( _.forall(!_.startsWith(DcRolePrefix)), @@ -162,8 +169,8 @@ final class ClusterSettings(val config: Config, val systemName: String) { val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel") object Debug { - val VerboseHeartbeatLogging = cc.getBoolean("debug.verbose-heartbeat-logging") - val VerboseGossipLogging = cc.getBoolean("debug.verbose-gossip-logging") + val VerboseHeartbeatLogging: Boolean = 
cc.getBoolean("debug.verbose-heartbeat-logging") + val VerboseGossipLogging: Boolean = cc.getBoolean("debug.verbose-gossip-logging") } } diff --git a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala index b022051bb6..2a5c9585b3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala @@ -47,7 +47,9 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg val isExternalClusterMember: Member ⇒ Boolean = member ⇒ member.dataCenter != cluster.selfDataCenter - val crossDcSettings: cluster.settings.CrossDcFailureDetectorSettings = cluster.settings.CrossDcFailureDetectorSettings + val crossDcSettings: cluster.settings.CrossDcFailureDetectorSettings = + cluster.settings.MultiDataCenter.CrossDcFailureDetectorSettings + val crossDcFailureDetector = cluster.crossDcFailureDetector val selfHeartbeat = ClusterHeartbeatSender.Heartbeat(selfAddress) @@ -299,7 +301,7 @@ private[cluster] object CrossDcHeartbeatingState { crossDcFailureDetector, nrOfMonitoredNodesPerDc, state = { - // TODO unduplicate this with other places where we do this + // TODO unduplicate this with the logic in MembershipState.ageSortedTopOldestMembersPerDc val groupedByDc = members.groupBy(_.dataCenter) if (members.ordering == Member.ageOrdering) { diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index dcf3982355..9a05003004 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -87,6 +87,13 @@ private[cluster] final case class Gossip( @transient private lazy val membersMap: Map[UniqueAddress, Member] = members.map(m ⇒ m.uniqueAddress → m)(collection.breakOut) + @transient lazy val isMultiDc = + if (members.size <= 1) false + else { + val 
dc1 = members.head.dataCenter + members.exists(_.dataCenter != dc1) + } + /** * Increments the version for this 'Node'. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala index e2375a4d5b..99e10a273c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala +++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala @@ -3,12 +3,19 @@ */ package akka.cluster +import java.util.{ ArrayList, Collections } +import java.util.concurrent.ThreadLocalRandom + import scala.collection.immutable import scala.collection.SortedSet import akka.cluster.ClusterSettings.DataCenter import akka.cluster.MemberStatus._ import akka.annotation.InternalApi +import scala.annotation.tailrec +import scala.collection.breakOut +import scala.util.Random + /** * INTERNAL API */ @@ -23,9 +30,16 @@ import akka.annotation.InternalApi /** * INTERNAL API */ -@InternalApi private[akka] final case class MembershipState(latestGossip: Gossip, selfUniqueAddress: UniqueAddress, selfDc: DataCenter) { +@InternalApi private[akka] final case class MembershipState( + latestGossip: Gossip, + selfUniqueAddress: UniqueAddress, + selfDc: DataCenter, + crossDcConnections: Int) { + import MembershipState._ + lazy val selfMember = latestGossip.member(selfUniqueAddress) + def members: immutable.SortedSet[Member] = latestGossip.members def overview: GossipOverview = latestGossip.overview @@ -76,6 +90,20 @@ import akka.annotation.InternalApi overview.reachability.removeObservers(membersToExclude).remove(members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress }) } + /** + * @return Up to `crossDcConnections` oldest members for each DC + */ + lazy val ageSortedTopOldestMembersPerDc: Map[DataCenter, SortedSet[Member]] = + // TODO make this recursive and bail early when size reached to make it fast for large clusters + latestGossip.members.foldLeft(Map.empty[DataCenter, SortedSet[Member]]) { 
(acc, member) ⇒ + acc.get(member.dataCenter) match { + case Some(set) ⇒ + if (set.size < crossDcConnections) acc + (member.dataCenter → (set + member)) + else acc + case None ⇒ acc + (member.dataCenter → (SortedSet.empty(Member.ageOrdering) + member)) + } + } + /** * @return true if toAddress should be reachable from the fromDc in general, within a data center * this means only caring about data center local observations, across data centers it @@ -119,4 +147,173 @@ import akka.annotation.InternalApi .map(_.uniqueAddress) } + def isInSameDc(node: UniqueAddress): Boolean = + node == selfUniqueAddress || latestGossip.member(node).dataCenter == selfDc + + def validNodeForGossip(node: UniqueAddress): Boolean = + node != selfUniqueAddress && + ((isInSameDc(node) && isReachableExcludingDownedObservers(node)) || + // if cross DC we need to check pairwise unreachable observation + overview.reachability.isReachable(selfUniqueAddress, node)) + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class GossipTargetSelector( + reduceGossipDifferentViewProbability: Double, + crossDcGossipProbability: Double) { + + final def gossipTarget(state: MembershipState): Option[UniqueAddress] = { + selectRandomNode(gossipTargets(state)) + } + + final def gossipTargets(state: MembershipState): Vector[UniqueAddress] = + if (state.latestGossip.isMultiDc) multiDcGossipTargets(state) + else localDcGossipTargets(state) + + /** + * Select `n` random nodes to gossip to (used to quickly inform the rest of the cluster when leaving for example) + */ + def randomNodesForFullGossip(state: MembershipState, n: Int): Vector[UniqueAddress] = + if (state.latestGossip.isMultiDc && state.ageSortedTopOldestMembersPerDc(state.selfDc).contains(state.selfMember)) { + // this node is one of the N oldest in the cluster, gossip to one cross-dc but mostly locally + val randomLocalNodes = Random.shuffle(state.members.toVector.collect { + case m if m.dataCenter == state.selfDc && 
state.validNodeForGossip(m.uniqueAddress) ⇒ m.uniqueAddress + }) + + @tailrec + def selectOtherDcNode(randomizedDcs: List[DataCenter]): Option[UniqueAddress] = + randomizedDcs match { + case Nil ⇒ None // couldn't find a single cross-dc-node to talk to + case dc :: tail ⇒ + state.ageSortedTopOldestMembersPerDc(dc).collectFirst { + case m if state.validNodeForGossip(m.uniqueAddress) ⇒ m.uniqueAddress + } match { + case Some(addr) ⇒ Some(addr) + case None ⇒ selectOtherDcNode(tail) + } + + } + val otherDcs = Random.shuffle((state.ageSortedTopOldestMembersPerDc.keySet - state.selfDc).toList) + + selectOtherDcNode(otherDcs) match { + case Some(node) ⇒ randomLocalNodes.take(n - 1) :+ node + case None ⇒ randomLocalNodes.take(n) + } + + } else { + // single dc or not among the N oldest - select local nodes + val selectedNodes = state.members.toVector.collect { + case m if m.dataCenter == state.selfDc && state.validNodeForGossip(m.uniqueAddress) ⇒ m.uniqueAddress + } + + if (selectedNodes.size <= n) selectedNodes + else Random.shuffle(selectedNodes).take(n) + } + + /** + * Chooses a set of possible gossip targets that is in the same dc. If the cluster is not multi dc this + * means it is a choice among all nodes of the cluster. 
+ */ + protected def localDcGossipTargets(state: MembershipState): Vector[UniqueAddress] = { + val latestGossip = state.latestGossip + val firstSelection: Vector[UniqueAddress] = + if (preferNodesWithDifferentView(state)) { + // If it's time to try to gossip to some nodes with a different view + // gossip to a random alive same dc member with preference to a member with older gossip version + latestGossip.members.collect { + case m if m.dataCenter == state.selfDc && !latestGossip.seenByNode(m.uniqueAddress) && state.validNodeForGossip(m.uniqueAddress) ⇒ + m.uniqueAddress + }(breakOut) + } else Vector.empty + + // Fall back to localGossip + if (firstSelection.isEmpty) { + latestGossip.members.toVector.collect { + case m if m.dataCenter == state.selfDc && state.validNodeForGossip(m.uniqueAddress) ⇒ m.uniqueAddress + } + } else firstSelection + + } + + /** + * Choose cross-dc nodes if this is one of the N oldest nodes, and if not fall back to gossip locally in the dc + */ + protected def multiDcGossipTargets(state: MembershipState): Vector[UniqueAddress] = { + val latestGossip = state.latestGossip + // only a fraction of the time across data centers + if (selectDcLocalNodes()) localDcGossipTargets(state) + else { + val nodesPerDc = state.ageSortedTopOldestMembersPerDc + + // only do cross DC gossip if this node is among the N oldest + + if (!nodesPerDc(state.selfDc).contains(state.selfMember)) localDcGossipTargets(state) + else { + @tailrec + def findFirstDcWithValidNodes(left: List[DataCenter]): Vector[UniqueAddress] = + left match { + case dc :: tail ⇒ + + val validNodes = nodesPerDc(dc).collect { + case member if state.validNodeForGossip(member.uniqueAddress) ⇒ + member.uniqueAddress + } + + if (validNodes.nonEmpty) validNodes.toVector + else findFirstDcWithValidNodes(tail) // no valid nodes in dc, try next + + case Nil ⇒ + Vector.empty + } + + // choose another DC at random + val otherDcsInRandomOrder = dcsInRandomOrder((nodesPerDc - state.selfDc).keys.toList) + val
nodes = findFirstDcWithValidNodes(otherDcsInRandomOrder) + if (nodes.nonEmpty) nodes + // no other dc with reachable nodes, fall back to local gossip + else localDcGossipTargets(state) + } + } + } + + /** + * For large clusters we should avoid shooting down individual + * nodes. Therefore the probability is reduced for large clusters. + */ + protected def adjustedGossipDifferentViewProbability(clusterSize: Int): Double = { + val low = reduceGossipDifferentViewProbability + val high = low * 3 + // start reduction when cluster is larger than configured ReduceGossipDifferentViewProbability + if (clusterSize <= low) + reduceGossipDifferentViewProbability + else { + // don't go lower than 1/10 of the configured GossipDifferentViewProbability + val minP = reduceGossipDifferentViewProbability / 10 + if (clusterSize >= high) + minP + else { + // linear reduction of the probability with increasing number of nodes + // from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes + // to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes + // i.e. 
default from 0.8 at 400 nodes, to 0.08 at 1200 nodes + val k = (minP - reduceGossipDifferentViewProbability) / (high - low) + reduceGossipDifferentViewProbability + (clusterSize - low) * k + } + } + } + + protected def selectDcLocalNodes(): Boolean = ThreadLocalRandom.current.nextDouble() > crossDcGossipProbability + + protected def preferNodesWithDifferentView(state: MembershipState): Boolean = + ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size) + + protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = + Random.shuffle(dcs) + + protected def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] = + if (nodes.isEmpty) None + else Some(nodes(ThreadLocalRandom.current.nextInt(nodes.size))) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcClusterSpec.scala index 9f49188dd4..91ae496fe0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcClusterSpec.scala @@ -10,41 +10,53 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ -object MultiDcMultiJvmSpec extends MultiNodeConfig { +class MultiDcSpecConfig(crossDcConnections: Int = 5) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") val fifth = role("fifth") - commonConfig(MultiNodeClusterSpec.clusterConfig) + commonConfig(ConfigFactory.parseString( + s""" + akka.loglevel = INFO + akka.cluster.multi-data-center.cross-data-center-connections = $crossDcConnections + """).withFallback(MultiNodeClusterSpec.clusterConfig)) nodeConfig(first, second)(ConfigFactory.parseString( """ akka.cluster.data-center = "dc1" - akka.loglevel = INFO """)) nodeConfig(third, fourth, fifth)(ConfigFactory.parseString( """ akka.cluster.data-center = "dc2" - 
akka.loglevel = INFO """)) testTransport(on = true) } -class MultiDcMultiJvmNode1 extends MultiDcSpec -class MultiDcMultiJvmNode2 extends MultiDcSpec -class MultiDcMultiJvmNode3 extends MultiDcSpec -class MultiDcMultiJvmNode4 extends MultiDcSpec -class MultiDcMultiJvmNode5 extends MultiDcSpec +object MultiDcNormalConfig extends MultiDcSpecConfig() -abstract class MultiDcSpec - extends MultiNodeSpec(MultiDcMultiJvmSpec) +class MultiDcMultiJvmNode1 extends MultiDcSpec(MultiDcNormalConfig) +class MultiDcMultiJvmNode2 extends MultiDcSpec(MultiDcNormalConfig) +class MultiDcMultiJvmNode3 extends MultiDcSpec(MultiDcNormalConfig) +class MultiDcMultiJvmNode4 extends MultiDcSpec(MultiDcNormalConfig) +class MultiDcMultiJvmNode5 extends MultiDcSpec(MultiDcNormalConfig) + +object MultiDcFewCrossDcConnectionsConfig extends MultiDcSpecConfig(1) + +class MultiDcFewCrossDcMultiJvmNode1 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig) +class MultiDcFewCrossDcMultiJvmNode2 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig) +class MultiDcFewCrossDcMultiJvmNode3 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig) +class MultiDcFewCrossDcMultiJvmNode4 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig) +class MultiDcFewCrossDcMultiJvmNode5 extends MultiDcSpec(MultiDcFewCrossDcConnectionsConfig) + +abstract class MultiDcSpec(config: MultiDcSpecConfig) + extends MultiNodeSpec(config) with MultiNodeClusterSpec { - import MultiDcMultiJvmSpec._ + import config._ "A cluster with multiple data centers" must { "be able to form" in { @@ -87,27 +99,30 @@ abstract class MultiDcSpec runOn(first) { testConductor.blackhole(first, third, Direction.Both).await } - runOn(first, second, third, fourth) { - awaitAssert(clusterView.unreachableMembers should not be empty) - } enterBarrier("inter-data-center unreachability") runOn(fifth) { cluster.join(third) } + runOn(third, fourth, fifth) { + // should be able to join and become up since the + // unreachable is between dc1 and dc2, + 
within(10.seconds) { + awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (5)) + } + } + + runOn(first) { + testConductor.passThrough(first, third, Direction.Both).await + } + // should be able to join and become up since the // unreachable is between dc1 and dc2, within(10.seconds) { awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (5)) } - runOn(first) { - testConductor.passThrough(first, third, Direction.Both).await - } - runOn(first, second, third, fourth) { - awaitAssert(clusterView.unreachableMembers should not be empty) - } enterBarrier("inter-data-center unreachability end") } @@ -115,9 +130,6 @@ abstract class MultiDcSpec runOn(first) { testConductor.blackhole(first, second, Direction.Both).await } - runOn(first, second, third, fourth) { - awaitAssert(clusterView.unreachableMembers should not be empty) - } enterBarrier("other-data-center-internal-unreachable") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala index 4b0700beb8..d41c0d7608 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala @@ -58,14 +58,6 @@ abstract class MultiDcSplitBrainSpec testConductor.blackhole(dc1Node, dc2Node, Direction.Both).await } } - - runOn(dc1: _*) { - awaitAssert(clusterView.unreachableMembers.map(_.address) should contain allElementsOf (dc2.map(address))) - } - runOn(dc2: _*) { - awaitAssert(clusterView.unreachableMembers.map(_.address) should contain allElementsOf (dc1.map(address))) - } - } def unsplitDataCenters(dc1: Seq[RoleName], dc2: Seq[RoleName]): Unit = { @@ -78,9 +70,6 @@ abstract class MultiDcSplitBrainSpec } } - runOn(dc1 ++ dc2: _*) { - awaitAssert(clusterView.unreachableMembers.map(_.address) should be(empty)) - } } "A cluster with multiple data 
centers" must { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 68471ff5b8..e86fe2a1b5 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -19,7 +19,7 @@ import akka.testkit.ImplicitSender import akka.actor.ActorRef import akka.remote.RARP import akka.testkit.TestProbe -import akka.cluster.ClusterSettings.DefaultDataCenter +import akka.cluster.ClusterSettings.{ DataCenter, DefaultDataCenter } object ClusterDomainEventPublisherSpec { val config = """ @@ -50,27 +50,30 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val a51Up = TestMember(Address(protocol, "sys", "a", 2551), Up) val dUp = TestMember(Address(protocol, "sys", "d", 2552), Up, Set("GRP")) - val emptyMembershipState = MembershipState(Gossip.empty, aUp.uniqueAddress, DefaultDataCenter) + private def state(gossip: Gossip, self: UniqueAddress, dc: DataCenter) = + MembershipState(gossip, self, DefaultDataCenter, crossDcConnections = 5) + + val emptyMembershipState = state(Gossip.empty, aUp.uniqueAddress, DefaultDataCenter) val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.uniqueAddress) - val state0 = MembershipState(g0, aUp.uniqueAddress, DefaultDataCenter) + val state0 = state(g0, aUp.uniqueAddress, DefaultDataCenter) val g1 = Gossip(members = SortedSet(aUp, cJoining)).seen(aUp.uniqueAddress).seen(cJoining.uniqueAddress) - val state1 = MembershipState(g1, aUp.uniqueAddress, DefaultDataCenter) + val state1 = state(g1, aUp.uniqueAddress, DefaultDataCenter) val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.uniqueAddress) - val state2 = MembershipState(g2, aUp.uniqueAddress, DefaultDataCenter) + val state2 = state(g2, aUp.uniqueAddress, DefaultDataCenter) val g3 = 
g2.seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress) - val state3 = MembershipState(g3, aUp.uniqueAddress, DefaultDataCenter) + val state3 = state(g3, aUp.uniqueAddress, DefaultDataCenter) val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress) - val state4 = MembershipState(g4, aUp.uniqueAddress, DefaultDataCenter) + val state4 = state(g4, aUp.uniqueAddress, DefaultDataCenter) val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress) - val state5 = MembershipState(g5, aUp.uniqueAddress, DefaultDataCenter) + val state5 = state(g5, aUp.uniqueAddress, DefaultDataCenter) val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress) - val state6 = MembershipState(g6, aUp.uniqueAddress, DefaultDataCenter) + val state6 = state(g6, aUp.uniqueAddress, DefaultDataCenter) val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress) - val state7 = MembershipState(g7, aUp.uniqueAddress, DefaultDataCenter) + val state7 = state(g7, aUp.uniqueAddress, DefaultDataCenter) val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability = Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress) - val state8 = MembershipState(g8, aUp.uniqueAddress, DefaultDataCenter) + val state8 = state(g8, aUp.uniqueAddress, DefaultDataCenter) // created in beforeEach var memberSubscriber: TestProbe = _ @@ -143,11 +146,11 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[RoleLeaderChanged])) subscriber.expectMsgType[CurrentClusterState] - publisher ! PublishChanges(MembershipState(Gossip(members = SortedSet(cJoining, dUp)), dUp.uniqueAddress, DefaultDataCenter)) + publisher ! 
PublishChanges(state(Gossip(members = SortedSet(cJoining, dUp)), dUp.uniqueAddress, DefaultDataCenter)) subscriber.expectMsgAllOf( RoleLeaderChanged("GRP", Some(dUp.address)), RoleLeaderChanged(ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter, Some(dUp.address))) - publisher ! PublishChanges(MembershipState(Gossip(members = SortedSet(cUp, dUp)), dUp.uniqueAddress, DefaultDataCenter)) + publisher ! PublishChanges(state(Gossip(members = SortedSet(cUp, dUp)), dUp.uniqueAddress, DefaultDataCenter)) subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address))) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 420a39d18a..0ad9e55da9 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -7,6 +7,7 @@ package akka.cluster import org.scalatest.WordSpec import org.scalatest.Matchers import akka.actor.Address + import scala.collection.immutable.SortedSet class ClusterDomainEventSpec extends WordSpec with Matchers { @@ -39,7 +40,10 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { ((gossip, Set.empty[UniqueAddress]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.uniqueAddress), as + m.uniqueAddress) } private def state(g: Gossip): MembershipState = - MembershipState(g, selfDummyAddress, ClusterSettings.DefaultDataCenter) + state(g, selfDummyAddress) + + private def state(g: Gossip, self: UniqueAddress): MembershipState = + MembershipState(g, self, ClusterSettings.DefaultDataCenter, crossDcConnections = 5) "Domain events" must { @@ -80,8 +84,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { // never include self member in unreachable diffUnreachable( - MembershipState(g1, bDown.uniqueAddress, ClusterSettings.DefaultDataCenter), - MembershipState(g2, bDown.uniqueAddress, ClusterSettings.DefaultDataCenter)) 
should ===(Seq()) + state(g1, bDown.uniqueAddress), + state(g2, bDown.uniqueAddress)) should ===(Seq()) diffSeen(state(g1), state(g2)) should ===(Seq.empty) } @@ -99,13 +103,13 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffUnreachable(state(g1), state(g2)) should ===(Seq(UnreachableMember(cUp))) // never include self member in unreachable diffUnreachable( - MembershipState(g1, cUp.uniqueAddress, ClusterSettings.DefaultDataCenter), - MembershipState(g2, cUp.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq()) + state(g1, cUp.uniqueAddress), + state(g2, cUp.uniqueAddress)) should ===(Seq()) diffReachable(state(g1), state(g2)) should ===(Seq(ReachableMember(bUp))) // never include self member in reachable diffReachable( - MembershipState(g1, bUp.uniqueAddress, ClusterSettings.DefaultDataCenter), - MembershipState(g2, bUp.uniqueAddress, ClusterSettings.DefaultDataCenter)) should ===(Seq()) + state(g1, bUp.uniqueAddress), + state(g2, bUp.uniqueAddress)) should ===(Seq()) } "be produced for removed members" in { diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 25e17d91c6..f4c29f2115 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -35,7 +35,7 @@ class GossipSpec extends WordSpec with Matchers { val dc2d2 = TestMember(dc2d1.address, status = Down, roles = Set.empty, dataCenter = dc2d1.dataCenter) private def state(g: Gossip, selfMember: Member = a1): MembershipState = - MembershipState(g, selfMember.uniqueAddress, selfMember.dataCenter) + MembershipState(g, selfMember.uniqueAddress, selfMember.dataCenter, crossDcConnections = 5) "A Gossip" must { diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipTargetSelectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipTargetSelectorSpec.scala new file mode 100644 index 0000000000..a3bfd9747d --- 
/dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/GossipTargetSelectorSpec.scala @@ -0,0 +1,206 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.cluster + +import akka.actor.Address +import akka.cluster.ClusterSettings.DataCenter +import akka.cluster.MemberStatus.Up +import org.scalatest.{ Matchers, WordSpec } + +import scala.collection.immutable.SortedSet + +class GossipTargetSelectorSpec extends WordSpec with Matchers { + + val aDc1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty, dataCenter = "dc1") + val bDc1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty, dataCenter = "dc1") + val cDc1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, Set.empty, dataCenter = "dc1") + + val eDc2 = TestMember(Address("akka.tcp", "sys", "e", 2552), Up, Set.empty, dataCenter = "dc2") + val fDc2 = TestMember(Address("akka.tcp", "sys", "f", 2552), Up, Set.empty, dataCenter = "dc2") + + val gDc3 = TestMember(Address("akka.tcp", "sys", "g", 2552), Up, Set.empty, dataCenter = "dc3") + val hDc3 = TestMember(Address("akka.tcp", "sys", "h", 2552), Up, Set.empty, dataCenter = "dc3") + + val defaultSelector = new GossipTargetSelector( + reduceGossipDifferentViewProbability = 400, + crossDcGossipProbability = 0.2 + ) + + "The gossip target selection" should { + + "select local nodes in a multi dc setting when chance says so" in { + val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) { + override protected def selectDcLocalNodes: Boolean = true + } + + val state = MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5) + val gossipTo = alwaysLocalSelector.gossipTargets(state) + + // only one other local node + gossipTo should ===(Vector[UniqueAddress](bDc1)) + } + + "select cross dc nodes when chance says so" in { + val alwaysCrossDcSelector = new GossipTargetSelector(400, 0.2) { + override protected def selectDcLocalNodes: Boolean = false + } + + val state = 
MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5) + val gossipTo = alwaysCrossDcSelector.gossipTargets(state) + + // cross dc selection, so one of the dc2 nodes must be among the targets + gossipTo should (contain(eDc2.uniqueAddress) or contain(fDc2.uniqueAddress)) + } + + "select local nodes that hasn't seen the gossip when chance says so" in { + val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) { + override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = true + } + + val state = MembershipState( + Gossip(SortedSet(aDc1, bDc1, cDc1)).seen(bDc1), + aDc1, + aDc1.dataCenter, + crossDcConnections = 5 + ) + val gossipTo = alwaysLocalSelector.gossipTargets(state) + + // a1 is self, b1 has seen so only option is c1 + gossipTo should ===(Vector[UniqueAddress](cDc1)) + } + + "select among all local nodes regardless if they saw the gossip already when chance says so" in { + val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) { + override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false + } + + val state = MembershipState( + Gossip(SortedSet(aDc1, bDc1, cDc1)).seen(bDc1), + aDc1, + aDc1.dataCenter, + crossDcConnections = 5 + ) + val gossipTo = alwaysLocalSelector.gossipTargets(state) + + // a1 is self, b1 is the only one that has seen, but both b1 and c1 are still candidates + gossipTo should ===(Vector[UniqueAddress](bDc1, cDc1)) + } + + "not choose unreachable nodes" in { + val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) { + override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false + } + + val state = MembershipState( + Gossip( + members = SortedSet(aDc1, bDc1, cDc1), + overview = GossipOverview( + reachability = Reachability.empty.unreachable(aDc1, bDc1))), + aDc1, + aDc1.dataCenter, + crossDcConnections = 5) + val gossipTo = alwaysLocalSelector.gossipTargets(state) + + // a1 cannot reach b1 so only option is c1 + gossipTo should ===(Vector[UniqueAddress](cDc1)) + } + + 
"continue with the next dc when doing cross dc and no node where suitable" in { + val selector = new GossipTargetSelector(400, 0.2) { + override protected def selectDcLocalNodes: Boolean = false + override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name + } + + val state = MembershipState( + Gossip( + members = SortedSet(aDc1, bDc1, eDc2, fDc2, gDc3, hDc3), + overview = GossipOverview( + reachability = Reachability.empty + .unreachable(aDc1, eDc2) + .unreachable(aDc1, fDc2))), + aDc1, + aDc1.dataCenter, + crossDcConnections = 5) + val gossipTo = selector.gossipTargets(state) + gossipTo should ===(Vector[UniqueAddress](gDc3, hDc3)) + } + + "not care about seen/unseen for cross dc" in { + val selector = new GossipTargetSelector(400, 0.2) { + override protected def selectDcLocalNodes: Boolean = false + override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name + } + + val state = MembershipState( + Gossip( + members = SortedSet(aDc1, bDc1, eDc2, fDc2, gDc3, hDc3) + ).seen(fDc2).seen(hDc3), + aDc1, + aDc1.dataCenter, + crossDcConnections = 5) + val gossipTo = selector.gossipTargets(state) + gossipTo should ===(Vector[UniqueAddress](eDc2, fDc2)) + } + + "limit the numbers of chosen cross dc nodes to the crossDcConnections setting" in { + val selector = new GossipTargetSelector(400, 0.2) { + override protected def selectDcLocalNodes: Boolean = false + override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name + } + + val state = MembershipState( + Gossip( + members = SortedSet(aDc1, bDc1, eDc2, fDc2, gDc3, hDc3)), + aDc1, + aDc1.dataCenter, + crossDcConnections = 1) + val gossipTo = selector.gossipTargets(state) + gossipTo should ===(Vector[UniqueAddress](eDc2)) + } + + "select N random local nodes when single dc" in { + val state = MembershipState( + Gossip( + members = SortedSet(aDc1, bDc1, cDc1)), + aDc1, + 
aDc1.dataCenter, + crossDcConnections = 1) // single dc: all members are in dc1, so there are no cross dc nodes to pick + + val randomNodes = defaultSelector.randomNodesForFullGossip(state, 3) + + randomNodes.toSet should ===(Set[UniqueAddress](bDc1, cDc1)) + } + + "select N random local nodes when not self among oldest" in { + val state = MembershipState( + Gossip( + members = SortedSet(aDc1, bDc1, cDc1, eDc2, fDc2, gDc3, hDc3)), + bDc1, + bDc1.dataCenter, + crossDcConnections = 1) // means only a, e and g are oldest + + val randomNodes = defaultSelector.randomNodesForFullGossip(state, 3) + + randomNodes.toSet should ===(Set[UniqueAddress](aDc1, cDc1)) + } + + "select N-1 random local nodes plus one cross dc oldest node when self among oldest" in { + val state = MembershipState( + Gossip( + members = SortedSet(aDc1, bDc1, cDc1, eDc2, fDc2)), + aDc1, + aDc1.dataCenter, + crossDcConnections = 1) // means only a and e are oldest + + val randomNodes = defaultSelector.randomNodesForFullGossip(state, 3) + + randomNodes.toSet should ===(Set[UniqueAddress](bDc1, cDc1, eDc2)) + } + + } + + // made the test so much easier to read + import scala.language.implicitConversions + private implicit def memberToUniqueAddress(m: Member): UniqueAddress = m.uniqueAddress +}