// Source: akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala

/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.collection.immutable
import akka.actor.{ Actor, ActorLogging, ActorPath, ActorSelection, Address, DeadLetterSuppression, RootActorPath }
import akka.cluster.ClusterEvent._
import akka.remote.FailureDetectorRegistry
import akka.remote.HeartbeatMessage
import akka.annotation.InternalApi
import akka.util.ccompat._
/**
 * INTERNAL API.
 *
 * Receives [[ClusterHeartbeatSender.Heartbeat]] messages and replies to the sender
 * with a [[ClusterHeartbeatSender.HeartbeatRsp]] carrying this node's unique address.
 */
@InternalApi
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
  import ClusterHeartbeatSender._

  // Important - don't use Cluster(context.system) in constructor because that would
  // cause deadlock. See startup sequence in ClusterDaemon.
  lazy val cluster = Cluster(context.system)

  // The reply does not depend on the incoming heartbeat, so it is created once on first use.
  lazy val selfHeartbeatRsp = HeartbeatRsp(cluster.selfUniqueAddress)

  // When enabled, every received heartbeat is logged at debug level.
  lazy val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging

  def receive = {
    case Heartbeat(from) =>
      if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat from [{}]", cluster.selfAddress, from)
      sender() ! selfHeartbeatRsp
  }
}
/**
 * INTERNAL API: helpers for locating the [[ClusterHeartbeatReceiver]] actor of a node.
 */
@InternalApi
private[cluster] object ClusterHeartbeatReceiver {

  /** Actor name under which the heartbeat receiver is addressed. */
  def name: String = "heartbeatReceiver"

  /**
   * Path of the heartbeat receiver on the node at `address`, i.e. the child [[name]]
   * below `/system/cluster` of the root path for that address.
   */
  def path(address: Address): ActorPath = {
    val clusterParent = RootActorPath(address) / "system" / "cluster"
    clusterParent / name
  }
}
/**
 * INTERNAL API
 *
 * Messages used by the [[ClusterHeartbeatSender]] and [[ClusterHeartbeatReceiver]] actors.
 */
