diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 2bb2bf6766..dc2847b258 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -205,6 +205,41 @@ akka {
}
+ # Configures multi-dc specific heartbeating and other mechanisms.
+ # Many of these settings have a direct counterpart in "single data center mode",
+ # in which case the settings below are not used at all - they only apply
+ # if your cluster nodes are configured with at least 2 different `akka.cluster.data-center` values.
+ multi-data-center {
+
+ failure-detector {
+ # FQCN of the failure detector implementation.
+ # It must implement akka.remote.FailureDetector and have
+ # a public constructor with a com.typesafe.config.Config and
+ # akka.actor.EventStream parameter.
+ implementation-class = "akka.remote.DeadlineFailureDetector"
+
+ # Number of potentially lost/delayed heartbeats that will be
+ # accepted before considering it to be an anomaly.
+ # This margin is important to be able to survive sudden, occasional pauses
+ # in heartbeat arrivals, for example due to garbage collection pauses or
+ # network drops.
+ acceptable-heartbeat-pause = 10 s
+
+ # How often keep-alive heartbeat messages should be sent to each connection.
+ heartbeat-interval = 3 s
+
+ # After the heartbeat request has been sent the first failure detection
+ # will start after this period, even though no heartbeat message has
+ # been received.
+ expected-response-after = 1 s
+
+ # Maximum number of the oldest members in a data center that will monitor the oldest members in other data centers.
+ # This is done to lessen cross data center communication, as only those top-n oldest nodes
+ # need to maintain connections to the other data centers.
+ nr-of-monitoring-members = 5
+ }
+ }
+
# If the tick-duration of the default scheduler is longer than the
# tick-duration configured here a dedicated scheduler will be used for
# periodic tasks of the cluster, otherwise the default scheduler is used.
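
The comment above notes that these settings only take effect once at least two different `akka.cluster.data-center` values are in use. As a quick illustration (not part of this patch; the data center names and tuned values are made up), two nodes of the same cluster could be configured like this, mirroring the per-node config style used in the multi-jvm spec further down:

```scala
import com.typesafe.config.ConfigFactory

object PerNodeDcConfigSketch {
  // hypothetical node in the "west" data center, with a more lenient cross-DC failure detector
  val westNode = ConfigFactory.parseString(
    """
    akka.cluster.data-center = "west"
    akka.cluster.multi-data-center.failure-detector {
      heartbeat-interval = 5 s
      acceptable-heartbeat-pause = 20 s
    }
    """)

  // hypothetical node in the "east" data center, using the defaults from reference.conf
  val eastNode = ConfigFactory.parseString("""akka.cluster.data-center = "east" """)

  // each node would start its ActorSystem with its own snippet on top of the regular config, e.g.
  // ActorSystem("cluster", westNode.withFallback(ConfigFactory.load()))
}
```
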
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 3576cc6f0f..d4b7ddcb4d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.ConfigurationException
import akka.actor._
+import akka.cluster.ClusterSettings.DataCenter
import akka.dispatch.MonitorableThreadFactory
import akka.event.{ Logging, LoggingAdapter }
import akka.japi.Util
@@ -77,6 +78,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
*/
def selfAddress: Address = selfUniqueAddress.address
+ /** Data center to which this node belongs (defaults to "default" if not configured explicitly) */
+ def selfDataCenter: DataCenter = settings.DataCenter
+
/**
* roles that this member has
*/
@@ -96,10 +100,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
logInfo("Starting up...")
val failureDetector: FailureDetectorRegistry[Address] = {
- def createFailureDetector(): FailureDetector =
+ val createFailureDetector = () ⇒
FailureDetectorLoader.load(settings.FailureDetectorImplementationClass, settings.FailureDetectorConfig, system)
- new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
+ new DefaultFailureDetectorRegistry(createFailureDetector)
+ }
+
+ val crossDcFailureDetector: FailureDetectorRegistry[Address] = {
+ val createFailureDetector = () ⇒
+ FailureDetectorLoader.load(settings.CrossDcFailureDetectorSettings.ImplementationClass, settings.CrossDcFailureDetectorSettings.config, system)
+
+ new DefaultFailureDetectorRegistry(createFailureDetector)
}
// needs to be lazy to allow downing provider impls to access Cluster (if not we get deadlock)
@@ -411,7 +422,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
private def closeScheduler(): Unit = scheduler match {
case x: Closeable ⇒ x.close()
- case _ ⇒
+ case _ ⇒ // ignore, this is fine
}
/**
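
The hunk above gives the cluster extension a second registry, `crossDcFailureDetector`, built the same way as the existing one but from the `multi-data-center.failure-detector` settings. Below is a minimal sketch of that registry pattern (not part of this patch; `AlwaysAvailableDetector` is a made-up stand-in for `DeadlineFailureDetector`, and it assumes `akka.remote.DefaultFailureDetectorRegistry` is accessible from your code): one detector instance is created per monitored `Address` by the factory function.

```scala
import akka.actor.Address
import akka.remote.{ DefaultFailureDetectorRegistry, FailureDetector, FailureDetectorRegistry }

object RegistrySketch extends App {
  // toy detector: never suspects the peer, only remembers whether it saw a heartbeat
  class AlwaysAvailableDetector extends FailureDetector {
    @volatile private var seenHeartbeat = false
    override def isAvailable: Boolean = true
    override def isMonitoring: Boolean = seenHeartbeat
    override def heartbeat(): Unit = seenHeartbeat = true
  }

  val registry: FailureDetectorRegistry[Address] =
    new DefaultFailureDetectorRegistry[Address](() ⇒ new AlwaysAvailableDetector)

  val peer = Address("akka", "sys", "host-b", 2552)
  registry.heartbeat(peer)             // lazily creates the per-address detector and records a beat
  println(registry.isMonitoring(peer)) // true
  println(registry.isAvailable(peer))  // true
}
```
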
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 5f8d539235..80391ec612 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -296,6 +296,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
import cluster.settings._
import cluster.InfoLogger._
+ val selfDc = cluster.selfDataCenter
+
protected def selfUniqueAddress = cluster.selfUniqueAddress
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
@@ -334,8 +336,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
var exitingConfirmed = Set.empty[UniqueAddress]
- def selfDc = cluster.settings.DataCenter
-
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
@@ -431,8 +431,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def becomeInitialized(): Unit = {
// start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received
- context.actorOf(Props[ClusterHeartbeatSender].
- withDispatcher(UseDispatcher), name = "heartbeatSender")
+ val internalHeartbeatSenderProps = Props(new ClusterHeartbeatSender()).withDispatcher(UseDispatcher)
+ context.actorOf(internalHeartbeatSenderProps, name = "heartbeatSender")
+
+ val externalHeartbeatProps = Props(new CrossDcHeartbeatSender()).withDispatcher(UseDispatcher)
+ context.actorOf(externalHeartbeatProps, name = "crossDcHeartbeatSender")
+
// make sure that join process is stopped
stopSeedNodeProcess()
context.become(initialized)
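
With the change above, every node now also hosts a `crossDcHeartbeatSender` child next to the existing `heartbeatSender`. A test-scoped sketch (not part of this patch; it requires akka-testkit and must live in the `akka.cluster` package because these messages are internal API) of asking that child whether it is actively heartbeating other DCs, using the same system path as the multi-jvm spec at the end of this diff:

```scala
package akka.cluster

import akka.actor.ActorSystem
import akka.testkit.TestProbe

object CrossDcStatusSketch {
  def reportStatus(system: ActorSystem): Unit = {
    val probe = TestProbe()(system)
    system.actorSelection("/system/cluster/core/daemon/crossDcHeartbeatSender")
      .tell(CrossDcHeartbeatSender.ReportStatus(), probe.ref)

    probe.expectMsgType[CrossDcHeartbeatSender.MonitoringStateReport] match {
      case CrossDcHeartbeatSender.MonitoringActive(state) ⇒
        println(s"ACTIVE, aware of data centers: ${state.dataCenters}")
      case CrossDcHeartbeatSender.MonitoringDormant() ⇒
        println("DORMANT, an older node in this DC does the cross-DC heartbeating")
    }
  }
}
```
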
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
index bc3ac1bffc..a6a454a7f5 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -5,17 +5,18 @@ package akka.cluster
import scala.annotation.tailrec
import scala.collection.immutable
-import akka.actor.{ ActorLogging, ActorSelection, Address, Actor, RootActorPath }
+import akka.actor.{ Actor, ActorLogging, ActorPath, ActorRef, ActorSelection, Address, DeadLetterSuppression, RootActorPath }
import akka.cluster.ClusterEvent._
import akka.remote.FailureDetectorRegistry
import akka.remote.HeartbeatMessage
-import akka.actor.DeadLetterSuppression
+import akka.annotation.InternalApi
/**
* INTERNAL API.
*
* Receives Heartbeat messages and replies.
*/
+@InternalApi
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
import ClusterHeartbeatSender._
@@ -29,6 +30,15 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
}
+/** INTERNAL API: Utilities to obtain ClusterHeartbeatReceiver paths */
+@InternalApi
+private[cluster] object ClusterHeartbeatReceiver {
+
+ def name: String = "heartbeatReceiver"
+ def path(address: Address): ActorPath =
+ RootActorPath(address) / "system" / "cluster" / name
+}
+
/**
* INTERNAL API
*/
@@ -65,12 +75,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
import cluster.settings._
import context.dispatcher
- // the failureDetector is only updated by this actor, but read from other places
- val failureDetector = Cluster(context.system).failureDetector
+ val filterInternalClusterMembers: Member ⇒ Boolean =
+ _.dataCenter == cluster.selfDataCenter
val selfHeartbeat = Heartbeat(selfAddress)
- var state = ClusterHeartbeatSenderState(
+ val failureDetector = cluster.failureDetector
+
+ var state: ClusterHeartbeatSenderState = ClusterHeartbeatSenderState(
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, MonitoredByNrOfMembers),
oldReceiversNowUnreachable = Set.empty[UniqueAddress],
failureDetector)
@@ -94,7 +106,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def heartbeatReceiver(address: Address): ActorSelection =
- context.actorSelection(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
+ context.actorSelection(ClusterHeartbeatReceiver.path(address))
def receive = initializing
@@ -116,22 +128,28 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
def init(snapshot: CurrentClusterState): Unit = {
- val nodes: Set[UniqueAddress] = snapshot.members.map(_.uniqueAddress)
- val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
+ val nodes = snapshot.members.collect { case m if filterInternalClusterMembers(m) ⇒ m.uniqueAddress }
+ val unreachable = snapshot.unreachable.collect { case m if filterInternalClusterMembers(m) ⇒ m.uniqueAddress }
state = state.init(nodes, unreachable)
}
def addMember(m: Member): Unit =
- if (m.uniqueAddress != selfUniqueAddress && !state.contains(m.uniqueAddress))
+ if (m.uniqueAddress != selfUniqueAddress && // is not self
+ !state.contains(m.uniqueAddress) && // not already added
+ filterInternalClusterMembers(m) // only monitor members of our own data center (cross-dc is handled by CrossDcHeartbeatSender)
+ ) {
state = state.addMember(m.uniqueAddress)
+ }
def removeMember(m: Member): Unit =
- if (m.uniqueAddress == cluster.selfUniqueAddress) {
- // This cluster node will be shutdown, but stop this actor immediately
- // to avoid further updates
- context stop self
- } else {
- state = state.removeMember(m.uniqueAddress)
+ if (filterInternalClusterMembers(m)) { // we only ever deal with internal cluster members here
+ if (m.uniqueAddress == cluster.selfUniqueAddress) {
+ // This cluster node will be shutdown, but stop this actor immediately
+ // to avoid further updates
+ context stop self
+ } else {
+ state = state.removeMember(m.uniqueAddress)
+ }
}
def unreachableMember(m: Member): Unit =
@@ -142,7 +160,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def heartbeat(): Unit = {
state.activeReceivers foreach { to ⇒
- if (cluster.failureDetector.isMonitoring(to.address)) {
+ if (failureDetector.isMonitoring(to.address)) {
if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address)
} else {
if (verboseHeartbeat) log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address)
@@ -152,7 +170,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
heartbeatReceiver(to.address) ! selfHeartbeat
}
-
}
def heartbeatRsp(from: UniqueAddress): Unit = {
@@ -173,6 +190,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
* State of [[ClusterHeartbeatSender]]. Encapsulated to facilitate unit testing.
* It is immutable, but it updates the failureDetector.
*/
+@InternalApi
private[cluster] final case class ClusterHeartbeatSenderState(
ring: HeartbeatNodeRing,
oldReceiversNowUnreachable: Set[UniqueAddress],
@@ -262,7 +280,7 @@ private[cluster] final case class HeartbeatNodeRing(
/**
* Receivers for `selfAddress`. Cached for subsequent access.
*/
- lazy val myReceivers: immutable.Set[UniqueAddress] = receivers(selfAddress)
+ lazy val myReceivers: Set[UniqueAddress] = receivers(selfAddress)
private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1)
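
The `ClusterHeartbeatSender` changes above boil down to restricting within-DC heartbeating to members of this node's own data center, via the `filterInternalClusterMembers` predicate applied to the initial snapshot and to member events. A self-contained sketch of that filtering (not part of this patch; `snapshot` and `selfDataCenter` would come from the surrounding actor):

```scala
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.{ Member, UniqueAddress }

object DcFilterSketch {
  /** Split a cluster state snapshot into the reachable/unreachable nodes of our own DC only. */
  def sameDcNodes(snapshot: CurrentClusterState, selfDataCenter: DataCenter): (Set[UniqueAddress], Set[UniqueAddress]) = {
    val sameDc: Member ⇒ Boolean = _.dataCenter == selfDataCenter
    val nodes = snapshot.members.collect { case m if sameDc(m) ⇒ m.uniqueAddress }
    val unreachable = snapshot.unreachable.collect { case m if sameDc(m) ⇒ m.uniqueAddress }
    (nodes, unreachable)
  }
}
```
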
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index b7106526ca..df37649d2b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -10,7 +10,7 @@ import com.typesafe.config.ConfigObject
import scala.concurrent.duration.Duration
import akka.actor.Address
import akka.actor.AddressFromURIString
-import akka.annotation.InternalApi
+import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.Dispatchers
import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase }
@@ -117,6 +117,21 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
val DataCenter: DataCenter = cc.getString("data-center")
+
+ final class CrossDcFailureDetectorSettings(val config: Config) {
+ val ImplementationClass: String = config.getString("implementation-class")
+ val HeartbeatInterval: FiniteDuration = {
+ config.getMillisDuration("heartbeat-interval")
+ } requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
+ val HeartbeatExpectedResponseAfter: FiniteDuration = {
+ config.getMillisDuration("expected-response-after")
+ } requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0")
+ val NrOfMonitoringActors: Int = {
+ config.getInt("nr-of-monitoring-members")
+ } requiring (_ > 0, "failure-detector.nr-of-monitoring-members must be > 0")
+ }
+ val CrossDcFailureDetectorSettings = new CrossDcFailureDetectorSettings(cc.getConfig("multi-data-center.failure-detector"))
+
val Roles: Set[String] = {
val configuredRoles = (immutableSeq(cc.getStringList("roles")).toSet) requiring (
_.forall(!_.startsWith(DcRolePrefix)),
@@ -124,6 +139,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
configuredRoles + s"$DcRolePrefix$DataCenter"
}
+
val MinNrOfMembers: Int = {
cc.getInt("min-nr-of-members")
} requiring (_ > 0, "min-nr-of-members must be > 0")
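
The new `CrossDcFailureDetectorSettings` are validated and exposed through `ClusterSettings` just like the existing failure detector settings. A small sketch of reading them back (not part of this patch; it assumes akka-cluster is on the classpath so its reference.conf supplies the defaults, and the system/DC names are made up):

```scala
import akka.cluster.ClusterSettings
import com.typesafe.config.ConfigFactory

object SettingsSketch extends App {
  // ClusterSettings expects the full "akka" config tree, so fall back to the reference defaults
  val config = ConfigFactory
    .parseString("""akka.cluster.data-center = "us-east" """)
    .withFallback(ConfigFactory.load())

  val settings = new ClusterSettings(config, "sketch")
  println(settings.DataCenter)                                           // us-east
  println(settings.CrossDcFailureDetectorSettings.HeartbeatInterval)     // 3 seconds (reference.conf default)
  println(settings.CrossDcFailureDetectorSettings.NrOfMonitoringActors)  // 5
}
```
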
diff --git a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala
new file mode 100644
index 0000000000..b022051bb6
--- /dev/null
+++ b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala
@@ -0,0 +1,318 @@
+/*
+ * Copyright (C) 2017 Lightbend Inc.
+ */
+
+package akka.cluster
+
+import akka.actor.{ Actor, ActorLogging, ActorSelection, Address, NoSerializationVerificationNeeded, RootActorPath }
+import akka.annotation.InternalApi
+import akka.cluster.ClusterEvent._
+import akka.cluster.ClusterSettings.DataCenter
+import akka.event.Logging
+import akka.remote.FailureDetectorRegistry
+import akka.util.ConstantFun
+
+import scala.collection.{ SortedSet, immutable, breakOut }
+
+/**
+ * INTERNAL API
+ *
+ * This actor will be started on all nodes participating in a cluster,
+ * however unlike the within-dc heartbeat sender ([[ClusterHeartbeatSender]]),
+ * it will only perform active heartbeating on the `n` "oldest" nodes of a given data center.
+ *
+ * It will monitor its oldest counterparts in other data centers.
+ * For example, a DC configured to have (up to) 4 monitoring actors
+ * will have 4 such active at any point in time, and those will monitor
+ * the (at most) 4 oldest nodes of each other data center.
+ *
+ * This monitoring mode is both simple and predictable, and also builds on the assumption that
+ * "nodes which stay around for a long time become old", and the set of oldest nodes rarely changes. In a way,
+ * they are the "core" of a cluster, while other nodes may be very dynamically changing worker
+ * nodes which aggressively come and go as the traffic in the service changes.
+ */
+@InternalApi
+private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogging {
+ import CrossDcHeartbeatSender._
+
+ val cluster = Cluster(context.system)
+ val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
+ import cluster.settings._
+ import cluster.{ scheduler, selfAddress, selfDataCenter, selfUniqueAddress }
+ import context.dispatcher
+
+ // For inspecting if in active state; allows avoiding "becoming active" when already active
+ var activelyMonitoring = false
+
+ val isExternalClusterMember: Member ⇒ Boolean =
+ member ⇒ member.dataCenter != cluster.selfDataCenter
+
+ val crossDcSettings: cluster.settings.CrossDcFailureDetectorSettings = cluster.settings.CrossDcFailureDetectorSettings
+ val crossDcFailureDetector = cluster.crossDcFailureDetector
+
+ val selfHeartbeat = ClusterHeartbeatSender.Heartbeat(selfAddress)
+
+ var dataCentersState: CrossDcHeartbeatingState = CrossDcHeartbeatingState.init(
+ crossDcFailureDetector,
+ crossDcSettings.NrOfMonitoringActors,
+ SortedSet.empty
+ )
+
+ // start periodic heartbeat to other nodes in cluster
+ val heartbeatTask = scheduler.schedule(
+ PeriodicTasksInitialDelay max HeartbeatInterval,
+ HeartbeatInterval, self, ClusterHeartbeatSender.HeartbeatTick)
+
+ override def preStart(): Unit = {
+ cluster.subscribe(self, classOf[MemberEvent])
+ if (verboseHeartbeat) log.debug("Initialized cross-dc heartbeat sender as DORMANT in DC: [{}]", selfDataCenter)
+ }
+
+ override def postStop(): Unit = {
+ dataCentersState.activeReceivers.foreach(a ⇒ crossDcFailureDetector.remove(a.address))
+ heartbeatTask.cancel()
+ cluster.unsubscribe(self)
+ }
+
+ /**
+ * Looks up and returns the remote cluster heartbeat connection for the specific address.
+ */
+ def heartbeatReceiver(address: Address): ActorSelection =
+ context.actorSelection(ClusterHeartbeatReceiver.path(address))
+
+ def receive: Actor.Receive =
+ dormant orElse introspecting
+
+ /**
+ * In this state no cross-datacenter heartbeats are sent by this actor.
+ * This may be for one of these reasons:
+ * - no nodes in other DCs were detected yet
+ * - nodes in other DCs are present, but this node is not among the n oldest in this DC (see
+ * `multi-data-center.failure-detector.nr-of-monitoring-members`), so it does not have to monitor the other data centers
+ *
+ * In this state it will however listen to cluster events to eventually take over monitoring other DCs
+ * in case it becomes "old enough".
+ */
+ def dormant: Actor.Receive = {
+ case s: CurrentClusterState ⇒ init(s)
+ case MemberRemoved(m, _) ⇒ removeMember(m)
+ case evt: MemberEvent ⇒ addMember(evt.member)
+ case ClusterHeartbeatSender.HeartbeatTick ⇒ // ignore...
+ }
+
+ def active: Actor.Receive = {
+ case ClusterHeartbeatSender.HeartbeatTick ⇒ heartbeat()
+ case ClusterHeartbeatSender.HeartbeatRsp(from) ⇒ heartbeatRsp(from)
+ case MemberRemoved(m, _) ⇒ removeMember(m)
+ case evt: MemberEvent ⇒ addMember(evt.member)
+ case ClusterHeartbeatSender.ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
+ }
+
+ def introspecting: Actor.Receive = {
+ case ReportStatus() ⇒
+ sender() ! {
+ if (activelyMonitoring) CrossDcHeartbeatSender.MonitoringActive(dataCentersState)
+ else CrossDcHeartbeatSender.MonitoringDormant()
+ }
+ }
+
+ def init(snapshot: CurrentClusterState): Unit = {
+ // val unreachable = snapshot.unreachable.collect({ case m if isExternalClusterMember(m) => m.uniqueAddress })
+ // nr of monitored nodes is the same as the number of monitoring nodes (`n` oldest in one DC watch `n` oldest in other)
+ val nodes = snapshot.members
+ val nrOfMonitoredNodes = crossDcSettings.NrOfMonitoringActors
+ dataCentersState = CrossDcHeartbeatingState.init(crossDcFailureDetector, nrOfMonitoredNodes, nodes)
+ }
+
+ def addMember(m: Member): Unit =
+ if (m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp) {
+ // since we only monitor nodes in Up or later states, due to the n-th oldest requirement
+ dataCentersState = dataCentersState.addMember(m)
+ if (verboseHeartbeat && m.dataCenter != selfDataCenter)
+ log.debug("Register member {} for cross DC heartbeat (will only heartbeat if oldest)", m)
+
+ becomeActiveIfResponsibleForHeartbeat()
+ }
+
+ def removeMember(m: Member): Unit =
+ if (m.uniqueAddress == cluster.selfUniqueAddress) {
+ // This cluster node will be shutdown, but stop this actor immediately to avoid further updates
+ context stop self
+ } else {
+ dataCentersState = dataCentersState.removeMember(m)
+ becomeActiveIfResponsibleForHeartbeat()
+ }
+
+ def heartbeat(): Unit = {
+ dataCentersState.activeReceivers foreach { to ⇒
+ if (crossDcFailureDetector.isMonitoring(to.address)) {
+ if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address)
+ } else {
+ if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - First (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address)
+ // schedule the expected first heartbeat for later, which will give the
+ // other side a chance to reply, and also trigger some resends if needed
+ scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ClusterHeartbeatSender.ExpectedFirstHeartbeat(to))
+ }
+ heartbeatReceiver(to.address) ! selfHeartbeat
+ }
+ }
+
+ def heartbeatRsp(from: UniqueAddress): Unit = {
+ if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat response from [{}]", selfDataCenter, selfAddress, from.address)
+ dataCentersState = dataCentersState.heartbeatRsp(from)
+ }
+
+ def triggerFirstHeartbeat(from: UniqueAddress): Unit =
+ if (dataCentersState.activeReceivers.contains(from) && !crossDcFailureDetector.isMonitoring(from.address)) {
+ if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - Trigger extra expected (cross) heartbeat from [{}]", selfAddress, from.address)
+ crossDcFailureDetector.heartbeat(from.address)
+ }
+
+ private def selfIsResponsibleForCrossDcHeartbeat(): Boolean = {
+ val activeDcs: Int = dataCentersState.dataCenters.size
+ if (activeDcs > 1) dataCentersState.shouldActivelyMonitorNodes(selfDataCenter, selfUniqueAddress)
+ else false
+ }
+
+ /** Idempotent, become active if this node is n-th oldest and should monitor other nodes */
+ private def becomeActiveIfResponsibleForHeartbeat(): Unit = {
+ if (!activelyMonitoring && selfIsResponsibleForCrossDcHeartbeat()) {
+ if (verboseHeartbeat) log.debug("Becoming ACTIVE (for DC: {}), monitoring other DCs oldest nodes", selfDataCenter)
+ activelyMonitoring = true
+
+ context.become(active orElse introspecting)
+ } else if (!activelyMonitoring)
+ if (verboseHeartbeat) log.info("Remaining DORMANT; others in {} handle heartbeating other DCs", selfDataCenter)
+ }
+
+}
+
+/** INTERNAL API */
+@InternalApi
+private[akka] object CrossDcHeartbeatSender {
+
+ // -- messages intended only for local messaging during testing --
+ sealed trait InspectionCommand extends NoSerializationVerificationNeeded
+ final case class ReportStatus()
+
+ sealed trait StatusReport extends NoSerializationVerificationNeeded
+ sealed trait MonitoringStateReport extends StatusReport
+ final case class MonitoringActive(state: CrossDcHeartbeatingState) extends MonitoringStateReport
+ final case class MonitoringDormant() extends MonitoringStateReport
+ // -- end of messages intended only for local messaging during testing --
+}
+
+/** INTERNAL API */
+@InternalApi
+private[cluster] final case class CrossDcHeartbeatingState(
+ failureDetector: FailureDetectorRegistry[Address],
+ nrOfMonitoredNodesPerDc: Int,
+ state: Map[ClusterSettings.DataCenter, SortedSet[Member]]) {
+ import CrossDcHeartbeatingState._
+
+ /**
+ * Decides if `self` node should become active and monitor other nodes with heartbeats.
+ * Only the `nrOfMonitoredNodesPerDc`-oldest nodes in each DC fulfil this role.
+ */
+ def shouldActivelyMonitorNodes(selfDc: ClusterSettings.DataCenter, selfAddress: UniqueAddress): Boolean = {
+ /** Since we need the ordering of the oldest nodes to be guaranteed, we must only look at Up (or later, e.g. Leaving, Exiting) nodes */
+ def atLeastInUpState(m: Member): Boolean =
+ m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.Joining
+
+ val selfDcNeighbours: SortedSet[Member] = state.getOrElse(selfDc, emptyMembersSortedSet)
+ val selfDcOldOnes = selfDcNeighbours.filter(atLeastInUpState).take(nrOfMonitoredNodesPerDc)
+
+ // if this node is part of the "n oldest nodes" it should indeed monitor other nodes:
+ val shouldMonitorActively = selfDcOldOnes.exists(_.uniqueAddress == selfAddress)
+ shouldMonitorActively
+ }
+
+ def addMember(m: Member): CrossDcHeartbeatingState = {
+ val dc = m.dataCenter
+
+ // we need to remove the member first, to avoid having "duplicates"
+ // this is because the removal and uniqueness we need is only by uniqueAddress
+ // which is not used by the `ageOrdering`
+ val oldMembersWithoutM = state.getOrElse(dc, emptyMembersSortedSet)
+ .filterNot(_.uniqueAddress == m.uniqueAddress)
+
+ val updatedMembers = oldMembersWithoutM + m
+ val updatedState = this.copy(state = state.updated(dc, updatedMembers))
+
+ // guarding against the case of two members having the same upNumber, in which case the activeReceivers
+ // which are based on the ageOrdering could actually have changed by adding a node. In practice this
+ // should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes
+ // in the same DC. If it happens though, we need to remove the previously monitored node from the failure
+ // detector, to prevent both a resource leak and that node actually appearing as unreachable in the gossip (!)
+ val stoppedMonitoringReceivers = updatedState.activeReceiversIn(dc) diff this.activeReceiversIn(dc)
+ stoppedMonitoringReceivers.foreach(m ⇒ failureDetector.remove(m.address)) // at most one element difference
+
+ updatedState
+ }
+
+ def removeMember(m: Member): CrossDcHeartbeatingState = {
+ val dc = m.dataCenter
+ state.get(dc) match {
+ case Some(dcMembers) ⇒
+ val updatedMembers = dcMembers.filterNot(_.uniqueAddress == m.uniqueAddress)
+
+ failureDetector.remove(m.address)
+ copy(state = state.updated(dc, updatedMembers))
+ case None ⇒
+ this // no change needed, was certainly not present (not even its DC was)
+ }
+ }
+
+ val activeReceivers: Set[UniqueAddress] =
+ dataCenters.flatMap(k ⇒ state(k).take(nrOfMonitoredNodesPerDc).map(_.uniqueAddress)(breakOut))
+
+ private def activeReceiversIn(dc: DataCenter): Set[UniqueAddress] =
+ state.getOrElse(dc, emptyMembersSortedSet).take(nrOfMonitoredNodesPerDc).map(_.uniqueAddress)(breakOut)
+
+ def allMembers: Iterable[Member] =
+ state.values.flatMap(ConstantFun.scalaIdentityFunction)
+
+ def heartbeatRsp(from: UniqueAddress): CrossDcHeartbeatingState = {
+ if (activeReceivers.contains(from)) {
+ failureDetector heartbeat from.address
+ }
+ this
+ }
+
+ def dataCenters: Set[DataCenter] =
+ state.keys.toSet
+
+}
+
+/** INTERNAL API */
+@InternalApi
+private[cluster] object CrossDcHeartbeatingState {
+
+ /** Sorted by age */
+ private def emptyMembersSortedSet: SortedSet[Member] = SortedSet.empty[Member](Member.ageOrdering)
+
+ def init(
+ crossDcFailureDetector: FailureDetectorRegistry[Address],
+ nrOfMonitoredNodesPerDc: Int,
+ members: SortedSet[Member]): CrossDcHeartbeatingState = {
+ CrossDcHeartbeatingState(
+ crossDcFailureDetector,
+ nrOfMonitoredNodesPerDc,
+ state = {
+ // TODO unduplicate this with other places where we do this
+ val groupedByDc = members.groupBy(_.dataCenter)
+
+ if (members.ordering == Member.ageOrdering) {
+ // we already have the right ordering
+ groupedByDc
+ } else {
+ // we need to enforce the ageOrdering for the SortedSet in each DC
+ groupedByDc.map {
+ case (dc, ms) ⇒
+ dc → (SortedSet.empty[Member](Member.ageOrdering) union ms)
+ }
+ }
+ })
+ }
+
+}
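
The heart of `CrossDcHeartbeatingState` is the "n oldest per data center" selection: members are kept in per-DC sets sorted by `Member.ageOrdering`, the first `nr-of-monitoring-members` of each set are the heartbeat receivers, and a node only becomes an active monitor if it is itself among the n oldest of its own DC. A self-contained sketch of that selection (not part of this patch; `Node` is a made-up stand-in for `Member`, since `Member` cannot be constructed outside `akka.cluster`, and lower `upNumber` means older, as in `Member.ageOrdering`):

```scala
import scala.collection.immutable.SortedSet

object OldestPerDcSketch extends App {
  final case class Node(name: String, dc: String, upNumber: Int)
  // order by age; note that the real Member.ageOrdering also breaks ties, which this sketch skips
  val ageOrdering: Ordering[Node] = Ordering.by(_.upNumber)

  val members = List(
    Node("a1", "alpha", 1), Node("a2", "alpha", 2), Node("a3", "alpha", 5),
    Node("b1", "beta", 3), Node("b2", "beta", 4))

  val nrOfMonitoringMembers = 2

  // group by DC and keep only the n oldest of each, analogous to activeReceiversIn
  val oldestPerDc: Map[String, SortedSet[Node]] =
    members.groupBy(_.dc).map {
      case (dc, ms) ⇒ dc → (SortedSet.empty(ageOrdering) ++ ms).take(nrOfMonitoringMembers)
    }

  // a node actively monitors other DCs iff it is among the n oldest of its own DC
  def shouldActivelyMonitor(self: Node): Boolean =
    oldestPerDc.getOrElse(self.dc, SortedSet.empty(ageOrdering)).contains(self)

  println(oldestPerDc)                                   // alpha -> {a1, a2}, beta -> {b1, b2}
  println(shouldActivelyMonitor(Node("a3", "alpha", 5))) // false: a3 is not among the 2 oldest in alpha
}
```
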
diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala
index c258e0a871..e784067c74 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Member.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala
@@ -55,7 +55,7 @@ class Member private[cluster] (
* cluster. A member that joined after removal of another member may be
* considered older than the removed member. Note that is only makes
* sense to compare with other members inside of one data center (upNumber has
- * a higher risk of being reused across data centers).
+ * a higher risk of being reused across data centers). // TODO should we enforce this to compare only within DCs?
*/
def isOlderThan(other: Member): Boolean =
if (upNumber == other.upNumber)
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala
new file mode 100644
index 0000000000..ff4eaf9156
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+package akka.cluster
+
+import akka.annotation.InternalApi
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+import scala.collection.immutable
+import scala.collection.immutable.SortedSet
+import scala.concurrent.duration._
+
+object MultiDcSunnyWeatherMultiJvmSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+ val fourth = role("fourth")
+ val fifth = role("fifth")
+
+ nodeConfig(first, second, third)(ConfigFactory.parseString(
+ """
+ akka {
+ cluster.data-center = alpha
+ }
+ """))
+
+ nodeConfig(fourth, fifth)(ConfigFactory.parseString(
+ """
+ akka {
+ cluster.data-center = beta
+ }
+ """))
+
+ commonConfig(ConfigFactory.parseString(
+ """
+ akka {
+ actor.provider = cluster
+
+ loggers = ["akka.testkit.TestEventListener"]
+ loglevel = INFO
+
+ remote.log-remote-lifecycle-events = off
+
+ cluster {
+
+ debug.verbose-heartbeat-logging = off
+
+ failure-detector {
+ monitored-by-nr-of-members = 2
+ }
+
+ multi-data-center {
+ failure-detector {
+ nr-of-monitoring-members = 2
+ }
+ }
+ }
+ }
+ """))
+
+}
+
+class MultiDcSunnyWeatherMultiJvmNode1 extends MultiDcSunnyWeatherSpec
+class MultiDcSunnyWeatherMultiJvmNode2 extends MultiDcSunnyWeatherSpec
+class MultiDcSunnyWeatherMultiJvmNode3 extends MultiDcSunnyWeatherSpec
+class MultiDcSunnyWeatherMultiJvmNode4 extends MultiDcSunnyWeatherSpec
+class MultiDcSunnyWeatherMultiJvmNode5 extends MultiDcSunnyWeatherSpec
+
+abstract class MultiDcSunnyWeatherSpec extends MultiNodeSpec(MultiDcSunnyWeatherMultiJvmSpec)
+ with MultiNodeClusterSpec {
+
+ "A normal cluster" must {
+ "be healthy" taggedAs LongRunningTest in {
+
+ val observer = TestProbe("alpha-observer")
+
+ // allow all nodes to join:
+ awaitClusterUp(roles: _*)
+
+ val crossDcHeartbeatSenderPath = "/system/cluster/core/daemon/crossDcHeartbeatSender"
+ val selectCrossDcHeartbeatSender = system.actorSelection(crossDcHeartbeatSenderPath)
+
+ val expectedAlphaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "alpha", 2)
+ val expectedAlphaHeartbeaterRoles = membersAsRoles(expectedAlphaHeartbeaterNodes)
+
+ val expectedBetaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "beta", 2)
+ val expectedBetaHeartbeaterRoles = membersAsRoles(expectedBetaHeartbeaterNodes)
+
+ val expectedNoActiveHeartbeatSenderRoles = roles.toSet -- (expectedAlphaHeartbeaterRoles union expectedBetaHeartbeaterRoles)
+
+ enterBarrier("found-expectations")
+
+ info(s"expectedAlphaHeartbeaterNodes = ${expectedAlphaHeartbeaterNodes.map(_.address.port.get)}")
+ info(s"expectedBetaHeartbeaterNodes = ${expectedBetaHeartbeaterNodes.map(_.address.port.get)}")
+ info(s"expectedNoActiveHeartbeatSenderRoles = ${expectedNoActiveHeartbeatSenderRoles.map(_.port.get)}")
+
+ expectedAlphaHeartbeaterRoles.size should ===(2)
+ expectedBetaHeartbeaterRoles.size should ===(2)
+
+ implicit val sender = observer.ref
+ runOn(expectedAlphaHeartbeaterRoles.toList: _*) {
+ selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
+ val status = observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
+ }
+ runOn(expectedBetaHeartbeaterRoles.toList: _*) {
+ selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
+ val status = observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
+ }
+ runOn(expectedNoActiveHeartbeatSenderRoles.toList: _*) {
+ selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
+ val status = observer.expectMsgType[CrossDcHeartbeatSender.MonitoringDormant](5.seconds)
+ }
+
+ enterBarrier("done")
+ }
+ }
+
+ /**
+ * INTERNAL API
+ * Returns `Up` (or in "later" status, like Leaving etc, but never `Joining` or `WeaklyUp`) members,
+ * sorted by Member.ageOrdering (from oldest to youngest). This restriction on status is needed to
+ * strongly guarantee the order of "oldest" members, as they're linearized by the order in which they become Up
+ * (since marking that transition is a Leader action).
+ */
+ private def membersByAge(): immutable.SortedSet[Member] =
+ SortedSet.empty(Member.ageOrdering)
+ .union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
+
+ /** INTERNAL API */
+ @InternalApi
+ private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] =
+ membersByAge()
+ .filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)
+ .filter(memberFilter)
+ .take(n)
+
+ private def membersAsRoles(ms: immutable.Set[Member]): immutable.Set[RoleName] = {
+ val res = ms.flatMap(m ⇒ roleName(m.address))
+ require(res.size == ms.size, s"Not all members were converted to roles! Got: ${ms}, found ${res}")
+ res
+ }
+}