Move heartbeat sending out from ClusterCoreDaemon, see #2284

This commit is contained in:
Patrik Nordwall 2012-10-01 10:02:48 +02:00
parent 5b0a2ec7ee
commit cecde67226
2 changed files with 86 additions and 46 deletions

View file

@ -155,8 +155,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
withDispatcher(context.props.dispatcher), name = "publisher") withDispatcher(context.props.dispatcher), name = "publisher")
val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)). val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
withDispatcher(context.props.dispatcher), name = "core") withDispatcher(context.props.dispatcher), name = "core")
context.actorOf(Props[ClusterHeartbeatDaemon]. context.actorOf(Props[ClusterHeartbeatReceiver].
withDispatcher(context.props.dispatcher), name = "heartbeat") withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)). if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
withDispatcher(context.props.dispatcher), name = "metrics") withDispatcher(context.props.dispatcher), name = "metrics")
@ -172,26 +172,24 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging { private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
import ClusterLeaderAction._ import ClusterLeaderAction._
import InternalClusterAction._ import InternalClusterAction._
import ClusterHeartbeatSender._ import ClusterHeartbeatSender.JoinInProgress
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, failureDetector } import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._ import cluster.settings._
val vclockNode = VectorClock.Node(selfAddress.toString) val vclockNode = VectorClock.Node(selfAddress.toString)
val selfHeartbeat = Heartbeat(selfAddress)
// note that self is not initially member, // note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet // and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip() var latestGossip: Gossip = Gossip()
var joinInProgress: Map[Address, Deadline] = Map.empty
var stats = ClusterStats() var stats = ClusterStats()
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props[ClusterCoreSender]. val coreSender = context.actorOf(Props[ClusterCoreSender].
withDispatcher(UseDispatcher), name = "coreSender") withDispatcher(UseDispatcher), name = "coreSender")
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
import context.dispatcher import context.dispatcher
@ -201,12 +199,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
self ! GossipTick self ! GossipTick
} }
// start periodic heartbeat to all nodes in cluster
val heartbeatTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
self ! HeartbeatTick
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask = val failureDetectorReaperTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) { FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) {
@ -232,7 +224,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
override def postStop(): Unit = { override def postStop(): Unit = {
gossipTask.cancel() gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel() failureDetectorReaperTask.cancel()
leaderActionsTask.cancel() leaderActionsTask.cancel()
publishStatsTask foreach { _.cancel() } publishStatsTask foreach { _.cancel() }
@ -250,7 +241,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case msg: GossipEnvelope receiveGossip(msg) case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipMergeConflict receiveGossipMerge(msg) case msg: GossipMergeConflict receiveGossipMerge(msg)
case GossipTick gossip() case GossipTick gossip()
case HeartbeatTick heartbeat()
case ReapUnreachableTick reapUnreachableMembers() case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions() case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats() case PublishStatsTick publishInternalStats()
@ -293,11 +283,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localGossip = latestGossip val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty // wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip() latestGossip = Gossip()
joinInProgress = Map(address -> (Deadline.now + JoinTimeout))
// wipe the failure detector since we are starting fresh and shouldn't care about the past // wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset() failureDetector.reset()
heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
publish(localGossip) publish(localGossip)
context.become(initialized) context.become(initialized)
@ -517,12 +507,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
else remoteGossip // remote gossip is newer else remoteGossip // remote gossip is newer
val newJoinInProgress =
if (joinInProgress.isEmpty) joinInProgress
else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address)
latestGossip = winningGossip seen selfAddress latestGossip = winningGossip seen selfAddress
joinInProgress = newJoinInProgress
// for all new joining nodes we remove them from the failure detector // for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).foreach { (latestGossip.members -- localGossip.members).foreach {
@ -744,27 +729,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} }
} }
def heartbeat(): Unit = {
removeOverdueJoinInProgress()
val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys
val deadline = Deadline.now + HeartbeatInterval
beatTo.foreach { address if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) }
}
/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue address }
}
/** /**
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/ */
def reapUnreachableMembers(): Unit = { def reapUnreachableMembers(): Unit = {
if (!isSingletonCluster && isAvailable) { if (!isSingletonCluster && isAvailable) {
// only scrutinize if we are a non-singleton cluster and available // only scrutinize if we are a non-singleton cluster and available

View file

@ -4,12 +4,14 @@
package akka.cluster package akka.cluster
import language.postfixOps import language.postfixOps
import scala.collection.immutable.SortedSet
import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props }
import java.security.MessageDigest import java.security.MessageDigest
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import scala.concurrent.util.duration._ import scala.concurrent.util.duration._
import scala.concurrent.util.Deadline import scala.concurrent.util.Deadline
import scala.concurrent.util.FiniteDuration
import akka.cluster.ClusterEvent._
/** /**
* Sent at regular intervals for failure detection. * Sent at regular intervals for failure detection.
@ -19,11 +21,11 @@ case class Heartbeat(from: Address) extends ClusterMessage
/** /**
* INTERNAL API. * INTERNAL API.
* *
* Receives Heartbeat messages and delegates to Cluster. * Receives Heartbeat messages and updates failure detector.
* Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized
* to Cluster message after message, but concurrent with other types of messages. * to Cluster message after message, but concurrent with other types of messages.
*/ */
private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging { private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
val failureDetector = Cluster(context.system).failureDetector val failureDetector = Cluster(context.system).failureDetector
@ -38,12 +40,18 @@ private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogg
*/ */
private[cluster] object ClusterHeartbeatSender { private[cluster] object ClusterHeartbeatSender {
/** /**
* * Command to [akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]]
* Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]]
* to the other node. * to the other node.
* Local only, no need to serialize. * Local only, no need to serialize.
*/ */
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
/**
* Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
* another node and heartbeats should be sent until it becomes member or deadline is overdue.
* Local only, no need to serialize.
*/
case class JoinInProgress(address: Address, deadline: Deadline)
} }
/* /*
@ -57,12 +65,39 @@ private[cluster] object ClusterHeartbeatSender {
*/ */
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
import ClusterHeartbeatSender._ import ClusterHeartbeatSender._
import Member.addressOrdering
import InternalClusterAction.HeartbeatTick
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler }
import cluster.settings._
import context.dispatcher
val selfHeartbeat = Heartbeat(selfAddress)
var nodes: SortedSet[Address] = SortedSet.empty
var joinInProgress: Map[Address, Deadline] = Map.empty
// start periodic heartbeat to other nodes in cluster
val heartbeatTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
self ! HeartbeatTick
}
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
}
override def postStop(): Unit = {
heartbeatTask.cancel()
cluster.unsubscribe(self)
}
/** /**
* Looks up and returns the remote cluster heartbeat connection for the specific address. * Looks up and returns the remote cluster heartbeat connection for the specific address.
*/ */
def clusterHeartbeatConnectionFor(address: Address): ActorRef = def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
val digester = MessageDigest.getInstance("MD5") val digester = MessageDigest.getInstance("MD5")
@ -76,14 +111,51 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
} }
def receive = { def receive = {
case msg @ SendHeartbeat(from, to, deadline) case state: CurrentClusterState init(state)
case MemberUnreachable(m) removeMember(m)
case MemberRemoved(m) removeMember(m)
case e: MemberEvent addMember(e.member)
case JoinInProgress(a, d) joinInProgress += (a -> d)
case HeartbeatTick heartbeat()
}
def init(state: CurrentClusterState): Unit = {
nodes = state.members.map(_.address)
joinInProgress --= nodes
}
def addMember(m: Member): Unit = {
nodes += m.address
joinInProgress -= m.address
}
def removeMember(m: Member): Unit = {
nodes -= m.address
joinInProgress -= m.address
}
def heartbeat(): Unit = {
removeOverdueJoinInProgress()
val beatTo = nodes ++ joinInProgress.keys
val deadline = Deadline.now + HeartbeatInterval
for (to beatTo; if to != selfAddress) {
val workerName = encodeChildName(to.toString) val workerName = encodeChildName(to.toString)
val worker = context.actorFor(workerName) match { val worker = context.actorFor(workerName) match {
case notFound if notFound.isTerminated case notFound if notFound.isTerminated
context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName)
case child child case child child
} }
worker ! msg worker ! SendHeartbeat(selfHeartbeat, to, deadline)
}
}
/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue address }
} }
} }