Use consistent hash to heartbeat to a few nodes instead of all, see #2284

* Previously heartbeat messages was sent to all other members, i.e.
  each member was monitored by all other members in the cluster.
* This was the number one know scalability bottleneck, due to the
  number of interconnections.
* Limit sending of heartbeats to a few (5) members. Select and
  re-balance with consistent hashing algorithm when new members
  are added or removed.
* Send a few EndHeartbeat when ending send of Heartbeat messages.
This commit is contained in:
Patrik Nordwall 2012-10-01 14:12:20 +02:00
parent 7557433491
commit 3f73705abc
8 changed files with 172 additions and 60 deletions

1
.gitignore vendored
View file

@ -67,3 +67,4 @@ redis/
beanstalk/ beanstalk/
.scalastyle .scalastyle
bin/ bin/
.worksheet

View file

@ -78,6 +78,10 @@ akka {
# how often should the node send out heartbeats? # how often should the node send out heartbeats?
heartbeat-interval = 1s heartbeat-interval = 1s
# Number of member nodes that each member will send heartbeat messages to,
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 5
# defines the failure detector threshold # defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures # A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high # a quick detection in the event of a real crash. Conversely, a high

View file

@ -62,7 +62,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
val settings = new ClusterSettings(system.settings.config, system.name) val settings = new ClusterSettings(system.settings.config, system.name)
import settings._ import settings._
val selfAddress = system.provider match { val selfAddress: Address = system.provider match {
case c: ClusterActorRefProvider c.transport.address case c: ClusterActorRefProvider c.transport.address
case other throw new ConfigurationException( case other throw new ConfigurationException(
"ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]". "ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]".
@ -74,7 +74,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
log.info("Cluster Node [{}] - is starting up...", selfAddress) log.info("Cluster Node [{}] - is starting up...", selfAddress)
val failureDetector = { val failureDetector: FailureDetector = {
import settings.{ FailureDetectorImplementationClass fqcn } import settings.{ FailureDetectorImplementationClass fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector]( system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({

View file

@ -287,8 +287,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// wipe the failure detector since we are starting fresh and shouldn't care about the past // wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset() failureDetector.reset()
heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
publish(localGossip) publish(localGossip)
heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
context.become(initialized) context.become(initialized)
if (address == selfAddress) if (address == selfAddress)

View file

@ -5,18 +5,32 @@ package akka.cluster
import language.postfixOps import language.postfixOps
import scala.collection.immutable.SortedSet import scala.collection.immutable.SortedSet
import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } import scala.annotation.tailrec
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import scala.concurrent.util.duration._ import scala.concurrent.util.duration._
import scala.concurrent.util.Deadline import scala.concurrent.util.Deadline
import scala.concurrent.util.FiniteDuration import scala.concurrent.util.FiniteDuration
import akka.cluster.ClusterEvent._
import java.net.URLEncoder import java.net.URLEncoder
import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import akka.cluster.ClusterEvent._
import akka.routing.ConsistentHash
/** /**
* Sent at regular intervals for failure detection. * INTERNAL API
*/ */
case class Heartbeat(from: Address) extends ClusterMessage private[akka] object ClusterHeartbeatReceiver {
/**
* Sent at regular intervals for failure detection.
*/
case class Heartbeat(from: Address) extends ClusterMessage
/**
* Tell failure detector at receiving side that it should
* remove the monitoring, because heartbeats will end from
* this node.
*/
case class EndHeartbeat(from: Address) extends ClusterMessage
}
/** /**
* INTERNAL API. * INTERNAL API.
@ -26,11 +40,13 @@ case class Heartbeat(from: Address) extends ClusterMessage
* to Cluster message after message, but concurrent with other types of messages. * to Cluster message after message, but concurrent with other types of messages.
*/ */
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
import ClusterHeartbeatReceiver._
val failureDetector = Cluster(context.system).failureDetector val failureDetector = Cluster(context.system).failureDetector
def receive = { def receive = {
case Heartbeat(from) failureDetector heartbeat from case Heartbeat(from) failureDetector heartbeat from
case EndHeartbeat(from) failureDetector remove from
} }
} }
@ -39,16 +55,11 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
* INTERNAL API * INTERNAL API
*/ */
private[cluster] object ClusterHeartbeatSender { private[cluster] object ClusterHeartbeatSender {
/**
* Command to [akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]]
* to the other node.
* Local only, no need to serialize.
*/
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
/** /**
* Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of * Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
* another node and heartbeats should be sent until it becomes member or deadline is overdue. * another node and heartbeats should be sent unconditionally until it becomes
* member or deadline is overdue. This is done to be able to detect immediate death
* of the joining node.
* Local only, no need to serialize. * Local only, no need to serialize.
*/ */
case class JoinInProgress(address: Address, deadline: Deadline) case class JoinInProgress(address: Address, deadline: Deadline)
@ -58,14 +69,17 @@ private[cluster] object ClusterHeartbeatSender {
* INTERNAL API * INTERNAL API
* *
* This actor is responsible for sending the heartbeat messages to * This actor is responsible for sending the heartbeat messages to
* other nodes. Netty blocks when sending to broken connections. This actor * a few other nodes that will monitor this node.
* isolates sending to different nodes by using child workers for each target *
* Netty blocks when sending to broken connections. This actor
* isolates sending to different nodes by using child actors for each target
* address and thereby reduce the risk of irregular heartbeats to healty * address and thereby reduce the risk of irregular heartbeats to healty
* nodes due to broken connections to other nodes. * nodes due to broken connections to other nodes.
*/ */
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
import ClusterHeartbeatSender._ import ClusterHeartbeatSender._
import Member.addressOrdering import ClusterHeartbeatSenderConnection._
import ClusterHeartbeatReceiver._
import InternalClusterAction.HeartbeatTick import InternalClusterAction.HeartbeatTick
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
@ -74,9 +88,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
import context.dispatcher import context.dispatcher
val selfHeartbeat = Heartbeat(selfAddress) val selfHeartbeat = Heartbeat(selfAddress)
val selfEndHeartbeat = EndHeartbeat(selfAddress)
val selfAddressStr = selfAddress.toString
var nodes: SortedSet[Address] = SortedSet.empty var all = Set.empty[Address]
var joinInProgress: Map[Address, Deadline] = Map.empty var current = Set.empty[Address]
var ending = Map.empty[Address, Int]
var joinInProgress = Map.empty[Address, Deadline]
var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
// start periodic heartbeat to other nodes in cluster // start periodic heartbeat to other nodes in cluster
val heartbeatTask = val heartbeatTask =
@ -99,63 +118,146 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def clusterHeartbeatConnectionFor(address: Address): ActorRef = def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
/**
* Child name URL encoded target address.
*/
def encodeChildName(name: String): String = URLEncoder.encode(name, "UTF-8")
def receive = { def receive = {
case HeartbeatTick heartbeat()
case state: CurrentClusterState init(state) case state: CurrentClusterState init(state)
case MemberUnreachable(m) removeMember(m) case MemberUnreachable(m) removeMember(m)
case MemberRemoved(m) removeMember(m) case MemberRemoved(m) removeMember(m)
case e: MemberEvent addMember(e.member) case e: MemberEvent addMember(e.member)
case JoinInProgress(a, d) joinInProgress += (a -> d) case JoinInProgress(a, d) addJoinInProgress(a, d)
case HeartbeatTick heartbeat()
} }
def init(state: CurrentClusterState): Unit = { def init(state: CurrentClusterState): Unit = {
nodes = state.members.map(_.address) all = state.members.collect { case m if m.address != selfAddress m.address }
joinInProgress --= nodes joinInProgress --= all
consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
} }
def addMember(m: Member): Unit = { def addMember(m: Member): Unit = if (m.address != selfAddress) {
nodes += m.address all += m.address
joinInProgress -= m.address consistentHash = consistentHash :+ m.address
removeJoinInProgress(m.address)
update()
} }
def removeMember(m: Member): Unit = { def removeMember(m: Member): Unit = if (m.address != selfAddress) {
nodes -= m.address all -= m.address
joinInProgress -= m.address consistentHash = consistentHash :- m.address
removeJoinInProgress(m.address)
update()
}
def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) {
joinInProgress -= address
ending += (address -> 0)
}
def addJoinInProgress(address: Address, deadline: Deadline): Unit = {
if (address != selfAddress && !all.contains(address))
joinInProgress += (address -> deadline)
} }
def heartbeat(): Unit = { def heartbeat(): Unit = {
removeOverdueJoinInProgress() removeOverdueJoinInProgress()
val beatTo = nodes ++ joinInProgress.keys def connection(to: Address): ActorRef = {
// URL encoded target address as child actor name
val deadline = Deadline.now + HeartbeatInterval val connectionName = URLEncoder.encode(to.toString, "UTF-8")
for (to beatTo; if to != selfAddress) { context.actorFor(connectionName) match {
val workerName = encodeChildName(to.toString)
val worker = context.actorFor(workerName) match {
case notFound if notFound.isTerminated case notFound if notFound.isTerminated
context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName)
case child child case child child
} }
worker ! SendHeartbeat(selfHeartbeat, to, deadline) }
val deadline = Deadline.now + HeartbeatInterval
(current ++ joinInProgress.keys) foreach { to connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
// When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is
// sent to notify it that no more heartbeats will be sent.
for ((to, count) ending) {
val c = connection(to)
c ! SendEndHeartbeat(selfEndHeartbeat, to)
if (count == NumberOfEndHeartbeats) {
ending -= to
c ! PoisonPill
} else {
ending += (to -> (count + 1))
}
} }
} }
/** /**
* Removes overdue joinInProgress from State. * Update current peers to send heartbeats to, and
* keep track of which nodes to stop sending heartbeats to.
*/
def update(): Unit = {
val previous = current
current = selectPeers
// start ending process for nodes not selected any more
ending ++= (previous -- current).map(_ -> 0)
// abort ending process for nodes that have been selected again
ending --= current
}
/**
* Select a few peers that heartbeats will be sent to, i.e. that will
* monitor this node. Try to send heartbeats to same nodes as much
* as possible, but re-balance with consistent hashing algorithm when
* new members are added or removed.
*/
def selectPeers: Set[Address] = {
val allSize = all.size
val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers)
// try more if consistentHash results in same node as already selected
val attemptLimit = nrOfPeers * 2
@tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
if (acc.size == nrOfPeers || n == attemptLimit) acc
else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1)
}
if (nrOfPeers >= allSize) all
else select(Set.empty[Address], 0)
}
/**
* Cleanup overdue joinInProgress, in case a joining node never
* became member, for some reason.
*/ */
def removeOverdueJoinInProgress(): Unit = { def removeOverdueJoinInProgress(): Unit = {
joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue address } val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue address }
if (overdue.nonEmpty) {
log.info("Overdue join in progress [{}]", overdue.mkString(", "))
ending ++= overdue.map(_ -> 0)
joinInProgress --= overdue
}
} }
} }
/** /**
* Responsible for sending [[akka.cluster.Heartbeat]] to one specific address. * INTERNAL API
*/
private[cluster] object ClusterHeartbeatSenderConnection {
import ClusterHeartbeatReceiver._
/**
* Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send
* [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node.
* Local only, no need to serialize.
*/
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
/**
* Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send
* [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node.
* Local only, no need to serialize.
*/
case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address)
}
/**
* Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
* and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
* *
* Netty blocks when sending to broken connections, and this actor uses * Netty blocks when sending to broken connections, and this actor uses
* a configurable circuit breaker to reduce connect attempts to broken * a configurable circuit breaker to reduce connect attempts to broken
@ -163,10 +265,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
* *
* @see ClusterHeartbeatSender * @see ClusterHeartbeatSender
*/ */
private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
extends Actor with ActorLogging { extends Actor with ActorLogging {
import ClusterHeartbeatSender._ import ClusterHeartbeatSenderConnection._
val breaker = { val breaker = {
val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings
@ -177,21 +279,19 @@ private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
} }
// make sure it will cleanup when not used any more
context.setReceiveTimeout(30 seconds)
def receive = { def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline) case SendHeartbeat(heartbeatMsg, _, deadline)
if (!deadline.isOverdue) { if (!deadline.isOverdue) {
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
// the CircuitBreaker will measure elapsed time and open if too many long calls // the CircuitBreaker will measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker { try breaker.withSyncCircuitBreaker {
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
toRef ! heartbeatMsg toRef ! heartbeatMsg
if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
} catch { case e: CircuitBreakerOpenException /* skip sending heartbeat to broken connection */ } } catch { case e: CircuitBreakerOpenException /* skip sending heartbeat to broken connection */ }
} }
if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
case ReceiveTimeout context.stop(self) // cleanup when not used case SendEndHeartbeat(endHeartbeatMsg, _)
log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
toRef ! endHeartbeatMsg
} }
} }

