From cfd04bba3dd158ca26705ad0dfc297baf4fae262 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?=
Date: Sat, 18 Feb 2012 17:48:07 +0100
Subject: [PATCH] Fixed remaining issues with gossip based failure detection
 and removal of unreachable nodes.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Completed gossip based failure detection.
* Completed removal of unreachable nodes according to failure detector.
* Added passing tests.
* Misc other fixes, more logging, more comments.

Signed-off-by: Jonas Bonér
---
 .../akka/cluster/AccrualFailureDetector.scala |  70 +++---
 .../main/scala/akka/cluster/Gossiper.scala    |  23 +-
 .../cluster/AccrualFailureDetectorSpec.scala  |  10 +-
 .../GossipingAccrualFailureDetectorSpec.scala | 199 ++++++++++--------
 4 files changed, 173 insertions(+), 129 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
index 8ee9f857a0..e0d7cae052 100644
--- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
@@ -23,14 +23,17 @@ import System.{ currentTimeMillis ⇒ newTimestamp }
 *

 * Default threshold is 8, but can be configured in the Akka config.
 */
-class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000) {
+class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000) {

   private final val PhiFactor = 1.0 / math.log(10.0)

-  private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)
-
   private val log = Logging(system, "FailureDetector")

+  /**
+   * Holds the failure statistics for a specific node Address.
+   */
+  private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)
+
   /**
    * Implement using optimistic lockless concurrency, all state is represented
    * by this immutable case class and managed by an AtomicReference.
@@ -54,22 +57,26 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
    */
   @tailrec
   final def heartbeat(connection: Address) {
-    log.debug("Heartbeat from connection [{}] ", connection)
-    val oldState = state.get
+    log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection)
+    val oldState = state.get
+    val oldFailureStats = oldState.failureStats
+    val oldTimestamps = oldState.timestamps

     val latestTimestamp = oldState.timestamps.get(connection)
+
     if (latestTimestamp.isEmpty) {

       // this is heartbeat from a new connection
       // add starter records for this new connection
-      val failureStats = oldState.failureStats + (connection -> FailureStats())
-      val intervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
-      val timestamps = oldState.timestamps + (connection -> newTimestamp)
+      val newFailureStats = oldFailureStats + (connection -> FailureStats())
+      val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
+      val newTimestamps = oldTimestamps + (connection -> newTimestamp)

-      val newState = oldState copy (version = oldState.version + 1,
-        failureStats = failureStats,
-        intervalHistory = intervalHistory,
-        timestamps = timestamps)
+      val newState = oldState copy (
+        version = oldState.version + 1,
+        failureStats = newFailureStats,
+        intervalHistory = newIntervalHistory,
+        timestamps = newTimestamps)

       // if we won the race then update else try again
       if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@@ -79,7 +86,7 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
       val timestamp = newTimestamp
       val interval = timestamp - latestTimestamp.get

-      val timestamps = oldState.timestamps + (connection -> timestamp) // record new timestamp
+      val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp

       var newIntervalsForConnection =
         oldState.intervalHistory.get(connection).getOrElse(Vector.empty[Long]) :+ interval // append the new interval to history
@@ -89,36 +96,33 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
         newIntervalsForConnection = newIntervalsForConnection drop 0
       }

-      val failureStats =
+      val newFailureStats =
         if (newIntervalsForConnection.size > 1) {
-          val mean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble
-
-          val oldFailureStats = oldState.failureStats.get(connection).getOrElse(FailureStats())
+          val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble

+          val oldConnectionFailureStats = oldFailureStats.get(connection).getOrElse(throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history"))

           val deviationSum =
             newIntervalsForConnection
               .map(_.toDouble)
-              .foldLeft(0.0D)((x, y) ⇒ x + (y - mean))
+              .foldLeft(0.0D)((x, y) ⇒ x + (y - newMean))

-          val variance: Double = deviationSum / newIntervalsForConnection.size.toDouble
-          val deviation: Double = math.sqrt(variance)
+          val newVariance: Double = deviationSum / newIntervalsForConnection.size.toDouble
+          val newDeviation: Double = math.sqrt(newVariance)

-          val newFailureStats = oldFailureStats copy (mean = mean,
-            deviation = deviation,
-            variance = variance)
+          val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
+          oldFailureStats + (connection -> newFailureStats)

-          oldState.failureStats + (connection -> newFailureStats)
         } else {
-          oldState.failureStats
+          oldFailureStats
         }

-      val intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)
+      val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)

       val newState = oldState copy (version = oldState.version + 1,
-        failureStats = failureStats,
-        intervalHistory = intervalHistory,
-        timestamps = timestamps)
+        failureStats = newFailureStats,
+        intervalHistory = newIntervalHistory,
+        timestamps = newTimestamps)

       // if we won the race then update else try again
       if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@@ -138,17 +142,21 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
   def phi(connection: Address): Double = {
     val oldState = state.get
     val oldTimestamp = oldState.timestamps.get(connection)
+
     val phi =
       if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
       else {
         val timestampDiff = newTimestamp - oldTimestamp.get
-        val mean = oldState.failureStats.get(connection).getOrElse(FailureStats()).mean
+
+        val stats = oldState.failureStats.get(connection)
+        val mean = stats.getOrElse(throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that has no heartbeat history")).mean
+
         if (mean == 0.0D) 0.0D
         else PhiFactor * timestampDiff / mean
       }

     // only log if PHI value is starting to get interesting
-    if (phi > 0.0D) log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
+    if (phi > 0.0D) log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection)
     phi
   }

diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
index bb1e19e746..73575efec7 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
@@ -210,11 +210,12 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {

   implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

+  val failureDetector = new AccrualFailureDetector(
+    system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
+
   private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)

   private val serialization = remote.serialization
-  private val failureDetector = new AccrualFailureDetector(
-    system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)

   private val isRunning = new AtomicBoolean(true)
   private val log = Logging(system, "Gossiper")
@@ -279,12 +280,10 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
     if (isRunning.compareAndSet(true, false)) {
       log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress)

-      try connectionManager.shutdown() finally {
-        try system.stop(clusterDaemon) finally {
-          try gossipCanceller.cancel() finally {
-            try scrutinizeCanceller.cancel() finally {
-              log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
-            }
+      try system.stop(clusterDaemon) finally {
+        try gossipCanceller.cancel() finally {
+          try scrutinizeCanceller.cancel() finally {
+            log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
           }
         }
       }
@@ -298,6 +297,8 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
   final def joining(node: Address) {
     log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)

+    failureDetector heartbeat node // update heartbeat in failure detector
+
     val localState = state.get
     val localGossip = localState.latestGossip
     val localMembers = localGossip.members
@@ -475,7 +476,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
    */
   private def gossipTo(address: Address) {
     setUpConnectionTo(address) foreach { connection ⇒
-      log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, address)
+      log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
       connection ! GossipEnvelope(self, latestGossip)
     }
   }
@@ -496,7 +497,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
   }

   /**
-   * Scrutinizes the cluster; marks members detected by the failure detector as unavailable.
+   * Scrutinizes the cluster; marks members detected by the failure detector as unreachable.
    */
   @tailrec
   final private def scrutinize() {
@@ -517,6 +518,8 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
       val newMembers = localMembers diff newlyDetectedUnreachableMembers
       val newUnreachableAddresses: Set[Address] = localUnreachableAddresses ++ newlyDetectedUnreachableAddresses

+      log.info("Node [{}] - Marking node(s) as unreachable [{}]", remoteAddress, newlyDetectedUnreachableAddresses.mkString(", "))
+
       val newOverview = localOverview copy (unreachable = newUnreachableAddresses)
       val newGossip = localGossip copy (overview = newOverview, members = newMembers)

diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala
index 034f582e0d..2e00c72ad1 100644
--- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala
@@ -13,13 +13,13 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
   val conn2 = Address("akka", "", "localhost", 2553)

   "return phi value of 0.0D on startup for each address" in {
-    val fd = new AccrualFailureDetector(system)
+    val fd = new AccrualFailureDetector(system, conn)
     fd.phi(conn) must be(0.0D)
     fd.phi(conn2) must be(0.0D)
   }

   "mark node as available after a series of successful heartbeats" in {
-    val fd = new AccrualFailureDetector(system)
+    val fd = new AccrualFailureDetector(system, conn)

     fd.heartbeat(conn)

@@ -34,7 +34,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
   // FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in bootstrap - see line 142 in AccrualFailureDetector
   "mark node as dead after explicit removal of connection" ignore {
-    val fd = new AccrualFailureDetector(system)
+    val fd = new AccrualFailureDetector(system, conn)

     fd.heartbeat(conn)

@@ -52,7 +52,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
   }

   "mark node as dead if heartbeats are missed" in {
-    val fd = new AccrualFailureDetector(system, threshold = 3)
+    val fd = new AccrualFailureDetector(system, conn, threshold = 3)

     fd.heartbeat(conn)

@@ -70,7 +70,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
   }

   "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
-    val fd = new AccrualFailureDetector(system, threshold = 3)
+    val fd = new AccrualFailureDetector(system, conn, threshold = 3)

     fd.heartbeat(conn)

diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala
index 6366a9f65e..413ab7e537 100644
--- a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala
@@ -1,95 +1,128 @@
-// /**
-//  * Copyright (C) 2009-2011 Typesafe Inc.
-//  */
-// package akka.cluster
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster

-// import java.net.InetSocketAddress
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._

-// import akka.testkit._
-// import akka.dispatch._
-// import akka.actor._
-// import com.typesafe.config._
+import com.typesafe.config._

-// class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
-//   akka {
-//     loglevel = "INFO"
-//     actor.provider = "akka.remote.RemoteActorRefProvider"
+import java.net.InetSocketAddress

-//     remote.server.hostname = localhost
-//     remote.server.port = 5550
-//     remote.failure-detector.threshold = 3
-//     cluster.seed-nodes = ["akka://localhost:5551"]
-//   }
-//   """) with ImplicitSender {
+class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
+  akka {
+    loglevel = "INFO"
+    cluster.failure-detector.threshold = 3
+    actor.debug.lifecycle = on
+    actor.debug.autoreceive = on
+  }
+  """) with ImplicitSender {

-//   val conn1 = Address("akka", system.systemName, Some("localhost"), Some(5551))
-//   val node1 = ActorSystem("GossiperSpec", ConfigFactory
-//     .parseString("akka { remote.server.port=5551, cluster.use-cluster = on }")
-//     .withFallback(system.settings.config))
-//   val remote1 =
-//     node1.asInstanceOf[ActorSystemImpl]
-//       .provider.asInstanceOf[RemoteActorRefProvider]
-//       .remote
-//   val gossiper1 = remote1.gossiper
-//   val fd1 = remote1.failureDetector
-//   gossiper1 must be('defined)
+  var gossiper1: Gossiper = _
+  var gossiper2: Gossiper = _
+  var gossiper3: Gossiper = _

-//   val conn2 = RemoteNettyAddress("localhost", 5552)
-//   val node2 = ActorSystem("GossiperSpec", ConfigFactory
-//     .parseString("akka { remote.server.port=5552, cluster.use-cluster = on }")
-//     .withFallback(system.settings.config))
-//   val remote2 =
-//     node2.asInstanceOf[ActorSystemImpl]
-//       .provider.asInstanceOf[RemoteActorRefProvider]
-//       .remote
-//   val gossiper2 = remote2.gossiper
-//   val fd2 = remote2.failureDetector
-//   gossiper2 must be('defined)
+  var node1: ActorSystemImpl = _
+  var node2: ActorSystemImpl = _
+  var node3: ActorSystemImpl = _

-//   val conn3 = RemoteNettyAddress("localhost", 5553)
-//   val node3 = ActorSystem("GossiperSpec", ConfigFactory
-//     .parseString("akka { remote.server.port=5553, cluster.use-cluster = on }")
-//     .withFallback(system.settings.config))
-//   val remote3 =
-//     node3.asInstanceOf[ActorSystemImpl]
-//       .provider.asInstanceOf[RemoteActorRefProvider]
-//       .remote
-//   val gossiper3 = remote3.gossiper
-//   val fd3 = remote3.failureDetector
-//   gossiper3 must be('defined)
+  try {
+    "A Gossip-driven Failure Detector" must {

-//   "A Gossip-driven Failure Detector" must {
+      // ======= NODE 1 ========
+      node1 = ActorSystem("node1", ConfigFactory
+        .parseString("""
+          akka {
+            actor.provider = "akka.remote.RemoteActorRefProvider"
+            remote.netty {
+              hostname = localhost
+              port = 5550
+            }
+          }""")
+        .withFallback(system.settings.config))
+        .asInstanceOf[ActorSystemImpl]
+      val remote1 = node1.provider.asInstanceOf[RemoteActorRefProvider]
+      gossiper1 = Gossiper(node1, remote1)
+      val fd1 = gossiper1.failureDetector
+      val address1 = gossiper1.self.address

-//     "receive gossip heartbeats so that all healthy nodes in the cluster are marked 'available'" ignore {
-//       Thread.sleep(5000) // let them gossip for 10 seconds
-//       fd1.isAvailable(conn2) must be(true)
-//       fd1.isAvailable(conn3) must be(true)
-//       fd2.isAvailable(conn1) must be(true)
-//       fd2.isAvailable(conn3) must be(true)
-//       fd3.isAvailable(conn1) must be(true)
-//       fd3.isAvailable(conn2) must be(true)
-//     }
+      // ======= NODE 2 ========
+      node2 = ActorSystem("node2", ConfigFactory
+        .parseString("""
+          akka {
+            actor.provider = "akka.remote.RemoteActorRefProvider"
+            remote.netty {
+              hostname = localhost
+              port = 5551
+            }
+            cluster.node-to-join = "akka://node1@localhost:5550"
+          }""")
+        .withFallback(system.settings.config))
+        .asInstanceOf[ActorSystemImpl]
+      val remote2 = node2.provider.asInstanceOf[RemoteActorRefProvider]
+      gossiper2 = Gossiper(node2, remote2)
+      val fd2 = gossiper2.failureDetector
+      val address2 = gossiper2.self.address

-//     "mark node as 'unavailable' if a node in the cluster is shut down and its heartbeats stops" ignore {
-//       // kill node 3
-//       gossiper3.get.shutdown()
-//       node3.shutdown()
-//       Thread.sleep(5000) // let them gossip for 10 seconds
+      // ======= NODE 3 ========
+      node3 = ActorSystem("node3", ConfigFactory
+        .parseString("""
+          akka {
+            actor.provider = "akka.remote.RemoteActorRefProvider"
+            remote.netty {
+              hostname = localhost
+              port = 5552
+            }
+            cluster.node-to-join = "akka://node1@localhost:5550"
+          }""")
+        .withFallback(system.settings.config))
+        .asInstanceOf[ActorSystemImpl]
+      val remote3 = node3.provider.asInstanceOf[RemoteActorRefProvider]
+      gossiper3 = Gossiper(node3, remote3)
+      val fd3 = gossiper3.failureDetector
+      val address3 = gossiper3.self.address

-//       fd1.isAvailable(conn2) must be(true)
-//       fd1.isAvailable(conn3) must be(false)
-//       fd2.isAvailable(conn1) must be(true)
-//       fd2.isAvailable(conn3) must be(false)
-//     }
-//   }
+      "receive gossip heartbeats so that all healthy nodes in the cluster are marked 'available'" in {
+        println("Let the nodes gossip for a while...")
+        Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+        fd1.isAvailable(address2) must be(true)
+        fd1.isAvailable(address3) must be(true)
+        fd2.isAvailable(address1) must be(true)
+        fd2.isAvailable(address3) must be(true)
+        fd3.isAvailable(address1) must be(true)
+        fd3.isAvailable(address2) must be(true)
+      }

-//   override def atTermination() {
-//     gossiper1.get.shutdown()
-//     gossiper2.get.shutdown()
-//     gossiper3.get.shutdown()
-//     node1.shutdown()
-//     node2.shutdown()
-//     node3.shutdown()
-//     // FIXME Ordering problem - If we shut down the ActorSystem before the Gossiper then we get an IllegalStateException
-//   }
-// }
+      "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stop)" in {
+        // shut down node3
+        gossiper3.shutdown()
+        node3.shutdown()
+        println("Give the remaining nodes time to detect failure...")
+        Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of node3
+        fd1.isAvailable(address2) must be(true)
+        fd1.isAvailable(address3) must be(false)
+        fd2.isAvailable(address1) must be(true)
+        fd2.isAvailable(address3) must be(false)
+      }
+    }
+  } catch {
+    case e: Exception ⇒
+      e.printStackTrace
+      fail(e.toString)
+  }
+
+  override def atTermination() {
+    gossiper1.shutdown()
+    node1.shutdown()
+
+    gossiper2.shutdown()
+    node2.shutdown()
+
+    gossiper3.shutdown()
+    node3.shutdown()
+  }
+}
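
Note (illustration, not part of the patch): the phi value computed above reduces to
PhiFactor * (now - lastHeartbeat) / meanInterval with PhiFactor = 1 / ln(10), and a
connection counts as available while phi stays below the threshold. A minimal,
self-contained sketch of that arithmetic; the object and parameter names here
(PhiSketch, nowMillis, etc.) are hypothetical and do not exist in the patch:

object PhiSketch {
  val PhiFactor = 1.0 / math.log(10.0)

  // phi for one connection, given the current time, the time of its last
  // heartbeat, and the mean inter-arrival time of its earlier heartbeats
  def phi(nowMillis: Long, lastHeartbeatMillis: Long, meanIntervalMillis: Double): Double =
    if (meanIntervalMillis == 0.0D) 0.0D
    else PhiFactor * (nowMillis - lastHeartbeatMillis) / meanIntervalMillis

  // a connection is considered available while phi stays below the threshold (default 8)
  def isAvailable(nowMillis: Long, lastHeartbeatMillis: Long, meanIntervalMillis: Double, threshold: Int = 8): Boolean =
    phi(nowMillis, lastHeartbeatMillis, meanIntervalMillis) < threshold

  def main(args: Array[String]): Unit = {
    // with a 1000 ms mean interval, phi crosses a threshold of 8 after
    // roughly 8 * ln(10) * 1000 ms, about 18.4 s of silence
    println(phi(nowMillis = 20000L, lastHeartbeatMillis = 0L, meanIntervalMillis = 1000.0)) // prints roughly 8.69
  }
}

With threshold = 3, as the specs above configure, the detector trips after about
3 * ln(10), roughly 6.9 mean intervals of silence, which is why the tests can expect a
stopped node to be detected well within their 30-second window.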
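
Note (illustration, not part of the patch): heartbeat() and scrutinize() both follow the
optimistic lockless-concurrency pattern the scaladoc describes: all state lives in one
immutable case class behind an AtomicReference; a writer reads the old state, derives a
new one, and retries on a lost compareAndSet race. A reduced sketch of that pattern,
with a hypothetical State that tracks only a version and per-connection timestamps:

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object CasLoopSketch {
  private case class State(version: Long = 0L, timestamps: Map[String, Long] = Map.empty)

  private val state = new AtomicReference(State())

  @tailrec
  def heartbeat(connection: String) {
    val oldState = state.get

    // derive the new immutable state from the old one
    val newState = oldState.copy(
      version = oldState.version + 1,
      timestamps = oldState.timestamps + (connection -> System.currentTimeMillis))

    // if we won the race then update else try again
    if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
  }
}

The @tailrec annotation makes the compiler verify the retry compiles down to a loop, so
contention cannot overflow the stack.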
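
Note (illustration, not part of the patch): the scrutinize() change amounts to
partitioning the membership by the failure detector's verdict and accumulating the
failed addresses in the gossip overview's unreachable set. A sketch with reduced
stand-in types (the Member, Overview, and Gossip shapes here carry only the fields this
step needs, not the patch's real definitions):

case class Member(address: String)
case class Overview(unreachable: Set[String] = Set.empty)
case class Gossip(members: Set[Member], overview: Overview = Overview(), version: Long = 0L)

def scrutinize(gossip: Gossip, isAvailable: String ⇒ Boolean): Gossip = {
  // split the membership by the failure detector's verdict
  val (reachable, newlyUnreachable) = gossip.members partition (m ⇒ isAvailable(m.address))

  if (newlyUnreachable.isEmpty) gossip // nothing detected, keep the old state (and version)
  else gossip copy (
    members = reachable,
    overview = Overview(gossip.overview.unreachable ++ newlyUnreachable.map(_.address)),
    version = gossip.version + 1)
}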