Cross DC gossip fixes #23803

* Adjust cross DC gossip probability for small nr of nodes in a DC
When a Dc is being bootstrapped the initial node has no local peers and
can not gossip if it selects a local gossip round. Start at a
probability of 1.0 for a single node cluster and move down 0.25 per node
until a 5 node DC is reached then use the cross-data-center-gossip-probability
* Fix cross DC gossip selecting of oldest members
This used to select the members based on the sort order members in
Gossip (by address) rather than by upNumber
This commit is contained in:
Christopher Batey 2017-11-02 08:17:24 +00:00 committed by Johan Andrén
parent a50df1c575
commit 5a37cdc862
10 changed files with 227 additions and 36 deletions

View file

@ -235,6 +235,8 @@ akka {
# The n oldest nodes in a data center will choose to gossip to another data center with
# this probability. Must be a value between 0.0 and 1.0 where 0.0 means never, 1.0 means always.
# When a data center is first started (nodes < 5) a higher probability is used so other data
# centers find out about the new nodes more quickly
cross-data-center-gossip-probability = 0.2
failure-detector {

View file

@ -946,7 +946,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (isGossipSpeedupNeeded) gossip()
def isGossipSpeedupNeeded: Boolean =
(latestGossip.overview.seen.size < latestGossip.members.size / 2)
latestGossip.overview.seen.count(membershipState.isInSameDc) < latestGossip.members.count(_.dataCenter == cluster.selfDataCenter) / 2
/**
* Sends full gossip to `n` other random members.
@ -970,6 +970,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
else
gossipTo(peer)
case None // nothing to see here
if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] dc [{}] will not gossip this round", selfAddress, cluster.settings.SelfDataCenter)
}
}

View file

@ -3,7 +3,6 @@
*/
package akka.cluster
import java.util.{ ArrayList, Collections }
import java.util.concurrent.ThreadLocalRandom
import scala.collection.immutable
@ -102,16 +101,24 @@ import scala.util.Random
/**
* @return Up to `crossDcConnections` oldest members for each DC
*/
lazy val ageSortedTopOldestMembersPerDc: Map[DataCenter, SortedSet[Member]] =
// TODO make this recursive and bail early when size reached to make it fast for large clusters
lazy val ageSortedTopOldestMembersPerDc: Map[DataCenter, SortedSet[Member]] = {
latestGossip.members.foldLeft(Map.empty[DataCenter, SortedSet[Member]]) { (acc, member)
acc.get(member.dataCenter) match {
case Some(set)
if (set.size < crossDcConnections) acc + (member.dataCenter (set + member))
else acc
case None acc + (member.dataCenter (SortedSet.empty(Member.ageOrdering) + member))
if (set.size < crossDcConnections) {
acc + (member.dataCenter (set + member))
} else {
if (set.exists(member.isOlderThan)) {
acc + (member.dataCenter -> (set + member).take(crossDcConnections))
} else {
acc
}
}
case None
acc + (member.dataCenter (SortedSet.empty(Member.ageOrdering) + member))
}
}
}
/**
* @return true if toAddress should be reachable from the fromDc in general, within a data center
@ -255,12 +262,12 @@ import scala.util.Random
}
/**
* Choose cross-dc nodes if this one of the N oldest nodes, and if not fall back to gosip locally in the dc
* Choose cross-dc nodes if this one of the N oldest nodes, and if not fall back to gossip locally in the dc
*/
protected def multiDcGossipTargets(state: MembershipState): Vector[UniqueAddress] = {
val latestGossip = state.latestGossip
// only a fraction of the time across data centers
if (selectDcLocalNodes()) localDcGossipTargets(state)
if (selectDcLocalNodes(state))
localDcGossipTargets(state)
else {
val nodesPerDc = state.ageSortedTopOldestMembersPerDc
@ -321,7 +328,22 @@ import scala.util.Random
}
}
protected def selectDcLocalNodes(): Boolean = ThreadLocalRandom.current.nextDouble() > crossDcGossipProbability
/**
* For small DCs prefer cross DC gossip. This speeds up the bootstrapping of
* new DCs as adding an initial node means it has no local peers.
* Once the DC is at 5 members use the configured crossDcGossipProbability, before
* that for a single node cluster use 1.0, two nodes use 0.75 etc
*/
protected def selectDcLocalNodes(state: MembershipState): Boolean = {
val localMembers = state.dcMembers.size
val probability = if (localMembers > 4)
crossDcGossipProbability
else {
// don't go below the configured probability
math.max((5 - localMembers) * 0.25, crossDcGossipProbability)
}
ThreadLocalRandom.current.nextDouble() > probability
}
protected def preferNodesWithDifferentView(state: MembershipState): Boolean =
ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size)

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.remote.testkit._
import akka.testkit.ImplicitSender
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object InitialMembersOfNewDcSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString(
s"""
akka.actor.provider = cluster
akka.actor.warn-about-java-serializer-usage = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster {
jmx.enabled = off
debug.verbose-gossip-logging = on
}
akka.cluster.multi-data-center {
#cross-data-center-gossip-probability = 0.5
}
akka.loglevel = INFO
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.loggers = ["akka.testkit.TestEventListener"]
"""))
val one = role("one")
val two = role("two")
val three = role("three")
val four = role("four")
val five = role("five")
nodeConfig(one, two, three) {
ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC1")
}
nodeConfig(four, five) {
ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC2")
}
}
class InitialMembersOfNewDcSpecMultiJvmNode1 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode2 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode3 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode4 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode5 extends InitialMembersOfNewDcSpec
abstract class InitialMembersOfNewDcSpec extends MultiNodeSpec(InitialMembersOfNewDcSpec) with STMultiNodeSpec with ImplicitSender {
import InitialMembersOfNewDcSpec._
def initialParticipants = roles.size
val cluster = Cluster(system)
"Joining a new DC" must {
"join node one" in {
runOn(one) {
cluster.join(node(one).address)
}
enterBarrier("node one up")
}
"see all dc1 nodes join" in {
runOn(two, three) {
cluster.join(node(one).address)
}
}
"see all dc1 nodes see each other as up" in {
runOn(two, three) {
within(20.seconds) {
awaitAssert({
cluster.state.members.filter(_.status == MemberStatus.Up) should have size 3
})
}
}
enterBarrier("dc1 fully up")
}
"join first member of new dc" in {
enterBarrier("Node 4 about to join")
val startTime = System.nanoTime()
runOn(four) {
log.info("Joining cluster")
cluster.join(node(one).address)
}
// Check how long it takes for all other nodes to see every node as up
runOn(one, two, three, four) {
within(20.seconds) {
awaitAssert({
cluster.state.members.filter(_.status == MemberStatus.Up) should have size 4
})
}
val totalTime = System.nanoTime() - startTime
log.info("Can see new node (and all others as up): {}ms", totalTime.nanos.toMillis)
}
enterBarrier("node 4 joined dc and all nodes know it is up")
}
}
}

