From e2551494c41eef95a916b38897bb6ad09aef89a0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Jun 2012 14:59:34 +0200 Subject: [PATCH] Use separate heartbeats for FailureDetector, see #2214 * Send Heartbeat message to all members at regular interval * Removed the need to gossip to myself --- .../src/main/resources/reference.conf | 3 + .../akka/cluster/AccrualFailureDetector.scala | 6 +- .../src/main/scala/akka/cluster/Cluster.scala | 76 +++++++++++++------ .../scala/akka/cluster/ClusterSettings.scala | 1 + .../akka/cluster/MultiNodeClusterSpec.scala | 1 + .../scala/akka/cluster/SunnyWeatherSpec.scala | 2 +- .../akka/cluster/ClusterConfigSpec.scala | 1 + .../test/scala/akka/cluster/ClusterSpec.scala | 8 +- 8 files changed, 65 insertions(+), 33 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 8c905d5b29..1e7c0e4c08 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -27,6 +27,9 @@ akka { # how often should the node send out gossip information? gossip-interval = 1s + # how often should the node send out heartbeats? + heartbeat-interval = 1s + # how often should the leader perform maintenance tasks? 
leader-actions-interval = 1s diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index c7aaf12fcf..76c773f759 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -66,7 +66,8 @@ class AccrualFailureDetector( */ @tailrec final def heartbeat(connection: Address) { - log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection) + // FIXME change to debug log level, when failure detector is stable + log.info("Node [{}] - Heartbeat from connection [{}] ", address, connection) val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -155,7 +156,8 @@ class AccrualFailureDetector( else PhiFactor * timestampDiff / mean } - log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) + // FIXME change to debug log level, when failure detector is stable + log.info("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) phi } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 4ea43d50e4..df8f2ec89b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -138,7 +138,7 @@ object Member { /** * Envelope adding a sender address to the gossip. */ -case class GossipEnvelope(sender: Member, gossip: Gossip) extends ClusterMessage +case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage /** * Defines the current status of a cluster member node @@ -244,6 +244,8 @@ case class Gossip( ")" } +case class Heartbeat(from: Address) + /** * Manages routing of the different cluster commands. * Instantiated as a single instance for each Cluster - e.g. 
commands are serialized to Cluster message after message. @@ -272,7 +274,8 @@ private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor { val log = Logging(context.system, this) def receive = { - case GossipEnvelope(sender, gossip) ⇒ cluster.receive(sender, gossip) + case Heartbeat(from) ⇒ cluster.receiveHeartbeat(from) + case GossipEnvelope(from, gossip) ⇒ cluster.receiveGossip(from, gossip) } override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown) @@ -388,7 +391,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ log.info("Cluster Node [{}] - is starting up...", selfAddress) - // create superisor for daemons under path "/system/cluster" + // create supervisor for daemons under path "/system/cluster" private val clusterDaemons = { val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster") Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match { @@ -399,8 +402,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val state = { val member = Member(selfAddress, MemberStatus.Joining) - val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock - new AtomicReference[State](State(gossip)) + val versionedGossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock + val seenVersionedGossip = versionedGossip seen selfAddress + new AtomicReference[State](State(seenVersionedGossip)) } // try to join the node defined in the 'akka.cluster.node-to-join' option @@ -415,6 +419,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ gossip() } + // start periodic heartbeat to all nodes in cluster + private val heartbeatCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, HeartbeatInterval) { + heartbeat() + } + // 
start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { reapUnreachableMembers() @@ -491,6 +500,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) gossipCanceller.cancel() + heartbeatCanceller.cancel() failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() system.stop(clusterDaemons) @@ -588,6 +598,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update else { + // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens if (node != selfAddress) failureDetector heartbeat node notifyMembershipChangeListeners(localState, newState) } @@ -615,7 +626,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update else { - if (address != selfAddress) failureDetector heartbeat address // update heartbeat in failure detector notifyMembershipChangeListeners(localState, newState) } } @@ -708,7 +718,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Receive new gossip. 
*/ @tailrec - final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = { + final private[cluster] def receiveGossip(from: Address, remoteGossip: Gossip): Unit = { val localState = state.get val localGossip = localState.latestGossip @@ -718,8 +728,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip + vclockNode - log.debug( - "Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]", + // FIXME change to debug log level, when failure detector is stable + log.info( + """Can't establish a causal relationship between "remote" gossip [{}] and "local" gossip [{}] - merging them into [{}]""", remoteGossip, localGossip, versionedMergedGossip) versionedMergedGossip @@ -736,15 +747,20 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newState = localState copy (latestGossip = winningGossip seen selfAddress) // if we won the race then update else try again - if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update + if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else { - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address) - - if (sender.address != selfAddress) failureDetector heartbeat sender.address + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) notifyMembershipChangeListeners(localState, newState) } } + /** + * INTERNAL API + */ + private[cluster] def receiveHeartbeat(from: Address): Unit = { + failureDetector heartbeat from + } + /** * Joins the pre-configured contact point. 
*/ @@ -769,14 +785,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newSelf = localSelf copy (status = newStatus) // change my state in 'gossip.members' - val newMembersSet = localMembers map { member ⇒ + val newMembers = localMembers map { member ⇒ if (member.address == selfAddress) newSelf else member } - // ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile) - val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*) - val newGossip = localGossip copy (members = newMembersSortedSet) + val newGossip = localGossip copy (members = newMembers) // version my changes val versionedGossip = newGossip + vclockNode @@ -793,7 +807,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private[akka] def gossipTo(address: Address): Unit = { val connection = clusterGossipConnectionFor(address) log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection) - connection ! GossipEnvelope(self, latestGossip) + connection ! GossipEnvelope(selfAddress, latestGossip) } /** @@ -840,12 +854,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) - if (isSingletonCluster(localState)) { - // gossip to myself - // TODO could perhaps be optimized, no need to gossip to myself when Up? 
- gossipTo(selfAddress) - - } else if (isAvailable(localState)) { + if (!isSingletonCluster(localState) && isAvailable(localState)) { val localGossip = localState.latestGossip // important to not accidentally use `map` of the SortedSet, since the original order is not preserved val localMembers = localGossip.members.toIndexedSeq @@ -876,6 +885,25 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } } + /** + * INTERNAL API + */ + private[akka] def heartbeat(): Unit = { + val localState = state.get + + if (!isSingletonCluster(localState)) { + val liveMembers = localState.latestGossip.members.toIndexedSeq + val unreachableMembers = localState.latestGossip.overview.unreachable + + // FIXME use unreachable? + for (member ← (liveMembers ++ unreachableMembers); if member.address != selfAddress) { + val connection = clusterGossipConnectionFor(member.address) + log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) + connection ! Heartbeat(selfAddress) + } + } + } + /** * INTERNAL API * diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 0e7dac06ab..90831db2e6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -21,6 +21,7 @@ class ClusterSettings(val config: Config, val systemName: String) { } val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) val 
NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index b185067ab0..729923699d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -17,6 +17,7 @@ object MultiNodeClusterSpec { akka.cluster { auto-down = off gossip-interval = 200 ms + heartbeat-interval = 400 ms leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index e36980d859..fcb1393f8a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -24,7 +24,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { gossip-interval = 400 ms nr-of-deputy-nodes = 0 } - akka.loglevel = DEBUG + akka.loglevel = INFO """)) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 6b2ff1962c..3a96451466 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -21,6 +21,7 @@ class ClusterConfigSpec extends AkkaSpec { NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) + HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index d3d1d6d0a2..9b1a9706af 100644 
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -99,15 +99,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "A Cluster" must { - "initially be singleton cluster and reach convergence after first gossip" in { + "initially be singleton cluster and reach convergence immediately" in { cluster.isSingletonCluster must be(true) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) - cluster.convergence.isDefined must be(false) - cluster.gossip() - expectMsg(GossipTo(selfAddress)) - awaitCond(cluster.convergence.isDefined) - memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) + cluster.convergence.isDefined must be(true) cluster.leaderActions() memberStatus(selfAddress) must be(Some(MemberStatus.Up)) }