Use Use separate heartbeats for FailureDetector, see #2214

* Send Heartbeat message to all members at regular interval
* Removed the need to gossip to myself
This commit is contained in:
Patrik Nordwall 2012-06-11 14:59:34 +02:00
parent f4eaeab43e
commit e2551494c4
8 changed files with 65 additions and 33 deletions

View file

@ -138,7 +138,7 @@ object Member {
/**
* Envelope adding a sender address to the gossip.
*/
case class GossipEnvelope(sender: Member, gossip: Gossip) extends ClusterMessage
case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage
/**
* Defines the current status of a cluster member node
@ -244,6 +244,8 @@ case class Gossip(
")"
}
case class Heartbeat(from: Address)
/**
* Manages routing of the different cluster commands.
* Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
@ -272,7 +274,8 @@ private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
def receive = {
case GossipEnvelope(sender, gossip) cluster.receive(sender, gossip)
case Heartbeat(from) cluster.receiveHeartbeat(from)
case GossipEnvelope(from, gossip) cluster.receiveGossip(from, gossip)
}
override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
@ -388,7 +391,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
log.info("Cluster Node [{}] - is starting up...", selfAddress)
// create superisor for daemons under path "/system/cluster"
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons = {
val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster")
Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match {
@ -399,8 +402,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private val state = {
val member = Member(selfAddress, MemberStatus.Joining)
val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
new AtomicReference[State](State(gossip))
val versionedGossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
val seenVersionedGossip = versionedGossip seen selfAddress
new AtomicReference[State](State(seenVersionedGossip))
}
// try to join the node defined in the 'akka.cluster.node-to-join' option
@ -415,6 +419,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
gossip()
}
// start periodic heartbeat to all nodes in cluster
private val heartbeatCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, HeartbeatInterval) {
heartbeat()
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
reapUnreachableMembers()
@ -491,6 +500,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
gossipCanceller.cancel()
heartbeatCanceller.cancel()
failureDetectorReaperCanceller.cancel()
leaderActionsCanceller.cancel()
system.stop(clusterDaemons)
@ -588,6 +598,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) failureDetector heartbeat node
notifyMembershipChangeListeners(localState, newState)
}
@ -615,7 +626,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
if (address != selfAddress) failureDetector heartbeat address // update heartbeat in failure detector
notifyMembershipChangeListeners(localState, newState)
}
}
@ -708,7 +718,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Receive new gossip.
*/
@tailrec
final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = {
final private[cluster] def receiveGossip(from: Address, remoteGossip: Gossip): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
@ -718,8 +728,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip + vclockNode
log.debug(
"Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
// FIXME change to debug log level, when failure detector is stable
log.info(
"""Can't establish a causal relationship between "remote" gossip [{}] and "local" gossip [{}] - merging them into [{}]""",
remoteGossip, localGossip, versionedMergedGossip)
versionedMergedGossip
@ -736,15 +747,20 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val newState = localState copy (latestGossip = winningGossip seen selfAddress)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
else {
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address)
if (sender.address != selfAddress) failureDetector heartbeat sender.address
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
notifyMembershipChangeListeners(localState, newState)
}
}
/**
* INTERNAL API
*/
private[cluster] def receiveHeartbeat(from: Address): Unit = {
failureDetector heartbeat from
}
/**
* Joins the pre-configured contact point.
*/
@ -769,14 +785,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val newSelf = localSelf copy (status = newStatus)
// change my state in 'gossip.members'
val newMembersSet = localMembers map { member
val newMembers = localMembers map { member
if (member.address == selfAddress) newSelf
else member
}
// ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
val newGossip = localGossip copy (members = newMembersSortedSet)
val newGossip = localGossip copy (members = newMembers)
// version my changes
val versionedGossip = newGossip + vclockNode
@ -793,7 +807,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private[akka] def gossipTo(address: Address): Unit = {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
connection ! GossipEnvelope(self, latestGossip)
connection ! GossipEnvelope(selfAddress, latestGossip)
}
/**
@ -840,12 +854,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (isSingletonCluster(localState)) {
// gossip to myself
// TODO could perhaps be optimized, no need to gossip to myself when Up?
gossipTo(selfAddress)
} else if (isAvailable(localState)) {
if (!isSingletonCluster(localState) && isAvailable(localState)) {
val localGossip = localState.latestGossip
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
@ -876,6 +885,25 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
}
}
/**
* INTERNAL API
*/
private[akka] def heartbeat(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState)) {
val liveMembers = localState.latestGossip.members.toIndexedSeq
val unreachableMembers = localState.latestGossip.overview.unreachable
// FIXME use unreachable?
for (member (liveMembers ++ unreachableMembers); if member.address != selfAddress) {
val connection = clusterGossipConnectionFor(member.address)
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
connection ! Heartbeat(selfAddress)
}
}
}
/**
* INTERNAL API
*