View file

@ -3,7 +3,6 @@
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._

View file

@ -90,12 +90,12 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
}
"be produced for reachability observations between data centers" in {
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2")
val dc2BMemberUp = TestMember(Address("akka.tcp", "sys", "dc2B", 2552), Up, Set.empty, "dc2")
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty[String], "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty[String], "dc2")
val dc2BMemberUp = TestMember(Address("akka.tcp", "sys", "dc2B", 2552), Up, Set.empty[String], "dc2")
val dc3AMemberUp = TestMember(Address("akka.tcp", "sys", "dc3A", 2552), Up, Set.empty, "dc3")
val dc3BMemberUp = TestMember(Address("akka.tcp", "sys", "dc3B", 2552), Up, Set.empty, "dc3")
val dc3AMemberUp = TestMember(Address("akka.tcp", "sys", "dc3A", 2552), Up, Set.empty[String], "dc3")
val dc3BMemberUp = TestMember(Address("akka.tcp", "sys", "dc3B", 2552), Up, Set.empty[String], "dc3")
val reachability1 = Reachability.empty
val g1 = Gossip(members = SortedSet(aUp, bUp, dc2AMemberUp, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp), overview = GossipOverview(reachability = reachability1))
@ -121,8 +121,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
}
"not be produced for same reachability observations between data centers" in {
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2")
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty[String], "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty[String], "dc2")
val reachability1 = Reachability.empty
val g1 = Gossip(members = SortedSet(aUp, dc2AMemberUp), overview = GossipOverview(reachability = reachability1))
@ -246,8 +246,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))
val s2 = state(g2).copy(selfDc = "dc2")
diffRolesLeader(s0, s1) should ===(Set.empty)
diffRolesLeader(s1, s2) should ===(Set.empty)
diffRolesLeader(s0, s1) should ===(Set.empty[String])
diffRolesLeader(s1, s2) should ===(Set.empty[String])
}
}
}

View file