private[cluster] object ClusterHeartbeatSender {

  /**
   * Sent at regular intervals for failure detection.
   */
  final case class Heartbeat(from: Address) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression

  /**
   * Sent as reply to [[Heartbeat]] messages.
   */
  final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression

  // sent to self only
  case object HeartbeatTick

  // sent to self only, scheduled after a heartbeat to a node that is not yet monitored
  final case class ExpectedFirstHeartbeat(from: UniqueAddress)
}
/**
 * INTERNAL API
 *
 * This actor is responsible for sending the heartbeat messages to
 * a few other nodes, which will reply and then this actor updates the
 * failure detector.
 */
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
  import ClusterHeartbeatSender._

  val cluster = Cluster(context.system)
  val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
  import cluster.{ selfAddress, selfUniqueAddress, scheduler }
  import cluster.settings._
  import context.dispatcher

  // Only members of this node's own data center are tracked by this actor.
  val filterInternalClusterMembers: Member => Boolean =
    _.dataCenter == cluster.selfDataCenter

  // The same Heartbeat instance is sent to every receiver on every tick.
  val selfHeartbeat = Heartbeat(selfAddress)

  val failureDetector = cluster.failureDetector

  // Starts with a ring containing only this node; replaced in `init` once the
  // first CurrentClusterState snapshot has been received.
  var state: ClusterHeartbeatSenderState = ClusterHeartbeatSenderState(
    ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, MonitoredByNrOfMembers),
    oldReceiversNowUnreachable = Set.empty[UniqueAddress],
    failureDetector)

  // start periodic heartbeat to other nodes in cluster
  val heartbeatTask = scheduler.schedule(
    PeriodicTasksInitialDelay max HeartbeatInterval,
    HeartbeatInterval, self, HeartbeatTick)

  // used for logging warning if actual tick interval is unexpected (e.g. due to starvation)
  private var tickTimestamp = System.nanoTime() + (PeriodicTasksInitialDelay max HeartbeatInterval).toNanos

  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
  }

  override def postStop(): Unit = {
    // stop monitoring every node this actor was heartbeating to
    state.activeReceivers.foreach(a => failureDetector.remove(a.address))
    heartbeatTask.cancel()
    cluster.unsubscribe(self)
  }

  /**
   * Looks up and returns the remote cluster heartbeat connection for the specific address.
   */
  def heartbeatReceiver(address: Address): ActorSelection =
    context.actorSelection(ClusterHeartbeatReceiver.path(address))

  def receive = initializing

  /**
   * Behavior until the first [[CurrentClusterState]] snapshot arrives. Ticks are not
   * acted upon here, they only refresh `tickTimestamp`.
   */
  def initializing: Actor.Receive = {
    case s: CurrentClusterState =>
      init(s)
      context.become(active)
    case HeartbeatTick =>
      tickTimestamp = System.nanoTime() // start checks when active
  }

  /**
   * Behavior after initialization. `MemberRemoved` is matched before the generic
   * `MemberEvent` case, so every other member event is handled by `addMember`.
   */
  def active: Actor.Receive = {
    case HeartbeatTick                => heartbeat()
    case HeartbeatRsp(from)           => heartbeatRsp(from)
    case MemberRemoved(m, _)          => removeMember(m)
    case evt: MemberEvent             => addMember(evt.member)
    case UnreachableMember(m)         => unreachableMember(m)
    case ReachableMember(m)           => reachableMember(m)
    case ExpectedFirstHeartbeat(from) => triggerFirstHeartbeat(from)
  }

  /** Seeds `state` from the snapshot, keeping only members accepted by `filterInternalClusterMembers`. */
  def init(snapshot: CurrentClusterState): Unit = {
    val nodes = snapshot.members.collect { case m if filterInternalClusterMembers(m) => m.uniqueAddress }
    val unreachable = snapshot.unreachable.collect { case m if filterInternalClusterMembers(m) => m.uniqueAddress }
    state = state.init(nodes, unreachable)
  }

  def addMember(m: Member): Unit =
    if (m.uniqueAddress != selfUniqueAddress && // is not self
      !state.contains(m.uniqueAddress) && // not already added
      filterInternalClusterMembers(m) // should be watching members from this DC (internal / external)
    ) {
      state = state.addMember(m.uniqueAddress)
    }

  def removeMember(m: Member): Unit =
    if (filterInternalClusterMembers(m)) { // we only ever deal with internal cluster members here
      if (m.uniqueAddress == cluster.selfUniqueAddress) {
        // This cluster node will be shutdown, but stop this actor immediately
        // to avoid further updates
        context stop self
      } else {
        state = state.removeMember(m.uniqueAddress)
      }
    }

  def unreachableMember(m: Member): Unit =
    state = state.unreachableMember(m.uniqueAddress)

  def reachableMember(m: Member): Unit =
    state = state.reachableMember(m.uniqueAddress)

  /** Sends `selfHeartbeat` to every active receiver, then checks the tick interval. */
  def heartbeat(): Unit = {
    state.activeReceivers foreach { to =>
      if (failureDetector.isMonitoring(to.address)) {
        if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address)
      } else {
        if (verboseHeartbeat) log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address)
        // schedule the expected first heartbeat for later, which will give the
        // other side a chance to reply, and also trigger some resends if needed
        scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to))
      }
      heartbeatReceiver(to.address) ! selfHeartbeat
    }

    checkTickInterval()
  }

  /** Logs a warning when at least twice the heartbeat interval has passed since the previous tick. */
  private def checkTickInterval(): Unit = {
    val now = System.nanoTime()
    if ((now - tickTimestamp) >= (HeartbeatInterval.toNanos * 2))
      log.warning(
        "Cluster Node [{}] - Scheduled sending of heartbeat was delayed. " +
        "Previous heartbeat was sent [{}] ms ago, expected interval is [{}] ms. This may cause failure detection " +
        "to mark members as unreachable. The reason can be thread starvation, e.g. by running blocking tasks on the " +
        "default dispatcher, CPU overload, or GC.",
        selfAddress, TimeUnit.NANOSECONDS.toMillis(now - tickTimestamp), HeartbeatInterval.toMillis)
    tickTimestamp = now
  }

  def heartbeatRsp(from: UniqueAddress): Unit = {
    if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat response from [{}]", selfAddress, from.address)
    state = state.heartbeatRsp(from)
  }

  /**
   * Handles [[ClusterHeartbeatSender.ExpectedFirstHeartbeat]]: if `from` is still an active
   * receiver but the failure detector is not monitoring it yet, a heartbeat is registered
   * on its behalf.
   */
  def triggerFirstHeartbeat(from: UniqueAddress): Unit =
    if (state.activeReceivers(from) && !failureDetector.isMonitoring(from.address)) {
      if (verboseHeartbeat) log.debug("Cluster Node [{}] - Trigger extra expected heartbeat from [{}]", selfAddress, from.address)
      failureDetector.heartbeat(from.address)
    }
}
/**
 * INTERNAL API
 * State of [[ClusterHeartbeatSender]]. Encapsulated to facilitate unit testing.
 * It is immutable, but it updates the failureDetector.
 *
 * @param ring the node ring used to pick this node's heartbeat receivers
 * @param oldReceiversNowUnreachable nodes that are no longer picked by the ring but were not
 *                                   available in the failure detector when they were dropped
 * @param failureDetector registry updated as a side effect of the state transitions
 */
