Fixed remaining issues with gossip-based failure detection and removal of unreachable nodes.

* Completed gossip-based failure detection.
* Completed removal of unreachable nodes according to failure detector.
* Added passing tests.
* Miscellaneous other fixes, more logging, and more comments.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-02-18 17:48:07 +01:00
parent db1e1da7e7
commit cfd04bba3d
4 changed files with 173 additions and 129 deletions

View file

@ -23,14 +23,17 @@ import System.{ currentTimeMillis ⇒ newTimestamp }
* <p/> * <p/>
* Default threshold is 8, but can be configured in the Akka config. * Default threshold is 8, but can be configured in the Akka config.
*/ */
class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000) { class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000) {
private final val PhiFactor = 1.0 / math.log(10.0) private final val PhiFactor = 1.0 / math.log(10.0)
private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)
private val log = Logging(system, "FailureDetector") private val log = Logging(system, "FailureDetector")
/**
* Holds the failure statistics for a specific node Address.
*/
private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)
/** /**
* Implement using optimistic lockless concurrency, all state is represented * Implement using optimistic lockless concurrency, all state is represented
* by this immutable case class and managed by an AtomicReference. * by this immutable case class and managed by an AtomicReference.
@ -54,22 +57,26 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
*/ */
@tailrec @tailrec
final def heartbeat(connection: Address) { final def heartbeat(connection: Address) {
log.debug("Heartbeat from connection [{}] ", connection) log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection)
val oldState = state.get
val oldState = state.get
val oldFailureStats = oldState.failureStats
val oldTimestamps = oldState.timestamps
val latestTimestamp = oldState.timestamps.get(connection) val latestTimestamp = oldState.timestamps.get(connection)
if (latestTimestamp.isEmpty) { if (latestTimestamp.isEmpty) {
// this is heartbeat from a new connection // this is heartbeat from a new connection
// add starter records for this new connection // add starter records for this new connection
val failureStats = oldState.failureStats + (connection -> FailureStats()) val newFailureStats = oldFailureStats + (connection -> FailureStats())
val intervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long]) val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
val timestamps = oldState.timestamps + (connection -> newTimestamp) val newTimestamps = oldTimestamps + (connection -> newTimestamp)
val newState = oldState copy (version = oldState.version + 1, val newState = oldState copy (
failureStats = failureStats, version = oldState.version + 1,
intervalHistory = intervalHistory, failureStats = newFailureStats,
timestamps = timestamps) intervalHistory = newIntervalHistory,
timestamps = newTimestamps)
// if we won the race then update else try again // if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -79,7 +86,7 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
val timestamp = newTimestamp val timestamp = newTimestamp
val interval = timestamp - latestTimestamp.get val interval = timestamp - latestTimestamp.get
val timestamps = oldState.timestamps + (connection -> timestamp) // record new timestamp val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp
var newIntervalsForConnection = var newIntervalsForConnection =
oldState.intervalHistory.get(connection).getOrElse(Vector.empty[Long]) :+ interval // append the new interval to history oldState.intervalHistory.get(connection).getOrElse(Vector.empty[Long]) :+ interval // append the new interval to history
@ -89,36 +96,33 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
newIntervalsForConnection = newIntervalsForConnection drop 0 newIntervalsForConnection = newIntervalsForConnection drop 0
} }
val failureStats = val newFailureStats =
if (newIntervalsForConnection.size > 1) { if (newIntervalsForConnection.size > 1) {
val mean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble
val oldConnectionFailureStats = oldFailureStats.get(connection).getOrElse(throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history"))
val oldFailureStats = oldState.failureStats.get(connection).getOrElse(FailureStats())
val deviationSum = val deviationSum =
newIntervalsForConnection newIntervalsForConnection
.map(_.toDouble) .map(_.toDouble)
.foldLeft(0.0D)((x, y) x + (y - mean)) .foldLeft(0.0D)((x, y) x + (y - newMean))
val variance: Double = deviationSum / newIntervalsForConnection.size.toDouble val newVariance: Double = deviationSum / newIntervalsForConnection.size.toDouble
val deviation: Double = math.sqrt(variance) val newDeviation: Double = math.sqrt(newVariance)
val newFailureStats = oldFailureStats copy (mean = mean, val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
deviation = deviation, oldFailureStats + (connection -> newFailureStats)
variance = variance)
oldState.failureStats + (connection -> newFailureStats)
} else { } else {
oldState.failureStats oldFailureStats
} }
val intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection) val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)
val newState = oldState copy (version = oldState.version + 1, val newState = oldState copy (version = oldState.version + 1,
failureStats = failureStats, failureStats = newFailureStats,
intervalHistory = intervalHistory, intervalHistory = newIntervalHistory,
timestamps = timestamps) timestamps = newTimestamps)
// if we won the race then update else try again // if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -138,17 +142,21 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
def phi(connection: Address): Double = { def phi(connection: Address): Double = {
val oldState = state.get val oldState = state.get
val oldTimestamp = oldState.timestamps.get(connection) val oldTimestamp = oldState.timestamps.get(connection)
val phi = val phi =
if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else { else {
val timestampDiff = newTimestamp - oldTimestamp.get val timestampDiff = newTimestamp - oldTimestamp.get
val mean = oldState.failureStats.get(connection).getOrElse(FailureStats()).mean
val stats = oldState.failureStats.get(connection)
val mean = stats.getOrElse(throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history")).mean
if (mean == 0.0D) 0.0D if (mean == 0.0D) 0.0D
else PhiFactor * timestampDiff / mean else PhiFactor * timestampDiff / mean
} }
// only log if PHI value is starting to get interesting // only log if PHI value is starting to get interesting
if (phi > 0.0D) log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection) if (phi > 0.0D) log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection)
phi phi
} }