@ -22,6 +22,8 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
val gDc3 = TestMember(Address("akka.tcp", "sys", "g", 2552), Up, Set.empty, dataCenter = "dc3")
val hDc3 = TestMember(Address("akka.tcp", "sys", "h", 2552), Up, Set.empty, dataCenter = "dc3")
val iDc4 = TestMember(Address("akka.tcp", "sys", "i", 2552), Up, Set.empty, dataCenter = "dc4")
val defaultSelector = new GossipTargetSelector(
reduceGossipDifferentViewProbability = 400,
crossDcGossipProbability = 0.2
@ -29,9 +31,18 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"The gossip target selection" should {
"select remote nodes in a multi dc setting for a single node cluster regardless of probability" in {
val realSelector = new GossipTargetSelector(400, 0.0)
val state = MembershipState(Gossip(SortedSet(iDc4, eDc2, fDc2)), iDc4, iDc4.dataCenter, crossDcConnections = 5)
val gossipTo = realSelector.gossipTargets(state)
gossipTo should ===(Vector[UniqueAddress](eDc2, fDc2))
}
"select local nodes in a multi dc setting when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = true
override protected def selectDcLocalNodes(s: MembershipState): Boolean = true
}
val state = MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5)
@ -43,7 +54,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"select cross dc nodes when chance says so" in {
val alwaysCrossDcSelector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
}
val state = MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5)
@ -55,7 +66,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"select local nodes that hasn't seen the gossip when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = true
override protected def preferNodesWithDifferentView(s: MembershipState): Boolean = true
}
val state = MembershipState(
@ -72,7 +83,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"select among all local nodes regardless if they saw the gossip already when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false
override protected def preferNodesWithDifferentView(s: MembershipState): Boolean = false
}
val state = MembershipState(
@ -89,7 +100,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"not choose unreachable nodes" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false
override protected def preferNodesWithDifferentView(s: MembershipState): Boolean = false
}
val state = MembershipState(
@ -108,7 +119,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"continue with the next dc when doing cross dc and no node where suitable" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}
@ -128,7 +139,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"not care about seen/unseen for cross dc" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}
@ -145,7 +156,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"limit the numbers of chosen cross dc nodes to the crossDcConnections setting" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.actor.Address
import akka.cluster.MemberStatus.Up
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.immutable.SortedSet
class MembershipStateSpec extends WordSpec with Matchers {
// DC-a is in reverse age order
val a1 = TestMember(Address("akka.tcp", "sys", "a4", 2552), Up, 1, "dc-a")
val a2 = TestMember(Address("akka.tcp", "sys", "a3", 2552), Up, 2, "dc-a")
val a3 = TestMember(Address("akka.tcp", "sys", "a2", 2552), Up, 3, "dc-a")
val a4 = TestMember(Address("akka.tcp", "sys", "a1", 2552), Up, 4, "dc-a")
// DC-b it is the first and the last that are the oldest
val b1 = TestMember(Address("akka.tcp", "sys", "b3", 2552), Up, 1, "dc-b")
val b3 = TestMember(Address("akka.tcp", "sys", "b2", 2552), Up, 3, "dc-b")
// Won't be replaced by b3
val b2 = TestMember(Address("akka.tcp", "sys", "b1", 2552), Up, 2, "dc-b")
// for the case that we don't replace it ever
val bOldest = TestMember(Address("akka.tcp", "sys", "b0", 2552), Up, 0, "dc-b")
"Membership state" must {
"sort by upNumber for oldest top members" in {
val gossip = Gossip(SortedSet(a1, a2, a3, a4, b1, b2, b3, bOldest))
val membershipState = MembershipState(
gossip,
a1.uniqueAddress,
"dc-a",
2
)
membershipState.ageSortedTopOldestMembersPerDc should equal(Map(
"dc-a" -> SortedSet(a1, a2),
"dc-b" -> SortedSet(bOldest, b1)
))
}
}
}

View file

@ -7,11 +7,14 @@ import akka.actor.Address
object TestMember {
def apply(address: Address, status: MemberStatus): Member =
apply(address, status, Set.empty)
apply(address, status, Set.empty[String])
def apply(address: Address, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter = ClusterSettings.DefaultDataCenter): Member =
withUniqueAddress(UniqueAddress(address, 0L), status, roles, dataCenter)
def apply(address: Address, status: MemberStatus, upNumber: Int, dc: ClusterSettings.DataCenter): Member =
apply(address, status, Set.empty, dc, upNumber)
def withUniqueAddress(uniqueAddress: UniqueAddress, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter): Member =
new Member(uniqueAddress, Int.MaxValue, status, roles + (ClusterSettings.DcRolePrefix + dataCenter))
def apply(address: Address, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter = ClusterSettings.DefaultDataCenter, upNumber: Int = Int.MaxValue): Member =
withUniqueAddress(UniqueAddress(address, 0L), status, roles, dataCenter, upNumber)
def withUniqueAddress(uniqueAddress: UniqueAddress, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter, upNumber: Int = Int.MaxValue): Member =
new Member(uniqueAddress, upNumber, status, roles + (ClusterSettings.DcRolePrefix + dataCenter))
}

View file

@ -35,9 +35,9 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
import MemberStatus._
val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty)
val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty[String])
val b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1"))
val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty, "foo")
val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty[String], "foo")
val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1"), "foo")
val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3"))
val f1 = TestMember(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r3"), "foo")