@InternalApi
private[cluster] final case class ClusterHeartbeatSenderState(
  ring: HeartbeatNodeRing,
  oldReceiversNowUnreachable: Set[UniqueAddress],
  failureDetector: FailureDetectorRegistry[Address]) {

  /** Ring receivers plus the old receivers that are still kept because they were unavailable. */
  val activeReceivers: Set[UniqueAddress] = ring.myReceivers union oldReceiversNowUnreachable

  def selfAddress = ring.selfAddress

  /** Replaces the ring's nodes (always including `selfAddress`) and its unreachable set. */
  def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState =
    copy(ring = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable))

  def contains(node: UniqueAddress): Boolean = ring.nodes(node)

  def addMember(node: UniqueAddress): ClusterHeartbeatSenderState =
    membershipChange(ring :+ node)

  /**
   * Removes `node` from the ring, from the failure detector and from
   * `oldReceiversNowUnreachable`.
   */
  def removeMember(node: UniqueAddress): ClusterHeartbeatSenderState = {
    val newState = membershipChange(ring :- node)

    failureDetector remove node.address
    if (newState.oldReceiversNowUnreachable(node))
      newState.copy(oldReceiversNowUnreachable = newState.oldReceiversNowUnreachable - node)
    else
      newState
  }

  def unreachableMember(node: UniqueAddress): ClusterHeartbeatSenderState =
    membershipChange(ring.copy(unreachable = ring.unreachable + node))

  def reachableMember(node: UniqueAddress): ClusterHeartbeatSenderState =
    membershipChange(ring.copy(unreachable = ring.unreachable - node))

  /**
   * Switches to `newRing`. Receivers that the new ring no longer selects are removed from
   * the failure detector when it reports them as available; otherwise they are added to
   * `oldReceiversNowUnreachable`.
   */
  private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = {
    val oldReceivers = ring.myReceivers
    val removedReceivers = oldReceivers diff newRing.myReceivers
    var adjustedOldReceiversNowUnreachable = oldReceiversNowUnreachable
    // Kept as a sequential loop so the isAvailable/remove calls happen in the original order.
    removedReceivers foreach { a =>
      if (failureDetector.isAvailable(a.address))
        failureDetector remove a.address
      else
        adjustedOldReceiversNowUnreachable += a
    }
    copy(newRing, adjustedOldReceiversNowUnreachable)
  }

  /**
   * Registers a heartbeat response. Responses from nodes that are not active receivers
   * are ignored.
   */
  def heartbeatRsp(from: UniqueAddress): ClusterHeartbeatSenderState =
    if (activeReceivers(from)) {
      failureDetector heartbeat from.address
      if (oldReceiversNowUnreachable(from)) {
        // back from unreachable, ok to stop heartbeating to it
        if (!ring.myReceivers(from))
          failureDetector remove from.address
        copy(oldReceiversNowUnreachable = oldReceiversNowUnreachable - from)
      } else this
    } else this
}
/**
 * INTERNAL API
 *
 * Data structure for picking heartbeat receivers. The node ring is
 * shuffled by deterministic hashing to avoid picking physically co-located
 * neighbors.
 *
 * It is immutable, i.e. the methods return new instances.
 *
 * @param selfAddress unique address of this node; must be contained in `nodes`
 * @param nodes all nodes in the ring
 * @param unreachable nodes treated as unreachable when selecting receivers
 * @param monitoredByNrOfMembers number of reachable receivers to select per sender
 */