View file

@ -210,11 +210,12 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
val failureDetector = new AccrualFailureDetector(
system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress) private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)
private val serialization = remote.serialization private val serialization = remote.serialization
private val failureDetector = new AccrualFailureDetector(
system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val isRunning = new AtomicBoolean(true) private val isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Gossiper") private val log = Logging(system, "Gossiper")
@ -279,12 +280,10 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
if (isRunning.compareAndSet(true, false)) { if (isRunning.compareAndSet(true, false)) {
log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress) log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress)
try connectionManager.shutdown() finally { try system.stop(clusterDaemon) finally {
try system.stop(clusterDaemon) finally { try gossipCanceller.cancel() finally {
try gossipCanceller.cancel() finally { try scrutinizeCanceller.cancel() finally {
try scrutinizeCanceller.cancel() finally { log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
}
} }
} }
} }
@ -298,6 +297,8 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
final def joining(node: Address) { final def joining(node: Address) {
log.info("Node [{}] - Node [{}] is joining", remoteAddress, node) log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)
failureDetector heartbeat node // update heartbeat in failure detector
val localState = state.get val localState = state.get
val localGossip = localState.latestGossip val localGossip = localState.latestGossip
val localMembers = localGossip.members val localMembers = localGossip.members
@ -475,7 +476,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
*/ */
private def gossipTo(address: Address) { private def gossipTo(address: Address) {
setUpConnectionTo(address) foreach { connection setUpConnectionTo(address) foreach { connection
log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, address) log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
connection ! GossipEnvelope(self, latestGossip) connection ! GossipEnvelope(self, latestGossip)
} }
} }
@ -496,7 +497,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
} }
/** /**
* Scrutinizes the cluster; marks members detected by the failure detector as unavailable. * Scrutinizes the cluster; marks members detected by the failure detector as unreachable.
*/ */
@tailrec @tailrec
final private def scrutinize() { final private def scrutinize() {
@ -517,6 +518,8 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
val newMembers = localMembers diff newlyDetectedUnreachableMembers val newMembers = localMembers diff newlyDetectedUnreachableMembers
val newUnreachableAddresses: Set[Address] = localUnreachableAddresses ++ newlyDetectedUnreachableAddresses val newUnreachableAddresses: Set[Address] = localUnreachableAddresses ++ newlyDetectedUnreachableAddresses
log.info("Node [{}] - Marking node(s) an unreachable [{}]", remoteAddress, newlyDetectedUnreachableAddresses.mkString(", "))
val newOverview = localOverview copy (unreachable = newUnreachableAddresses) val newOverview = localOverview copy (unreachable = newUnreachableAddresses)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers)

View file

@ -13,13 +13,13 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
val conn2 = Address("akka", "", "localhost", 2553) val conn2 = Address("akka", "", "localhost", 2553)
"return phi value of 0.0D on startup for each address" in { "return phi value of 0.0D on startup for each address" in {
val fd = new AccrualFailureDetector(system) val fd = new AccrualFailureDetector(system, conn)
fd.phi(conn) must be(0.0D) fd.phi(conn) must be(0.0D)
fd.phi(conn2) must be(0.0D) fd.phi(conn2) must be(0.0D)
} }
"mark node as available after a series of successful heartbeats" in { "mark node as available after a series of successful heartbeats" in {
val fd = new AccrualFailureDetector(system) val fd = new AccrualFailureDetector(system, conn)
fd.heartbeat(conn) fd.heartbeat(conn)
@ -34,7 +34,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
// FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector // FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector
"mark node as dead after explicit removal of connection" ignore { "mark node as dead after explicit removal of connection" ignore {
val fd = new AccrualFailureDetector(system) val fd = new AccrualFailureDetector(system, conn)
fd.heartbeat(conn) fd.heartbeat(conn)
@ -52,7 +52,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
} }
"mark node as dead if heartbeat are missed" in { "mark node as dead if heartbeat are missed" in {
val fd = new AccrualFailureDetector(system, threshold = 3) val fd = new AccrualFailureDetector(system, conn, threshold = 3)
fd.heartbeat(conn) fd.heartbeat(conn)
@ -70,7 +70,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
} }
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
val fd = new AccrualFailureDetector(system, threshold = 3) val fd = new AccrualFailureDetector(system, conn, threshold = 3)
fd.heartbeat(conn) fd.heartbeat(conn)

View file

@ -1,95 +1,128 @@
// /** /**
// * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
// */ */
// package akka.cluster package akka.cluster
// import java.net.InetSocketAddress import akka.testkit._
import akka.dispatch._
import akka.actor._
import akka.remote._
import akka.util.duration._
// import akka.testkit._ import com.typesafe.config._
// import akka.dispatch._
// import akka.actor._
// import com.typesafe.config._
// class GossipingAccrualFailureDetectorSpec extends AkkaSpec(""" import java.net.InetSocketAddress
// akka {
// loglevel = "INFO"
// actor.provider = "akka.remote.RemoteActorRefProvider"
// remote.server.hostname = localhost class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
// remote.server.port = 5550 akka {
// remote.failure-detector.threshold = 3 loglevel = "INFO"
// cluster.seed-nodes = ["akka://localhost:5551"] cluster.failure-detector.threshold = 3
// } actor.debug.lifecycle = on
// """) with ImplicitSender { actor.debug.autoreceive = on
}
""") with ImplicitSender {
// val conn1 = Address("akka", system.systemName, Some("localhost"), Some(5551)) var gossiper1: Gossiper = _
// val node1 = ActorSystem("GossiperSpec", ConfigFactory var gossiper2: Gossiper = _
// .parseString("akka { remote.server.port=5551, cluster.use-cluster = on }") var gossiper3: Gossiper = _
// .withFallback(system.settings.config))
// val remote1 =
// node1.asInstanceOf[ActorSystemImpl]
// .provider.asInstanceOf[RemoteActorRefProvider]
// .remote
// val gossiper1 = remote1.gossiper
// val fd1 = remote1.failureDetector
// gossiper1 must be('defined)
// val conn2 = RemoteNettyAddress("localhost", 5552) var node1: ActorSystemImpl = _
// val node2 = ActorSystem("GossiperSpec", ConfigFactory var node2: ActorSystemImpl = _
// .parseString("akka { remote.server.port=5552, cluster.use-cluster = on }") var node3: ActorSystemImpl = _
// .withFallback(system.settings.config))
// val remote2 =
// node2.asInstanceOf[ActorSystemImpl]
// .provider.asInstanceOf[RemoteActorRefProvider]
// .remote
// val gossiper2 = remote2.gossiper
// val fd2 = remote2.failureDetector
// gossiper2 must be('defined)
// val conn3 = RemoteNettyAddress("localhost", 5553) try {
// val node3 = ActorSystem("GossiperSpec", ConfigFactory "A Gossip-driven Failure Detector" must {
// .parseString("akka { remote.server.port=5553, cluster.use-cluster = on }")
// .withFallback(system.settings.config))
// val remote3 =
// node3.asInstanceOf[ActorSystemImpl]
// .provider.asInstanceOf[RemoteActorRefProvider]
// .remote
// val gossiper3 = remote3.gossiper
// val fd3 = remote3.failureDetector
// gossiper3 must be('defined)
// "A Gossip-driven Failure Detector" must { // ======= NODE 1 ========
node1 = ActorSystem("node1", ConfigFactory
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
port=5550
}
}""")
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = node1.provider.asInstanceOf[RemoteActorRefProvider]
gossiper1 = Gossiper(node1, remote1)
val fd1 = gossiper1.failureDetector
val address1 = gossiper1.self.address
// "receive gossip heartbeats so that all healthy nodes in the cluster are marked 'available'" ignore { // ======= NODE 2 ========
// Thread.sleep(5000) // let them gossip for 10 seconds node2 = ActorSystem("node2", ConfigFactory
// fd1.isAvailable(conn2) must be(true) .parseString("""
// fd1.isAvailable(conn3) must be(true) akka {
// fd2.isAvailable(conn1) must be(true) actor.provider = "akka.remote.RemoteActorRefProvider"
// fd2.isAvailable(conn3) must be(true) remote.netty {
// fd3.isAvailable(conn1) must be(true) hostname = localhost
// fd3.isAvailable(conn2) must be(true) port = 5551
// } }
cluster.node-to-join = "akka://node1@localhost:5550"
}""")
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = node2.provider.asInstanceOf[RemoteActorRefProvider]
gossiper2 = Gossiper(node2, remote2)
val fd2 = gossiper2.failureDetector
val address2 = gossiper2.self.address
// "mark node as 'unavailable' if a node in the cluster is shut down and its heartbeats stops" ignore { // ======= NODE 3 ========
// // kill node 3 node3 = ActorSystem("node3", ConfigFactory
// gossiper3.get.shutdown() .parseString("""
// node3.shutdown() akka {
// Thread.sleep(5000) // let them gossip for 10 seconds actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
port=5552
}
cluster.node-to-join = "akka://node1@localhost:5550"
}""")
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote3 = node3.provider.asInstanceOf[RemoteActorRefProvider]
gossiper3 = Gossiper(node3, remote3)
val fd3 = gossiper3.failureDetector
val address3 = gossiper3.self.address
// fd1.isAvailable(conn2) must be(true) "receive gossip heartbeats so that all healthy nodes in the cluster are marked 'available'" in {
// fd1.isAvailable(conn3) must be(false) println("Let the nodes gossip for a while...")
// fd2.isAvailable(conn1) must be(true) Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
// fd2.isAvailable(conn3) must be(false) fd1.isAvailable(address2) must be(true)
// } fd1.isAvailable(address3) must be(true)
// } fd2.isAvailable(address1) must be(true)
fd2.isAvailable(address3) must be(true)
fd3.isAvailable(address1) must be(true)
fd3.isAvailable(address2) must be(true)
}
// override def atTermination() { "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" in {
// gossiper1.get.shutdown() // shut down node3
// gossiper2.get.shutdown() gossiper3.shutdown()
// gossiper3.get.shutdown() node3.shutdown()
// node1.shutdown() println("Give the remaning nodes time to detect failure...")
// node2.shutdown() Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of node3
// node3.shutdown() fd1.isAvailable(address2) must be(true)
// // FIXME Ordering problem - If we shut down the ActorSystem before the Gossiper then we get an IllegalStateException fd1.isAvailable(address3) must be(false)
// } fd2.isAvailable(address1) must be(true)
// } fd2.isAvailable(address3) must be(false)
}
}
} catch {
case e: Exception
e.printStackTrace
fail(e.toString)
}
override def atTermination() {
gossiper1.shutdown()
node1.shutdown()
gossiper2.shutdown()
node2.shutdown()
gossiper3.shutdown()
node3.shutdown()
}
}