View file

@ -24,6 +24,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
final val MonitoredByNrOfMembers = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case AddressFromURIString(addr) addr case AddressFromURIString(addr) addr

View file

@ -42,7 +42,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
gossip-interval = 500 ms gossip-interval = 500 ms
auto-join = off auto-join = off
auto-down = on auto-down = on
failure-detector.acceptable-heartbeat-pause = 10s failure-detector.acceptable-heartbeat-pause = 5s
publish-stats-interval = 0 s # always, when it happens publish-stats-interval = 0 s # always, when it happens
} }
akka.event-handlers = ["akka.testkit.TestEventListener"] akka.event-handlers = ["akka.testkit.TestEventListener"]
@ -57,7 +57,9 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
akka.scheduler.tick-duration = 33 ms akka.scheduler.tick-duration = 33 ms
akka.remote.log-remote-lifecycle-events = off akka.remote.log-remote-lifecycle-events = off
akka.remote.netty.execution-pool-size = 4 akka.remote.netty.execution-pool-size = 4
#akka.remote.netty.reconnection-time-window = 1s #akka.remote.netty.reconnection-time-window = 10s
akka.remote.netty.read-timeout = 5s
akka.remote.netty.write-timeout = 5s
akka.remote.netty.backoff-timeout = 500ms akka.remote.netty.backoff-timeout = 500ms
akka.remote.netty.connection-timeout = 500ms akka.remote.netty.connection-timeout = 500ms

View file

@ -29,6 +29,8 @@ class ClusterConfigSpec extends AkkaSpec {
PeriodicTasksInitialDelay must be(1 seconds) PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second) GossipInterval must be(1 second)
HeartbeatInterval must be(1 second) HeartbeatInterval must be(1 second)
NumberOfEndHeartbeats must be(4)
MonitoredByNrOfMembers must be(5)
LeaderActionsInterval must be(1 second) LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second)
PublishStatsInterval must be(10 second) PublishStatsInterval must be(10 second)