=clu #23229 multi-dc heartbeating, only N nodes perform monitoring

Konrad `ktoso` Malawski 2017-07-07 13:17:41 +02:00 committed by Johan Andrén
parent b3c372eada
commit b568975acc
8 changed files with 574 additions and 27 deletions

View file

@@ -205,6 +205,41 @@ akka {
}
# Configures multi-dc specific heartbeating and other mechanisms.
# Many of them have a direct counterpart in "one datacenter mode",
# in which case these settings would not be used at all - they only apply
# if your cluster nodes are configured with at least 2 different `akka.cluster.data-center` values.
multi-data-center {
failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a public constructor with a com.typesafe.config.Config and
# akka.actor.EventStream parameter.
implementation-class = "akka.remote.DeadlineFailureDetector"
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# This margin is important to be able to survive sudden, occasional
# pauses in heartbeat arrivals, caused by for example garbage collection
# or network drops.
acceptable-heartbeat-pause = 10 s
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 3 s
# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 1 s
# Maximum number of the oldest members in a data center that will monitor
# the oldest nodes in other data centers.
# This is done to lessen cross-data-center communication, as only those top-n oldest nodes
# need to maintain connections to the other data centers.
nr-of-monitoring-members = 5
}
}
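As a usage sketch (system name, DC name and values below are illustrative, not part of this commit), an application could override these defaults programmatically before starting its ActorSystem:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical node in DC "eu-west", tuned for a fast inter-dc link.
val config = ConfigFactory.parseString("""
  akka.cluster.data-center = "eu-west"
  akka.cluster.multi-data-center.failure-detector {
    heartbeat-interval = 1 s
    acceptable-heartbeat-pause = 6 s
    nr-of-monitoring-members = 3
  }
""").withFallback(ConfigFactory.load())

val system = ActorSystem("ClusterSystem", config)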
# If the tick-duration of the default scheduler is longer than the
# tick-duration configured here a dedicated scheduler will be used for
# periodic tasks of the cluster, otherwise the default scheduler is used.

View file

@@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.ConfigurationException
import akka.actor._
import akka.cluster.ClusterSettings.DataCenter
import akka.dispatch.MonitorableThreadFactory
import akka.event.{ Logging, LoggingAdapter }
import akka.japi.Util
@@ -77,6 +78,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
*/
def selfAddress: Address = selfUniqueAddress.address
/** Data center to which this node belongs (defaults to "default" if not configured explicitly) */
def selfDataCenter: DataCenter = settings.DataCenter
/**
* roles that this member has
*/
@@ -96,10 +100,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
logInfo("Starting up...")
val failureDetector: FailureDetectorRegistry[Address] = {
def createFailureDetector(): FailureDetector =
val createFailureDetector = () ⇒
FailureDetectorLoader.load(settings.FailureDetectorImplementationClass, settings.FailureDetectorConfig, system)
new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
new DefaultFailureDetectorRegistry(createFailureDetector)
}
val crossDcFailureDetector: FailureDetectorRegistry[Address] = {
val createFailureDetector = () ⇒
FailureDetectorLoader.load(settings.CrossDcFailureDetectorSettings.ImplementationClass, settings.CrossDcFailureDetectorSettings.config, system)
new DefaultFailureDetectorRegistry(createFailureDetector)
}
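Both registries are driven through the same FailureDetectorRegistry[Address] operations used by the heartbeat senders; a rough sketch of that lifecycle (the peer address is hypothetical):

import akka.actor.Address

val peer = Address("akka.tcp", "ClusterSystem", "10.0.0.2", 2552) // illustrative peer

crossDcFailureDetector.heartbeat(peer)    // first heartbeat starts monitoring this resource
crossDcFailureDetector.isMonitoring(peer) // true from now on
crossDcFailureDetector.isAvailable(peer)  // turns false once heartbeats pause for too long
crossDcFailureDetector.remove(peer)       // must be called on member removal to avoid leaks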
// needs to be lazy to allow downing provider impls to access Cluster (if not we get deadlock)
@@ -411,7 +422,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
private def closeScheduler(): Unit = scheduler match {
case x: Closeable ⇒ x.close()
case _ ⇒
case _ ⇒ // ignore, this is fine
}
/**

View file

@@ -296,6 +296,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
import cluster.settings._
import cluster.InfoLogger._
val selfDc = cluster.selfDataCenter
protected def selfUniqueAddress = cluster.selfUniqueAddress
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
@@ -334,8 +336,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
var exitingConfirmed = Set.empty[UniqueAddress]
def selfDc = cluster.settings.DataCenter
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
@@ -431,8 +431,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def becomeInitialized(): Unit = {
// start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received
context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
val internalHeartbeatSenderProps = Props(new ClusterHeartbeatSender()).withDispatcher(UseDispatcher)
context.actorOf(internalHeartbeatSenderProps, name = "heartbeatSender")
val externalHeartbeatProps = Props(new CrossDcHeartbeatSender()).withDispatcher(UseDispatcher)
context.actorOf(externalHeartbeatProps, name = "crossDcHeartbeatSender")
// make sure that join process is stopped
stopSeedNodeProcess()
context.become(initialized)

View file

@@ -5,17 +5,18 @@ package akka.cluster
import scala.annotation.tailrec
import scala.collection.immutable
import akka.actor.{ ActorLogging, ActorSelection, Address, Actor, RootActorPath }
import akka.actor.{ Actor, ActorLogging, ActorPath, ActorRef, ActorSelection, Address, DeadLetterSuppression, RootActorPath }
import akka.cluster.ClusterEvent._
import akka.remote.FailureDetectorRegistry
import akka.remote.HeartbeatMessage
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
/**
* INTERNAL API.
*
* Receives Heartbeat messages and replies.
*/
@InternalApi
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
import ClusterHeartbeatSender._
@@ -29,6 +30,15 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
}
/** INTERNAL API: Utilities to obtain ClusterHeartbeatReceiver paths */
@InternalApi
private[cluster] object ClusterHeartbeatReceiver {
def name: String = "heartbeatReceiver"
def path(address: Address): ActorPath =
RootActorPath(address) / "system" / "cluster" / name
}
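The helper replaces the previously inlined path construction; for any node address it resolves to the same well-known location, as in this sketch (address values illustrative):

import akka.actor.Address

val address = Address("akka.tcp", "ClusterSystem", "host1", 2552)
ClusterHeartbeatReceiver.path(address)
// akka.tcp://ClusterSystem@host1:2552/system/cluster/heartbeatReceiver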
/**
* INTERNAL API
*/
@@ -65,12 +75,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
import cluster.settings._
import context.dispatcher
// the failureDetector is only updated by this actor, but read from other places
val failureDetector = Cluster(context.system).failureDetector
val filterInternalClusterMembers: Member ⇒ Boolean =
_.dataCenter == cluster.selfDataCenter
val selfHeartbeat = Heartbeat(selfAddress)
var state = ClusterHeartbeatSenderState(
val failureDetector = cluster.failureDetector
var state: ClusterHeartbeatSenderState = ClusterHeartbeatSenderState(
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, MonitoredByNrOfMembers),
oldReceiversNowUnreachable = Set.empty[UniqueAddress],
failureDetector)
@@ -94,7 +106,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def heartbeatReceiver(address: Address): ActorSelection =
context.actorSelection(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
context.actorSelection(ClusterHeartbeatReceiver.path(address))
def receive = initializing
@@ -116,16 +128,21 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
def init(snapshot: CurrentClusterState): Unit = {
val nodes: Set[UniqueAddress] = snapshot.members.map(_.uniqueAddress)
val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
val nodes = snapshot.members.collect { case m if filterInternalClusterMembers(m) ⇒ m.uniqueAddress }
val unreachable = snapshot.unreachable.collect { case m if filterInternalClusterMembers(m) ⇒ m.uniqueAddress }
state = state.init(nodes, unreachable)
}
def addMember(m: Member): Unit =
if (m.uniqueAddress != selfUniqueAddress && !state.contains(m.uniqueAddress))
if (m.uniqueAddress != selfUniqueAddress && // is not self
!state.contains(m.uniqueAddress) && // not already added
filterInternalClusterMembers(m) // only monitor members within this node's own DC
) {
state = state.addMember(m.uniqueAddress)
}
def removeMember(m: Member): Unit =
if (filterInternalClusterMembers(m)) { // we only ever deal with internal cluster members here
if (m.uniqueAddress == cluster.selfUniqueAddress) {
// This cluster node will be shutdown, but stop this actor immediately
// to avoid further updates
@@ -133,6 +150,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
} else {
state = state.removeMember(m.uniqueAddress)
}
}
def unreachableMember(m: Member): Unit =
state = state.unreachableMember(m.uniqueAddress)
@@ -142,7 +160,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def heartbeat(): Unit = {
state.activeReceivers foreach { to ⇒
if (cluster.failureDetector.isMonitoring(to.address)) {
if (failureDetector.isMonitoring(to.address)) {
if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address)
} else {
if (verboseHeartbeat) log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address)
@@ -152,7 +170,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
heartbeatReceiver(to.address) ! selfHeartbeat
}
}
def heartbeatRsp(from: UniqueAddress): Unit = {
@@ -173,6 +190,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
* State of [[ClusterHeartbeatSender]]. Encapsulated to facilitate unit testing.
* It is immutable, but it updates the failureDetector.
*/
@InternalApi
private[cluster] final case class ClusterHeartbeatSenderState(
ring: HeartbeatNodeRing,
oldReceiversNowUnreachable: Set[UniqueAddress],
@@ -262,7 +280,7 @@ private[cluster] final case class HeartbeatNodeRing(
/**
* Receivers for `selfAddress`. Cached for subsequent access.
*/
lazy val myReceivers: immutable.Set[UniqueAddress] = receivers(selfAddress)
lazy val myReceivers: Set[UniqueAddress] = receivers(selfAddress)
private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1)

View file

@@ -10,7 +10,7 @@ import com.typesafe.config.ConfigObject
import scala.concurrent.duration.Duration
import akka.actor.Address
import akka.actor.AddressFromURIString
import akka.annotation.InternalApi
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.Dispatchers
import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase }
@@ -117,6 +117,21 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
val DataCenter: DataCenter = cc.getString("data-center")
final class CrossDcFailureDetectorSettings(val config: Config) {
val ImplementationClass: String = config.getString("implementation-class")
val HeartbeatInterval: FiniteDuration = {
config.getMillisDuration("heartbeat-interval")
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
val HeartbeatExpectedResponseAfter: FiniteDuration = {
config.getMillisDuration("expected-response-after")
} requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0")
val NrOfMonitoringActors: Int = {
config.getInt("nr-of-monitoring-members")
} requiring (_ > 0, "failure-detector.nr-of-monitoring-members must be > 0")
}
val CrossDcFailureDetectorSettings = new CrossDcFailureDetectorSettings(cc.getConfig("multi-data-center.failure-detector"))
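Since ClusterSettings has a public constructor taking a plain Config, the new cross-dc settings can be inspected in isolation; a minimal sketch, assuming the akka-cluster reference.conf is on the classpath:

import com.typesafe.config.ConfigFactory

val settings = new ClusterSettings(ConfigFactory.load(), "ClusterSystem")
val crossDc = settings.CrossDcFailureDetectorSettings
crossDc.NrOfMonitoringActors           // 5, the reference.conf default
crossDc.HeartbeatInterval              // 3 seconds
crossDc.HeartbeatExpectedResponseAfter // 1 second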
val Roles: Set[String] = {
val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring (
_.forall(!_.startsWith(DcRolePrefix)),
@@ -124,6 +139,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
configuredRoles + s"$DcRolePrefix$DataCenter"
}
val MinNrOfMembers: Int = {
cc.getInt("min-nr-of-members")
} requiring (_ > 0, "min-nr-of-members must be > 0")

View file

@@ -0,0 +1,318 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster
import akka.actor.{ Actor, ActorLogging, ActorSelection, Address, NoSerializationVerificationNeeded, RootActorPath }
import akka.annotation.InternalApi
import akka.cluster.ClusterEvent._
import akka.cluster.ClusterSettings.DataCenter
import akka.event.Logging
import akka.remote.FailureDetectorRegistry
import akka.util.ConstantFun
import scala.collection.{ SortedSet, immutable, breakOut }
/**
* INTERNAL API
*
* This actor will be started on all nodes participating in a cluster,
* however unlike the within-dc heartbeat sender ([[ClusterHeartbeatSender]]),
* it will only actively work on `n` "oldest" nodes of a given data center.
*
* It will monitor its oldest counterparts in other data centers.
* For example, a DC configured to have (up to) 4 monitoring actors
* will have 4 of them active at any point in time, and those will monitor
* the (at most) 4 oldest nodes of each other data center.
*
* This monitoring mode is both simple and predictable, and also relies on the assumption that
* "nodes which stay around for a long time become old", and those rarely change. In a way,
* they are the "core" of a cluster, while other nodes may be very dynamically changing worker
* nodes which aggressively come and go as the traffic in the service changes.
*/
@InternalApi
private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogging {
import CrossDcHeartbeatSender._
val cluster = Cluster(context.system)
val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
import cluster.settings._
import cluster.{ scheduler, selfAddress, selfDataCenter, selfUniqueAddress }
import context.dispatcher
// Tracks whether this node is in the active state; avoids "becoming active" again when already active
var activelyMonitoring = false
val isExternalClusterMember: Member ⇒ Boolean =
member ⇒ member.dataCenter != cluster.selfDataCenter
val crossDcSettings: cluster.settings.CrossDcFailureDetectorSettings = cluster.settings.CrossDcFailureDetectorSettings
val crossDcFailureDetector = cluster.crossDcFailureDetector
val selfHeartbeat = ClusterHeartbeatSender.Heartbeat(selfAddress)
var dataCentersState: CrossDcHeartbeatingState = CrossDcHeartbeatingState.init(
crossDcFailureDetector,
crossDcSettings.NrOfMonitoringActors,
SortedSet.empty
)
// start periodic heartbeat to other nodes in cluster
val heartbeatTask = scheduler.schedule(
PeriodicTasksInitialDelay max HeartbeatInterval,
HeartbeatInterval, self, ClusterHeartbeatSender.HeartbeatTick)
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
if (verboseHeartbeat) log.debug("Initialized cross-dc heartbeat sender as DORMANT in DC: [{}]", selfDataCenter)
}
override def postStop(): Unit = {
dataCentersState.activeReceivers.foreach(a ⇒ crossDcFailureDetector.remove(a.address))
heartbeatTask.cancel()
cluster.unsubscribe(self)
}
/**
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def heartbeatReceiver(address: Address): ActorSelection =
context.actorSelection(ClusterHeartbeatReceiver.path(address))
def receive: Actor.Receive =
dormant orElse introspecting
/**
* In this state no cross-datacenter heartbeats are sent by this actor.
* This may be because of one of these reasons:
* - no nodes in other DCs were detected yet
* - nodes in other DCs are present, but this node is not among the n oldest in this DC (see
*   `multi-data-center.failure-detector.nr-of-monitoring-members`), so it does not have to monitor the other data centers
*
* In this state it will however listen to cluster events to eventually take over monitoring other DCs
* in case it becomes "old enough".
*/
def dormant: Actor.Receive = {
case s: CurrentClusterState ⇒ init(s)
case MemberRemoved(m, _) ⇒ removeMember(m)
case evt: MemberEvent ⇒ addMember(evt.member)
case ClusterHeartbeatSender.HeartbeatTick ⇒ // ignore...
}
def active: Actor.Receive = {
case ClusterHeartbeatSender.HeartbeatTick ⇒ heartbeat()
case ClusterHeartbeatSender.HeartbeatRsp(from) ⇒ heartbeatRsp(from)
case MemberRemoved(m, _) ⇒ removeMember(m)
case evt: MemberEvent ⇒ addMember(evt.member)
case ClusterHeartbeatSender.ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
}
def introspecting: Actor.Receive = {
case ReportStatus() ⇒
sender() ! {
if (activelyMonitoring) CrossDcHeartbeatSender.MonitoringActive(dataCentersState)
else CrossDcHeartbeatSender.MonitoringDormant()
}
}
def init(snapshot: CurrentClusterState): Unit = {
// val unreachable = snapshot.unreachable.collect({ case m if isExternalClusterMember(m) => m.uniqueAddress })
// nr of monitored nodes is the same as the number of monitoring nodes (`n` oldest in one DC watch `n` oldest in other)
val nodes = snapshot.members
val nrOfMonitoredNodes = crossDcSettings.NrOfMonitoringActors
dataCentersState = CrossDcHeartbeatingState.init(crossDcFailureDetector, nrOfMonitoredNodes, nodes)
}
def addMember(m: Member): Unit =
if (m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp) {
// since we only monitor nodes in Up or later states, due to the n-th oldest requirement
dataCentersState = dataCentersState.addMember(m)
if (verboseHeartbeat && m.dataCenter != selfDataCenter)
log.debug("Register member {} for cross DC heartbeat (will only heartbeat if oldest)", m)
becomeActiveIfResponsibleForHeartbeat()
}
def removeMember(m: Member): Unit =
if (m.uniqueAddress == cluster.selfUniqueAddress) {
// This cluster node will be shutdown, but stop this actor immediately to avoid further updates
context stop self
} else {
dataCentersState = dataCentersState.removeMember(m)
becomeActiveIfResponsibleForHeartbeat()
}
def heartbeat(): Unit = {
dataCentersState.activeReceivers foreach { to ⇒
if (crossDcFailureDetector.isMonitoring(to.address)) {
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address)
} else {
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - First (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address)
// schedule the expected first heartbeat for later, which will give the
// other side a chance to reply, and also trigger some resends if needed
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ClusterHeartbeatSender.ExpectedFirstHeartbeat(to))
}
heartbeatReceiver(to.address) ! selfHeartbeat
}
}
def heartbeatRsp(from: UniqueAddress): Unit = {
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat response from [{}]", selfDataCenter, selfAddress, from.address)
dataCentersState = dataCentersState.heartbeatRsp(from)
}
def triggerFirstHeartbeat(from: UniqueAddress): Unit =
if (dataCentersState.activeReceivers.contains(from) && !crossDcFailureDetector.isMonitoring(from.address)) {
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - Trigger extra expected (cross) heartbeat from [{}]", selfAddress, from.address)
crossDcFailureDetector.heartbeat(from.address)
}
private def selfIsResponsibleForCrossDcHeartbeat(): Boolean = {
val activeDcs: Int = dataCentersState.dataCenters.size
if (activeDcs > 1) dataCentersState.shouldActivelyMonitorNodes(selfDataCenter, selfUniqueAddress)
else false
}
/** Idempotent; become active if this node is among the n oldest and should monitor other nodes */
private def becomeActiveIfResponsibleForHeartbeat(): Unit = {
if (!activelyMonitoring && selfIsResponsibleForCrossDcHeartbeat()) {
if (verboseHeartbeat) log.debug("Becoming ACTIVE (for DC: {}), monitoring other DCs oldest nodes", selfDataCenter)
activelyMonitoring = true
context.become(active orElse introspecting)
} else if (!activelyMonitoring)
if (verboseHeartbeat) log.info("Remaining DORMANT; others in {} handle heartbeating other DCs", selfDataCenter)
}
}
/** INTERNAL API */
@InternalApi
private[akka] object CrossDcHeartbeatSender {
// -- messages intended only for local messaging during testing --
sealed trait InspectionCommand extends NoSerializationVerificationNeeded
final case class ReportStatus()
sealed trait StatusReport extends NoSerializationVerificationNeeded
sealed trait MonitoringStateReport extends StatusReport
final case class MonitoringActive(state: CrossDcHeartbeatingState) extends MonitoringStateReport
final case class MonitoringDormant() extends MonitoringStateReport
// -- end of messages intended only for local messaging during testing --
}
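These messages let tests ask a running sender whether it is actively heartbeating; a probe-based sketch (assumes an implicit ActorSystem in scope; the actor path matches the one used by ClusterDaemon and in the multi-node test below):

import akka.testkit.TestProbe

val probe = TestProbe()
val heartbeatSender = system.actorSelection("/system/cluster/core/daemon/crossDcHeartbeatSender")
heartbeatSender.tell(CrossDcHeartbeatSender.ReportStatus(), probe.ref)
probe.expectMsgType[CrossDcHeartbeatSender.MonitoringStateReport] // MonitoringActive or MonitoringDormant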
/** INTERNAL API */
@InternalApi
private[cluster] final case class CrossDcHeartbeatingState(
failureDetector: FailureDetectorRegistry[Address],
nrOfMonitoredNodesPerDc: Int,
state: Map[ClusterSettings.DataCenter, SortedSet[Member]]) {
import CrossDcHeartbeatingState._
/**
* Decides if `self` node should become active and monitor other nodes with heartbeats.
* Only the `nrOfMonitoredNodesPerDc`-oldest nodes in each DC fulfil this role.
*/
def shouldActivelyMonitorNodes(selfDc: ClusterSettings.DataCenter, selfAddress: UniqueAddress): Boolean = {
/** Since we need the ordering of the oldest nodes guaranteed, we must only look at Up (or later, e.g. Leaving, Exiting) nodes */
def atLeastInUpState(m: Member): Boolean =
m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.Joining
val selfDcNeighbours: SortedSet[Member] = state.getOrElse(selfDc, emptyMembersSortedSet)
val selfDcOldOnes = selfDcNeighbours.filter(atLeastInUpState).take(nrOfMonitoredNodesPerDc)
// if this node is part of the "n oldest nodes" it should indeed monitor other nodes:
val shouldMonitorActively = selfDcOldOnes.exists(_.uniqueAddress == selfAddress)
shouldMonitorActively
}
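For example, with nrOfMonitoredNodesPerDc = 2 and a selfDc whose Up members carry upNumbers 1, 3 and 7, only the nodes with upNumbers 1 and 3 answer true here; the node with upNumber 7 stays dormant until one of the two older nodes leaves the cluster.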
def addMember(m: Member): CrossDcHeartbeatingState = {
val dc = m.dataCenter
// we need to remove the member first, to avoid having "duplicates";
// this is because the uniqueness we rely on is by uniqueAddress only,
// which is not part of the `ageOrdering`
val oldMembersWithoutM = state.getOrElse(dc, emptyMembersSortedSet)
.filterNot(_.uniqueAddress == m.uniqueAddress)
val updatedMembers = oldMembersWithoutM + m
val updatedState = this.copy(state = state.updated(dc, updatedMembers))
// guarding against the case of two members having the same upNumber, in which case the activeReceivers
// which are based on the ageOrdering could actually have changed by adding a node. In practice this
// should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes
// in the same DC. If it happens though, we need to remove the previously monitored node from the failure
// detector, to prevent both a resource leak and that node actually appearing as unreachable in the gossip (!)
val stoppedMonitoringReceivers = updatedState.activeReceiversIn(dc) diff this.activeReceiversIn(dc)
stoppedMonitoringReceivers.foreach(m ⇒ failureDetector.remove(m.address)) // at most one element difference
updatedState
}
def removeMember(m: Member): CrossDcHeartbeatingState = {
val dc = m.dataCenter
state.get(dc) match {
case Some(dcMembers) ⇒
val updatedMembers = dcMembers.filterNot(_.uniqueAddress == m.uniqueAddress)
failureDetector.remove(m.address)
copy(state = state.updated(dc, updatedMembers))
case None ⇒
this // no change needed, was certainly not present (not even its DC was)
}
}
val activeReceivers: Set[UniqueAddress] =
dataCenters.flatMap(k ⇒ state(k).take(nrOfMonitoredNodesPerDc).map(_.uniqueAddress)(breakOut))
private def activeReceiversIn(dc: DataCenter): Set[UniqueAddress] =
state.getOrElse(dc, emptyMembersSortedSet).take(nrOfMonitoredNodesPerDc).map(_.uniqueAddress)(breakOut)
def allMembers: Iterable[Member] =
state.values.flatMap(ConstantFun.scalaIdentityFunction)
def heartbeatRsp(from: UniqueAddress): CrossDcHeartbeatingState = {
if (activeReceivers.contains(from)) {
failureDetector heartbeat from.address
}
this
}
def dataCenters: Set[DataCenter] =
state.keys.toSet
}
/** INTERNAL API */
@InternalApi
private[cluster] object CrossDcHeartbeatingState {
/** Sorted by age */
private def emptyMembersSortedSet: SortedSet[Member] = SortedSet.empty[Member](Member.ageOrdering)
def init(
crossDcFailureDetector: FailureDetectorRegistry[Address],
nrOfMonitoredNodesPerDc: Int,
members: SortedSet[Member]): CrossDcHeartbeatingState = {
CrossDcHeartbeatingState(
crossDcFailureDetector,
nrOfMonitoredNodesPerDc,
state = {
// TODO unduplicate this with other places where we do this
val groupedByDc = members.groupBy(_.dataCenter)
if (members.ordering == Member.ageOrdering) {
// we already have the right ordering
groupedByDc
} else {
// we need to enforce the ageOrdering for the SortedSet in each DC
groupedByDc.map {
case (dc, ms) ⇒
dc → (SortedSet.empty[Member](Member.ageOrdering) union ms)
}
}
})
}
}
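A rough sketch of exercising this state machine in isolation (the always-available detector is a hypothetical test stand-in, not part of this commit):

import akka.actor.Address
import akka.remote.{ DefaultFailureDetectorRegistry, FailureDetector }
import scala.collection.immutable.SortedSet

// Trivial detector, enough to drive the pure state transitions without timing concerns.
val alwaysAvailable = new FailureDetector {
  override def isAvailable: Boolean = true
  override def isMonitoring: Boolean = true
  override def heartbeat(): Unit = ()
}
val registry = new DefaultFailureDetectorRegistry[Address](() ⇒ alwaysAvailable)

var hbState = CrossDcHeartbeatingState.init(registry, nrOfMonitoredNodesPerDc = 2, SortedSet.empty[Member](Member.ageOrdering))
// fed from cluster events: hbState = hbState.addMember(m) / hbState = hbState.removeMember(m)
// hbState.activeReceivers then contains the (up to) 2 oldest members of every known DC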

View file

@@ -55,7 +55,7 @@ class Member private[cluster] (
* cluster. A member that joined after removal of another member may be
* considered older than the removed member. Note that it only makes
* sense to compare with other members inside of one data center (upNumber has
* a higher risk of being reused across data centers).
* a higher risk of being reused across data centers). // TODO should we enforce this to compare only within DCs?
*/
def isOlderThan(other: Member): Boolean =
if (upNumber == other.upNumber)

View file

@@ -0,0 +1,145 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.annotation.InternalApi
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.collection.immutable
import scala.collection.immutable.SortedSet
import scala.concurrent.duration._
object MultiDcSunnyWeatherMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
nodeConfig(first, second, third)(ConfigFactory.parseString(
"""
akka {
cluster.data-center = alpha
}
"""))
nodeConfig(fourth, fifth)(ConfigFactory.parseString(
"""
akka {
cluster.data-center = beta
}
"""))
commonConfig(ConfigFactory.parseString(
"""
akka {
actor.provider = cluster
loggers = ["akka.testkit.TestEventListener"]
loglevel = INFO
remote.log-remote-lifecycle-events = off
cluster {
debug.verbose-heartbeat-logging = off
failure-detector {
monitored-by-nr-of-members = 2
}
multi-data-center {
failure-detector {
nr-of-monitoring-members = 2
}
}
}
}
"""))
}
class MultiDcSunnyWeatherMultiJvmNode1 extends MultiDcSunnyWeatherSpec
class MultiDcSunnyWeatherMultiJvmNode2 extends MultiDcSunnyWeatherSpec
class MultiDcSunnyWeatherMultiJvmNode3 extends MultiDcSunnyWeatherSpec
class MultiDcSunnyWeatherMultiJvmNode4 extends MultiDcSunnyWeatherSpec
class MultiDcSunnyWeatherMultiJvmNode5 extends MultiDcSunnyWeatherSpec
abstract class MultiDcSunnyWeatherSpec extends MultiNodeSpec(MultiDcSunnyWeatherMultiJvmSpec)
with MultiNodeClusterSpec {
"A normal cluster" must {
"be healthy" taggedAs LongRunningTest in {
val observer = TestProbe("alpha-observer")
// allow all nodes to join:
awaitClusterUp(roles: _*)
val crossDcHeartbeatSenderPath = "/system/cluster/core/daemon/crossDcHeartbeatSender"
val selectCrossDcHeartbeatSender = system.actorSelection(crossDcHeartbeatSenderPath)
val expectedAlphaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "alpha", 2)
val expectedAlphaHeartbeaterRoles = membersAsRoles(expectedAlphaHeartbeaterNodes)
val expectedBetaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "beta", 2)
val expectedBetaHeartbeaterRoles = membersAsRoles(expectedBetaHeartbeaterNodes)
val expectedNoActiveHeartbeatSenderRoles = roles.toSet -- (expectedAlphaHeartbeaterRoles union expectedBetaHeartbeaterRoles)
enterBarrier("found-expectations")
info(s"expectedAlphaHeartbeaterNodes = ${expectedAlphaHeartbeaterNodes.map(_.address.port.get)}")
info(s"expectedBetaHeartbeaterNodes = ${expectedBetaHeartbeaterNodes.map(_.address.port.get)}")
info(s"expectedNoActiveHeartbeatSenderRoles = ${expectedNoActiveHeartbeatSenderRoles.map(_.port.get)}")
expectedAlphaHeartbeaterRoles.size should ===(2)
expectedBetaHeartbeaterRoles.size should ===(2)
implicit val sender = observer.ref
runOn(expectedAlphaHeartbeaterRoles.toList: _*) {
selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
val status = observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
}
runOn(expectedBetaHeartbeaterRoles.toList: _*) {
selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
val status = observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
}
runOn(expectedNoActiveHeartbeatSenderRoles.toList: _*) {
selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
val status = observer.expectMsgType[CrossDcHeartbeatSender.MonitoringDormant](5.seconds)
}
enterBarrier("done")
}
}
/**
* INTERNAL API
* Returns `Up` (or in a "later" status, like Leaving etc., but never `Joining` or `WeaklyUp`) members,
* sorted by Member.ageOrdering (from oldest to youngest). This restriction on status is needed to
* strongly guarantee the order of the "oldest" members, as they're linearized by the order in which they become Up
* (since marking that transition is a Leader action).
*/
private def membersByAge(): immutable.SortedSet[Member] =
SortedSet.empty(Member.ageOrdering)
.union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
/** INTERNAL API */
@InternalApi
private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] =
membersByAge()
.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)
.filter(memberFilter)
.take(n)
private def membersAsRoles(ms: immutable.Set[Member]): immutable.Set[RoleName] = {
val res = ms.flatMap(m ⇒ roleName(m.address))
require(res.size == ms.size, s"Not all members were converted to roles! Got: ${ms}, found ${res}")
res
}
}