private[cluster] final case class HeartbeatNodeRing(
  selfAddress: UniqueAddress,
  nodes: Set[UniqueAddress],
  unreachable: Set[UniqueAddress],
  monitoredByNrOfMembers: Int) {

  require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]")

  // Nodes ordered by hash code, with the address ordering as tie-breaker on equal hashes.
  private val nodeRing: immutable.SortedSet[UniqueAddress] = {
    implicit val ringOrdering: Ordering[UniqueAddress] = Ordering.fromLessThan[UniqueAddress] { (a, b) =>
      val ha = a.##
      val hb = b.##
      ha < hb || (ha == hb && Member.addressOrdering.compare(a.address, b.address) < 0)
    }

    immutable.SortedSet() union nodes
  }

  /**
   * Receivers for `selfAddress`. Cached for subsequent access.
   */
  lazy val myReceivers: Set[UniqueAddress] = receivers(selfAddress)

  // True when the ring is small enough that every other node becomes a receiver.
  private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1)

  /**
   * The receivers to use from a specified sender.
   */
  def receivers(sender: UniqueAddress): Set[UniqueAddress] =
    if (useAllAsReceivers)
      nodeRing - sender
    else {

      // Pick nodes from the iterator until n nodes that are not unreachable have been selected.
      // Intermediate unreachable nodes up to `monitoredByNrOfMembers` are also included in the result.
      // The reason for not limiting it to strictly monitoredByNrOfMembers is that the leader must
      // be able to continue its duties (e.g. removal of downed nodes) when many nodes are shutdown
      // at the same time and nobody in the remaining cluster is monitoring some of the shutdown nodes.
      // This was reported in issue #16624.
      @tailrec def take(n: Int, iter: Iterator[UniqueAddress], acc: Set[UniqueAddress]): (Int, Set[UniqueAddress]) =
        if (iter.isEmpty || n == 0) (n, acc)
        else {
          val next = iter.next()
          val isUnreachable = unreachable(next)
          if (isUnreachable && acc.size >= monitoredByNrOfMembers)
            take(n, iter, acc) // skip the unreachable, since we have already picked `monitoredByNrOfMembers`
          else if (isUnreachable)
            take(n, iter, acc + next) // include the unreachable, but don't count it
          else
            take(n - 1, iter, acc + next) // include the reachable
        }

      // First pass: the nodes following `sender` in ring order (`.tail` drops the first element of the range).
      val (remaining, slice1) = take(monitoredByNrOfMembers, nodeRing.rangeFrom(sender).tail.iterator, Set.empty)
      val slice =
        if (remaining == 0)
          slice1
        else {
          // wrap around
          val (_, slice2) = take(remaining, nodeRing.rangeTo(sender).iterator.filterNot(_ == sender), slice1)
          slice2
        }

      slice
    }

  /**
   * Add a node to the ring.
   */
  def :+(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node)

  /**
   * Remove a node from the ring.
   */
  def :-(node: UniqueAddress): HeartbeatNodeRing =
    if (nodes.contains(node) || unreachable.contains(node))
      copy(nodes = nodes - node, unreachable = unreachable - node)
    else this
}