diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 8ee9f857a0..e0d7cae052 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -23,14 +23,17 @@ import System.{ currentTimeMillis ⇒ newTimestamp } *
* Default threshold is 8, but can be configured in the Akka config. */ -class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000) { +class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000) { private final val PhiFactor = 1.0 / math.log(10.0) - private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D) - private val log = Logging(system, "FailureDetector") + /** + * Holds the failure statistics for a specific node Address. + */ + private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D) + /** * Implement using optimistic lockless concurrency, all state is represented * by this immutable case class and managed by an AtomicReference. @@ -54,22 +57,26 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma */ @tailrec final def heartbeat(connection: Address) { - log.debug("Heartbeat from connection [{}] ", connection) - val oldState = state.get + log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection) + val oldState = state.get + val oldFailureStats = oldState.failureStats + val oldTimestamps = oldState.timestamps val latestTimestamp = oldState.timestamps.get(connection) + if (latestTimestamp.isEmpty) { // this is heartbeat from a new connection // add starter records for this new connection - val failureStats = oldState.failureStats + (connection -> FailureStats()) - val intervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long]) - val timestamps = oldState.timestamps + (connection -> newTimestamp) + val newFailureStats = oldFailureStats + (connection -> FailureStats()) + val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long]) + val newTimestamps = oldTimestamps + (connection -> newTimestamp) - val newState = oldState copy (version = oldState.version + 1, - failureStats = failureStats, - intervalHistory = intervalHistory, - timestamps = timestamps) + val newState = oldState copy ( + version = oldState.version + 1, + failureStats = newFailureStats, + intervalHistory = newIntervalHistory, + timestamps = newTimestamps) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -79,7 +86,7 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma val timestamp = newTimestamp val interval = timestamp - latestTimestamp.get - val timestamps = oldState.timestamps + (connection -> timestamp) // record new timestamp + val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp var newIntervalsForConnection = oldState.intervalHistory.get(connection).getOrElse(Vector.empty[Long]) :+ interval // append the new interval to history @@ -89,36 +96,33 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma newIntervalsForConnection = newIntervalsForConnection drop 0 } - val failureStats = + val newFailureStats = if (newIntervalsForConnection.size > 1) { - val mean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble - - val oldFailureStats = oldState.failureStats.get(connection).getOrElse(FailureStats()) + val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble + val oldConnectionFailureStats = oldFailureStats.get(connection).getOrElse(throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")) val deviationSum = newIntervalsForConnection .map(_.toDouble) - .foldLeft(0.0D)((x, y) ⇒ x + (y - mean)) + .foldLeft(0.0D)((x, y) ⇒ x + (y - newMean)) - val variance: Double = deviationSum / newIntervalsForConnection.size.toDouble - val deviation: Double = math.sqrt(variance) + val newVariance: Double = deviationSum / newIntervalsForConnection.size.toDouble + val newDeviation: Double = math.sqrt(newVariance) - val newFailureStats = oldFailureStats copy (mean = mean, - deviation = deviation, - variance = variance) + val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance) + oldFailureStats + (connection -> newFailureStats) - oldState.failureStats + (connection -> newFailureStats) } else { - oldState.failureStats + oldFailureStats } - val intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection) + val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection) val newState = oldState copy (version = oldState.version + 1, - failureStats = failureStats, - intervalHistory = intervalHistory, - timestamps = timestamps) + failureStats = newFailureStats, + intervalHistory = newIntervalHistory, + timestamps = newTimestamps) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -138,17 +142,21 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma def phi(connection: Address): Double = { val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) + val phi = if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections else { val timestampDiff = newTimestamp - oldTimestamp.get - val mean = oldState.failureStats.get(connection).getOrElse(FailureStats()).mean + + val stats = oldState.failureStats.get(connection) + val mean = stats.getOrElse(throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history")).mean + if (mean == 0.0D) 0.0D else PhiFactor * timestampDiff / mean } // only log if PHI value is starting to get interesting - if (phi > 0.0D) log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection) + if (phi > 0.0D) log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) phi } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala index bb1e19e746..73575efec7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala @@ -210,11 +210,12 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) { implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) + val failureDetector = new AccrualFailureDetector( + system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) + private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress) private val serialization = remote.serialization - private val failureDetector = new AccrualFailureDetector( - system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) private val isRunning = new AtomicBoolean(true) private val log = Logging(system, "Gossiper") @@ -279,12 +280,10 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) { if (isRunning.compareAndSet(true, false)) { log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress) - try connectionManager.shutdown() finally { - try system.stop(clusterDaemon) finally { - try gossipCanceller.cancel() finally { - try scrutinizeCanceller.cancel() finally { - log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress) - } + try system.stop(clusterDaemon) finally { + try gossipCanceller.cancel() finally { + try scrutinizeCanceller.cancel() finally { + log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress) } } } @@ -298,6 +297,8 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) { final def joining(node: Address) { log.info("Node [{}] - Node [{}] is joining", remoteAddress, node) + failureDetector heartbeat node // update heartbeat in failure detector + val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -475,7 +476,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) { */ private def gossipTo(address: Address) { setUpConnectionTo(address) foreach { connection ⇒ - log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, address) + log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection) connection ! GossipEnvelope(self, latestGossip) } } @@ -496,7 +497,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) { } /** - * Scrutinizes the cluster; marks members detected by the failure detector as unavailable. + * Scrutinizes the cluster; marks members detected by the failure detector as unreachable. */ @tailrec final private def scrutinize() { @@ -517,6 +518,8 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) { val newMembers = localMembers diff newlyDetectedUnreachableMembers val newUnreachableAddresses: Set[Address] = localUnreachableAddresses ++ newlyDetectedUnreachableAddresses + log.info("Node [{}] - Marking node(s) an unreachable [{}]", remoteAddress, newlyDetectedUnreachableAddresses.mkString(", ")) + val newOverview = localOverview copy (unreachable = newUnreachableAddresses) val newGossip = localGossip copy (overview = newOverview, members = newMembers) diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 034f582e0d..2e00c72ad1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -13,13 +13,13 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" val conn2 = Address("akka", "", "localhost", 2553) "return phi value of 0.0D on startup for each address" in { - val fd = new AccrualFailureDetector(system) + val fd = new AccrualFailureDetector(system, conn) fd.phi(conn) must be(0.0D) fd.phi(conn2) must be(0.0D) } "mark node as available after a series of successful heartbeats" in { - val fd = new AccrualFailureDetector(system) + val fd = new AccrualFailureDetector(system, conn) fd.heartbeat(conn) @@ -34,7 +34,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" // FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector "mark node as dead after explicit removal of connection" ignore { - val fd = new AccrualFailureDetector(system) + val fd = new AccrualFailureDetector(system, conn) fd.heartbeat(conn) @@ -52,7 +52,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as dead if heartbeat are missed" in { - val fd = new AccrualFailureDetector(system, threshold = 3) + val fd = new AccrualFailureDetector(system, conn, threshold = 3) fd.heartbeat(conn) @@ -70,7 +70,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { - val fd = new AccrualFailureDetector(system, threshold = 3) + val fd = new AccrualFailureDetector(system, conn, threshold = 3) fd.heartbeat(conn) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index 6366a9f65e..413ab7e537 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -1,95 +1,128 @@ -// /** -// * Copyright (C) 2009-2011 Typesafe Inc.