From 410fd6ca58d8bd444d18b6f047f06962566a9ee7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 18 Jun 2012 11:10:59 +0200 Subject: [PATCH 1/3] Improve phi in AccrualFailureDetector, see #2066 * Implementation of phi according to the paper * Config properties and documentation, min-std-deviation, * acceptable-lost-heartbeats * Restructure code, HeartbeatHistory is responsible for stats from historical heartbeats * Correct and efficient calculation of mean and standard deviation * More tests --- .../src/main/resources/reference.conf | 16 +- .../akka/cluster/AccrualFailureDetector.scala | 285 ++++++++++++------ .../scala/akka/cluster/ClusterSettings.scala | 6 +- .../cluster/AccrualFailureDetectorSpec.scala | 161 +++++++--- .../akka/cluster/ClusterConfigSpec.scala | 4 +- 5 files changed, 335 insertions(+), 137 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b9104fe6cf..90d02d4fd1 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -43,7 +43,21 @@ akka { # a quick detection in the event of a real crash. Conversely, a high # threshold generates fewer mistakes but needs more time to detect # actual crashes - threshold = 8 + threshold = 8.0 + + # Minimum standard deviation to use for the normal distribution in + # AccrualFailureDetector. Too low standard deviation might result in + # too much sensitivity for sudden, but normal, deviations in heartbeat + # inter arrival times. + min-std-deviation = 100 ms + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # It is a factor of heartbeat-interval. + # This margin is important to be able to survive sudden, occasional, + # pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. 
+ acceptable-lost-heartbeats = 3.0 implementation-class = "" diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 6632111f00..1dfac252fe 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -7,50 +7,89 @@ package akka.cluster import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } import akka.remote.RemoteActorRefProvider import akka.event.Logging - import scala.collection.immutable.Map import scala.annotation.tailrec - import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.TimeUnit.NANOSECONDS +import akka.util.Duration +import akka.util.duration._ +object AccrualFailureDetector { + private def realTimeMachine: () ⇒ Long = () ⇒ NANOSECONDS.toMillis(System.nanoTime) +} /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: * [http://ddg.jaist.ac.jp/pub/HDY+04.pdf] - *

- * A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event - * of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect - * actual crashes - *

- * Default threshold is 8, but can be configured in the Akka config. + * + * @param system Belongs to the [[akka.actor.ActorSystem]]. Used for logging. + * + * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event + * of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect + * actual crashes + * + * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of + * inter-arrival times. + * + * @param minStdDeviation Minimum standard deviation to use for the normal distribution used when calculating phi. + * Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations + * in heartbeat inter arrival times. + * + * @param acceptableLostDuration Duration corresponding to number of potentially lost/delayed + * heartbeats that will be accepted before considering it to be an anomaly. + * This margin is important to be able to survive sudden, occasional, pauses in heartbeat + * arrivals, due to for example garbage collect or network drop. + * + * @param firstHeartbeatEstimate Bootstrap the stats with heartbeats that corresponds to + * to this duration, with a with rather high standard deviation (since environment is unknown + * in the beginning) + * + * @timeMachine The clock, returning time in milliseconds, but can be faked for testing + * purposes. It is only used for measuring intervals (duration). 
+ * */ class AccrualFailureDetector( val system: ActorSystem, - val threshold: Int = 8, - val maxSampleSize: Int = 1000, - val timeMachine: () ⇒ Long = System.currentTimeMillis) extends FailureDetector { + val threshold: Double, + val maxSampleSize: Int, + val minStdDeviation: Duration, + val acceptableLostDuration: Duration, + val firstHeartbeatEstimate: Duration, + val timeMachine: () ⇒ Long) extends FailureDetector { + import AccrualFailureDetector._ + + /** + * Constructor that picks configuration from the settings. + */ def this( system: ActorSystem, settings: ClusterSettings, - timeMachine: () ⇒ Long = System.currentTimeMillis) = + timeMachine: () ⇒ Long = AccrualFailureDetector.realTimeMachine) = this( system, settings.FailureDetectorThreshold, settings.FailureDetectorMaxSampleSize, + settings.HeartbeatInterval * settings.FailureDetectorAcceptableLostHeartbeats, + settings.FailureDetectorMinStdDeviation, + // we use a conservative estimate for the first heartbeat because + // gossip needs to spread back to the joining node before the + // first real heartbeat is sent. Initial heartbeat is added when joining. + // FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed + settings.GossipInterval * 3 + settings.HeartbeatInterval, timeMachine) - private final val PhiFactor = 1.0 / math.log(10.0) - private val log = Logging(system, "FailureDetector") - /** - * Holds the failure statistics for a specific node Address. 
- */ - private case class FailureStats(mean: Double = 0.0, variance: Double = 0.0, deviation: Double = 0.0) - // guess statistics for first heartbeat, - // important so that connections with only one heartbeat becomes unavailble - private val failureStatsFirstHeartbeat = FailureStats(mean = 1000.0) + // important so that connections with only one heartbeat becomes unavailable + private val firstHeartbeat: HeartbeatHistory = { + // bootstrap with 2 entries with rather high standard deviation + val mean = firstHeartbeatEstimate.toMillis + val stdDeviation = mean / 4 + HeartbeatHistory(maxSampleSize) :+ (mean - stdDeviation) :+ (mean + stdDeviation) + } + + private val acceptableLostMillis = acceptableLostDuration.toMillis /** * Implement using optimistic lockless concurrency, all state is represented @@ -58,8 +97,7 @@ class AccrualFailureDetector( */ private case class State( version: Long = 0L, - failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats], - intervalHistory: Map[Address, IndexedSeq[Long]] = Map.empty[Address, IndexedSeq[Long]], + history: Map[Address, HeartbeatHistory] = Map.empty, timestamps: Map[Address, Long] = Map.empty[Address, Long], explicitRemovals: Set[Address] = Set.empty[Address]) @@ -78,96 +116,88 @@ class AccrualFailureDetector( final def heartbeat(connection: Address) { log.debug("Heartbeat from connection [{}] ", connection) + val timestamp = timeMachine() val oldState = state.get - val latestTimestamp = oldState.timestamps.get(connection) - if (latestTimestamp.isEmpty) { - // this is heartbeat from a new connection - // add starter records for this new connection - val newState = oldState copy ( - version = oldState.version + 1, - failureStats = oldState.failureStats + (connection -> failureStatsFirstHeartbeat), - intervalHistory = oldState.intervalHistory + (connection -> IndexedSeq.empty[Long]), - timestamps = oldState.timestamps + (connection -> timeMachine()), - explicitRemovals = oldState.explicitRemovals - 
connection) - - // if we won the race then update else try again - if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur - - } else { - // this is a known connection - val timestamp = timeMachine() - val interval = timestamp - latestTimestamp.get - - val newIntervalsForConnection = (oldState.intervalHistory.get(connection) match { - case Some(history) if history.size >= maxSampleSize ⇒ - // reached max history, drop first interval - history drop 1 - case Some(history) ⇒ history - case _ ⇒ IndexedSeq.empty[Long] - }) :+ interval - - val newFailureStats = { - val newMean: Double = newIntervalsForConnection.sum.toDouble / newIntervalsForConnection.size - - val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse { - throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history") - } - - val deviationSum = (0.0d /: newIntervalsForConnection) { (mean, interval) ⇒ - mean + interval.toDouble - newMean - } - - val newVariance: Double = deviationSum / newIntervalsForConnection.size - val newDeviation: Double = math.sqrt(newVariance) - - val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance) - oldState.failureStats + (connection -> newFailureStats) - } - - val newState = oldState copy (version = oldState.version + 1, - failureStats = newFailureStats, - intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection), - timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp, - explicitRemovals = oldState.explicitRemovals - connection) - - // if we won the race then update else try again - if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur + val newHistory = oldState.timestamps.get(connection) match { + case None ⇒ + // this is heartbeat from a new connection + // add starter records for this new connection + firstHeartbeat + case 
(Some(latestTimestamp)) ⇒ + // this is a known connection + val interval = timestamp - latestTimestamp + oldState.history(connection) :+ interval } + + val newState = oldState copy (version = oldState.version + 1, + history = oldState.history + (connection -> newHistory), + timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp, + explicitRemovals = oldState.explicitRemovals - connection) + + // if we won the race then update else try again + if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur } /** - * Calculates how likely it is that the connection has failed. - *

+ * The suspicion level of accrual failure detector is given by a value called φ (phi). + * The basic idea of the φ failure detector is to express the value of φ on a scale that + * is dynamically adjusted to reflect current network conditions. + * + * The value of φ is calculated as: + * + * {{{ + * φ = -log10(1 - F(timeSinceLastHeartbeat) + * }}} + * where F is the cumulative distribution function of a normal distribution with mean + * and standard deviation estimated from historical heartbeat inter-arrival times. + * * If a connection does not have any records in failure detector then it is * considered healthy. - *

- * Implementations of 'Cumulative Distribution Function' for Exponential Distribution. - * For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597]. + * */ def phi(connection: Address): Double = { val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) - val phi = - // if connection has been removed explicitly - if (oldState.explicitRemovals.contains(connection)) Double.MaxValue - else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections - else { - val timestampDiff = timeMachine() - oldTimestamp.get + // if connection has been removed explicitly + if (oldState.explicitRemovals.contains(connection)) Double.MaxValue + else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections + else { + val timeDiff = timeMachine() - oldTimestamp.get - val mean = oldState.failureStats.get(connection) match { - case Some(FailureStats(mean, _, _)) ⇒ mean - case _ ⇒ throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history") - } + val history = oldState.history(connection) + val mean = history.mean + val stdDeviation = ensureValidStdDeviation(history.stdDeviation) - if (mean == 0.0) 0.0 - else PhiFactor * timestampDiff / mean - } + val φ = phi(timeDiff, mean + acceptableLostMillis, stdDeviation) - // FIXME change to debug log level, when failure detector is stable - log.info("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection) - phi + // FIXME change to debug log level, when failure detector is stable + if (φ > 1.0) + log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]", + φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")") + + φ + } + } + + private[cluster] def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double = { + val cdf = cumulativeDistributionFunction(timeDiff, mean, 
stdDeviation) + -math.log10(1.0 - cdf) + } + + private val minStdDeviationMillis = minStdDeviation.toMillis + + private def ensureValidStdDeviation(stdDeviation: Double): Double = math.max(stdDeviation, minStdDeviationMillis) + + /** + * Cumulative distribution function for N(mean, stdDeviation) normal distribution. + * This is an approximation defined in β Mathematics Handbook. + */ + private[cluster] def cumulativeDistributionFunction(x: Double, mean: Double, stdDeviation: Double): Double = { + val y = (x - mean) / stdDeviation + // Cumulative distribution function for N(0, 1) + 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y))) } /** @@ -178,10 +208,9 @@ class AccrualFailureDetector( log.debug("Remove connection [{}] ", connection) val oldState = state.get - if (oldState.failureStats.contains(connection)) { + if (oldState.history.contains(connection)) { val newState = oldState copy (version = oldState.version + 1, - failureStats = oldState.failureStats - connection, - intervalHistory = oldState.intervalHistory - connection, + history = oldState.history - connection, timestamps = oldState.timestamps - connection, explicitRemovals = oldState.explicitRemovals + connection) @@ -190,3 +219,59 @@ class AccrualFailureDetector( } } } + +private[cluster] object HeartbeatHistory { + + /** + * Create an empty HeartbeatHistory, without any history. + * Can only be used as starting point for appending intervals. + * The stats (mean, variance, stdDeviation) are not defined for + * empty HeartbeatHistory, i.e. throws ArithmeticException. + */ + def apply(maxSampleSize: Int): HeartbeatHistory = HeartbeatHistory( + maxSampleSize = maxSampleSize, + intervals = IndexedSeq.empty, + intervalSum = 0L, + interval2Sum = 0L) + +} + +/** + * Holds the heartbeat statistics for a specific node Address. + * It is capped by the number of samples specified in `maxSampleSize`. + * + * The stats (mean, variance, stdDeviation) are not defined for + * empty HeartbeatHistory, i.e. 
throws ArithmeticException. + */ +private[cluster] case class HeartbeatHistory private ( + maxSampleSize: Int, + intervals: IndexedSeq[Long], + intervalSum: Long, + interval2Sum: Long) { + + def mean: Double = intervalSum.toDouble / intervals.size + + def variance: Double = (interval2Sum.toDouble / intervals.size) - (mean * mean) + + def stdDeviation: Double = math.sqrt(variance) + + @tailrec + final def :+(interval: Long): HeartbeatHistory = { + if (intervals.size < maxSampleSize) + HeartbeatHistory( + maxSampleSize, + intervals = intervals :+ interval, + intervalSum = intervalSum + interval, + interval2Sum = interval2Sum + pow2(interval)) + else + dropOldest :+ interval // recur + } + + private def dropOldest: HeartbeatHistory = HeartbeatHistory( + maxSampleSize, + intervals = intervals drop 1, + intervalSum = intervalSum - intervals.head, + interval2Sum = interval2Sum - pow2(intervals.head)) + + private def pow2(x: Long) = x * x +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index ee4f6a03d2..f1e0c2d31b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -13,12 +13,16 @@ import akka.actor.AddressFromURIString class ClusterSettings(val config: Config, val systemName: String) { import config._ - final val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") + + final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold") final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { case "" ⇒ None case fqcn ⇒ Some(fqcn) } + final val FailureDetectorMinStdDeviation: Duration = 
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) + final val FailureDetectorAcceptableLostHeartbeats: Double = getDouble("akka.cluster.failure-detector.acceptable-lost-heartbeats") + final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index bd4d5d2c52..081fc9f0fd 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -6,6 +6,9 @@ package akka.cluster import akka.actor.Address import akka.testkit.{ LongRunningTest, AkkaSpec } +import scala.collection.immutable.TreeMap +import akka.util.duration._ +import akka.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class AccrualFailureDetectorSpec extends AkkaSpec(""" @@ -27,33 +30,72 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" timeGenerator } + val defaultFakeTimeIntervals = Vector.fill(20)(1000L) + def createFailureDetector( + threshold: Double = 8.0, + maxSampleSize: Int = 1000, + minStdDeviation: Duration = 10.millis, + acceptableLostDuration: Duration = Duration.Zero, + firstHeartbeatEstimate: Duration = 1.second, + timeMachine: () ⇒ Long = fakeTimeGenerator(defaultFakeTimeIntervals)): AccrualFailureDetector = + new AccrualFailureDetector(system, + threshold, + maxSampleSize, + minStdDeviation, + acceptableLostDuration, + firstHeartbeatEstimate = firstHeartbeatEstimate, + timeMachine = timeMachine) + + "use good enough cumulative distribution function" in { + val fd = createFailureDetector() + fd.cumulativeDistributionFunction(0.0, 0, 1) must be(0.5 plusOrMinus (0.001)) + fd.cumulativeDistributionFunction(0.6, 0, 1) must be(0.7257 plusOrMinus (0.001)) + 
fd.cumulativeDistributionFunction(1.5, 0, 1) must be(0.9332 plusOrMinus (0.001)) + fd.cumulativeDistributionFunction(2.0, 0, 1) must be(0.97725 plusOrMinus (0.01)) + fd.cumulativeDistributionFunction(2.5, 0, 1) must be(0.9379 plusOrMinus (0.1)) + fd.cumulativeDistributionFunction(3.5, 0, 1) must be(0.99977 plusOrMinus (0.1)) + fd.cumulativeDistributionFunction(4.0, 0, 1) must be(0.99997 plusOrMinus (0.1)) + + for (x :: y :: Nil ← (0.0 to 4.0 by 0.1).toList.sliding(2)) { + fd.cumulativeDistributionFunction(x, 0, 1) must be < ( + fd.cumulativeDistributionFunction(y, 0, 1)) + } + + fd.cumulativeDistributionFunction(2.2, 2.0, 0.3) must be(0.7475 plusOrMinus (0.001)) + } + + "return realistic phi values" in { + val fd = createFailureDetector() + val test = TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3) + for ((timeDiff, expectedPhi) ← test) { + fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1)) + } + + // larger stdDeviation results => lower phi + fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 500.0) must be < ( + fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 100.0)) + } + "return phi value of 0.0 on startup for each address, when no heartbeats" in { - val fd = new AccrualFailureDetector(system) + val fd = createFailureDetector() fd.phi(conn) must be(0.0) fd.phi(conn2) must be(0.0) } "return phi based on guess when only one heartbeat" in { - // 1 second ticks - val timeInterval = Vector.fill(30)(1000L) - val fd = new AccrualFailureDetector(system, + val timeInterval = List[Long](0, 1000, 1000, 1000, 1000) + val fd = createFailureDetector(firstHeartbeatEstimate = 1.seconds, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) - fd.phi(conn) must be > (0.0) - // let time go - for (n ← 2 to 8) - fd.phi(conn) must be < (4.0) - for (n ← 9 to 18) - fd.phi(conn) must be < (8.0) - - fd.phi(conn) must be > (8.0) + fd.phi(conn) must be(0.3 plusOrMinus 
0.2) + fd.phi(conn) must be(4.5 plusOrMinus 0.3) + fd.phi(conn) must be > (15.0) } "return phi value using first interval after second heartbeat" in { val timeInterval = List[Long](0, 100, 100, 100) - val fd = new AccrualFailureDetector(system, - timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.phi(conn) must be > (0.0) @@ -63,8 +105,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, - timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.heartbeat(conn) @@ -75,8 +116,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) - val fd = new AccrualFailureDetector(system, - timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.heartbeat(conn) @@ -89,8 +129,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after explicit removal of connection and receiving heartbeat again" in { val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) - val fd = new AccrualFailureDetector(system, - timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -112,40 +151,65 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as dead if heartbeat are missed" in { - val timeInterval = List[Long](0, 1000, 100, 100, 5000) + val timeInterval = List[Long](0, 1000, 100, 100, 7000) val ft = fakeTimeGenerator(timeInterval) - val fd = new 
AccrualFailureDetector(system, threshold = 3, - timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 fd.heartbeat(conn) //1000 fd.heartbeat(conn) //1100 fd.isAvailable(conn) must be(true) //1200 - fd.isAvailable(conn) must be(false) //6200 + fd.isAvailable(conn) must be(false) //8200 } "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { - val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, threshold = 3, - timeMachine = fakeTimeGenerator(timeInterval)) + val timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100) + val fd = createFailureDetector(threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 fd.heartbeat(conn) //1000 fd.heartbeat(conn) //1100 fd.isAvailable(conn) must be(true) //1200 - fd.isAvailable(conn) must be(false) //6200 - fd.heartbeat(conn) //6300 - fd.heartbeat(conn) //7300 - fd.heartbeat(conn) //7400 + fd.isAvailable(conn) must be(false) //8200 + fd.heartbeat(conn) //8300 + fd.heartbeat(conn) //9300 + fd.heartbeat(conn) //9400 - fd.isAvailable(conn) must be(true) //7500 + fd.isAvailable(conn) must be(true) //9500 + } + + "accept some configured missing heartbeats" in { + val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000) + val fd = createFailureDetector(acceptableLostDuration = 3.seconds, timeMachine = fakeTimeGenerator(timeInterval)) + + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.isAvailable(conn) must be(true) + fd.heartbeat(conn) + fd.isAvailable(conn) must be(true) + } + + "fail after configured acceptable missing heartbeats" in { + val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000) + val fd = createFailureDetector(acceptableLostDuration = 3.seconds, timeMachine = 
fakeTimeGenerator(timeInterval)) + + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.heartbeat(conn) + fd.isAvailable(conn) must be(true) + fd.heartbeat(conn) + fd.isAvailable(conn) must be(false) } "use maxSampleSize heartbeats" in { val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) - val fd = new AccrualFailureDetector(system, maxSampleSize = 3, - timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(maxSampleSize = 3, timeMachine = fakeTimeGenerator(timeInterval)) // 100 ms interval fd.heartbeat(conn) //0 @@ -163,4 +227,33 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } } + + "Statistics for heartbeats" must { + + "calculate correct mean and variance" in { + val samples = Seq(100, 200, 125, 340, 130) + val stats = (HeartbeatHistory(maxSampleSize = 20) /: samples) { (stats, value) ⇒ stats :+ value } + stats.mean must be(179.0 plusOrMinus 0.00001) + stats.variance must be(7584.0 plusOrMinus 0.00001) + } + + "have 0.0 variance for one sample" in { + (HeartbeatHistory(600) :+ 1000L).variance must be(0.0 plusOrMinus 0.00001) + } + + "be capped by the specified maxSampleSize" in { + val history3 = HeartbeatHistory(maxSampleSize = 3) :+ 100 :+ 110 :+ 90 + history3.mean must be(100.0 plusOrMinus 0.00001) + history3.variance must be(66.6666667 plusOrMinus 0.00001) + + val history4 = history3 :+ 140 + history4.mean must be(113.333333 plusOrMinus 0.00001) + history4.variance must be(422.222222 plusOrMinus 0.00001) + + val history5 = history4 :+ 80 + history5.mean must be(103.333333 plusOrMinus 0.00001) + history5.variance must be(688.88888889 plusOrMinus 0.00001) + + } + } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 481d9f7e5a..ab8ffcf157 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ 
b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -16,9 +16,11 @@ class ClusterConfigSpec extends AkkaSpec { "be able to parse generic cluster config elements" in { val settings = new ClusterSettings(system.settings.config, system.name) import settings._ - FailureDetectorThreshold must be(8) + FailureDetectorThreshold must be(8.0 plusOrMinus 0.0001) FailureDetectorMaxSampleSize must be(1000) FailureDetectorImplementationClass must be(None) + FailureDetectorMinStdDeviation must be(100 millis) + FailureDetectorAcceptableLostHeartbeats must be(3.0 plusOrMinus 0.0001) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) From e02310847129d456dd726528ade3500f0786c68f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Jun 2012 10:18:15 +0200 Subject: [PATCH 2/3] Incorporated feedback from review, see #2066 --- .../src/main/resources/reference.conf | 2 +- .../akka/cluster/AccrualFailureDetector.scala | 75 +++++++++++-------- .../scala/akka/cluster/ClusterSettings.scala | 6 +- .../cluster/AccrualFailureDetectorSpec.scala | 24 +++--- .../akka/cluster/ClusterConfigSpec.scala | 2 +- 5 files changed, 60 insertions(+), 49 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 90d02d4fd1..8bf51b4fa5 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -57,7 +57,7 @@ akka { # This margin is important to be able to survive sudden, occasional, # pauses in heartbeat arrivals, due to for example garbage collect or # network drop. 
- acceptable-lost-heartbeats = 3.0 + acceptable-heartbeat-pause = 3s implementation-class = "" diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 1dfac252fe..b10962ce11 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -15,12 +15,26 @@ import akka.util.Duration import akka.util.duration._ object AccrualFailureDetector { - private def realTimeMachine: () ⇒ Long = () ⇒ NANOSECONDS.toMillis(System.nanoTime) + private def realClock: () ⇒ Long = () ⇒ NANOSECONDS.toMillis(System.nanoTime) } /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: * [http://ddg.jaist.ac.jp/pub/HDY+04.pdf] * + * The suspicion level of failure is given by a value called φ (phi). + * The basic idea of the φ failure detector is to express the value of φ on a scale that + * is dynamically adjusted to reflect current network conditions. A configurable + * threshold is used to decide if φ is considered to be a failure. + * + * The value of φ is calculated as: + * + * {{{ + * φ = -log10(1 - F(timeSinceLastHeartbeat)) + * }}} + * where F is the cumulative distribution function of a normal distribution with mean + * and standard deviation estimated from historical heartbeat inter-arrival times. + * + * * @param system Belongs to the [[akka.actor.ActorSystem]]. Used for logging. * * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event @@ -34,7 +48,7 @@ object AccrualFailureDetector { * Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations * in heartbeat inter arrival times. 
* - * @param acceptableLostDuration Duration corresponding to number of potentially lost/delayed + * @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed * heartbeats that will be accepted before considering it to be an anomaly. * This margin is important to be able to survive sudden, occasional, pauses in heartbeat * arrivals, due to for example garbage collect or network drop. @@ -43,7 +57,7 @@ object AccrualFailureDetector { * to this duration, with a with rather high standard deviation (since environment is unknown * in the beginning) * - * @timeMachine The clock, returning time in milliseconds, but can be faked for testing + * @param clock The clock, returning current time in milliseconds, but can be faked for testing * purposes. It is only used for measuring intervals (duration). * */ @@ -52,9 +66,9 @@ class AccrualFailureDetector( val threshold: Double, val maxSampleSize: Int, val minStdDeviation: Duration, - val acceptableLostDuration: Duration, + val acceptableHeartbeatPause: Duration, val firstHeartbeatEstimate: Duration, - val timeMachine: () ⇒ Long) extends FailureDetector { + val clock: () ⇒ Long) extends FailureDetector { import AccrualFailureDetector._ @@ -64,19 +78,19 @@ class AccrualFailureDetector( def this( system: ActorSystem, settings: ClusterSettings, - timeMachine: () ⇒ Long = AccrualFailureDetector.realTimeMachine) = + clock: () ⇒ Long = AccrualFailureDetector.realClock) = this( system, settings.FailureDetectorThreshold, settings.FailureDetectorMaxSampleSize, - settings.HeartbeatInterval * settings.FailureDetectorAcceptableLostHeartbeats, - settings.FailureDetectorMinStdDeviation, + settings.FailureDetectorMinStdDeviation, + settings.FailureDetectorAcceptableHeartbeatPause, // we use a conservative estimate for the first heartbeat because // gossip needs to spread back to the joining node before the // first real heartbeat is sent. Initial heartbeat is added when joining. 
// FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed settings.GossipInterval * 3 + settings.HeartbeatInterval, - timeMachine) + clock) private val log = Logging(system, "FailureDetector") @@ -89,7 +103,7 @@ class AccrualFailureDetector( HeartbeatHistory(maxSampleSize) :+ (mean - stdDeviation) :+ (mean + stdDeviation) } - private val acceptableLostMillis = acceptableLostDuration.toMillis + private val acceptableHeartbeatPauseMillis = acceptableHeartbeatPause.toMillis /** * Implement using optimistic lockless concurrency, all state is represented @@ -116,7 +130,7 @@ class AccrualFailureDetector( final def heartbeat(connection: Address) { log.debug("Heartbeat from connection [{}] ", connection) - val timestamp = timeMachine() + val timestamp = clock() val oldState = state.get val newHistory = oldState.timestamps.get(connection) match { @@ -124,7 +138,7 @@ class AccrualFailureDetector( // this is heartbeat from a new connection // add starter records for this new connection firstHeartbeat - case (Some(latestTimestamp)) ⇒ + case Some(latestTimestamp) ⇒ // this is a known connection val interval = timestamp - latestTimestamp oldState.history(connection) :+ interval @@ -140,21 +154,10 @@ class AccrualFailureDetector( } /** - * The suspicion level of accrual failure detector is given by a value called φ (phi). - * The basic idea of the φ failure detector is to express the value of φ on a scale that - * is dynamically adjusted to reflect current network conditions. - * - * The value of φ is calculated as: - * - * {{{ - * φ = -log10(1 - F(timeSinceLastHeartbeat) - * }}} - * where F is the cumulative distribution function of a normal distribution with mean - * and standard deviation estimated from historical heartbeat inter-arrival times. + * The suspicion level of the accrual failure detector. * * If a connection does not have any records in failure detector then it is * considered healthy. 
- * */ def phi(connection: Address): Double = { val oldState = state.get @@ -164,18 +167,17 @@ class AccrualFailureDetector( if (oldState.explicitRemovals.contains(connection)) Double.MaxValue else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections else { - val timeDiff = timeMachine() - oldTimestamp.get + val timeDiff = clock() - oldTimestamp.get val history = oldState.history(connection) val mean = history.mean val stdDeviation = ensureValidStdDeviation(history.stdDeviation) - val φ = phi(timeDiff, mean + acceptableLostMillis, stdDeviation) + val φ = phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation) // FIXME change to debug log level, when failure detector is stable - if (φ > 1.0) - log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]", - φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")") + if (φ > 1.0) log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]", + φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")") φ } @@ -232,7 +234,7 @@ private[cluster] object HeartbeatHistory { maxSampleSize = maxSampleSize, intervals = IndexedSeq.empty, intervalSum = 0L, - interval2Sum = 0L) + squaredIntervalSum = 0L) } @@ -247,11 +249,18 @@ private[cluster] case class HeartbeatHistory private ( maxSampleSize: Int, intervals: IndexedSeq[Long], intervalSum: Long, - interval2Sum: Long) { + squaredIntervalSum: Long) { + + if (maxSampleSize < 1) + throw new IllegalArgumentException("maxSampleSize must be >= 1, got [%s]" format maxSampleSize) + if (intervalSum < 0L) + throw new IllegalArgumentException("intervalSum must be >= 0, got [%s]" format intervalSum) + if (squaredIntervalSum < 0L) + throw new IllegalArgumentException("squaredIntervalSum must be >= 0, got [%s]" format squaredIntervalSum) def mean: Double = intervalSum.toDouble / intervals.size - def variance: Double = (interval2Sum.toDouble / intervals.size) - (mean * mean) + def 
variance: Double = (squaredIntervalSum.toDouble / intervals.size) - (mean * mean) def stdDeviation: Double = math.sqrt(variance) @@ -262,7 +271,7 @@ private[cluster] case class HeartbeatHistory private ( maxSampleSize, intervals = intervals :+ interval, intervalSum = intervalSum + interval, - interval2Sum = interval2Sum + pow2(interval)) + squaredIntervalSum = squaredIntervalSum + pow2(interval)) else dropOldest :+ interval // recur } @@ -271,7 +280,7 @@ private[cluster] case class HeartbeatHistory private ( maxSampleSize, intervals = intervals drop 1, intervalSum = intervalSum - intervals.head, - interval2Sum = interval2Sum - pow2(intervals.head)) + squaredIntervalSum = squaredIntervalSum - pow2(intervals.head)) private def pow2(x: Long) = x * x } \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index f1e0c2d31b..e54e74617d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -20,8 +20,10 @@ class ClusterSettings(val config: Config, val systemName: String) { case "" ⇒ None case fqcn ⇒ Some(fqcn) } - final val FailureDetectorMinStdDeviation: Duration = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) - final val FailureDetectorAcceptableLostHeartbeats: Double = getDouble("akka.cluster.failure-detector.acceptable-lost-heartbeats") + final val FailureDetectorMinStdDeviation: Duration = + Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) + final val FailureDetectorAcceptableHeartbeatPause: Duration = + Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { case "" ⇒ None diff --git 
a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 081fc9f0fd..5c7186502c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -37,14 +37,14 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" minStdDeviation: Duration = 10.millis, acceptableLostDuration: Duration = Duration.Zero, firstHeartbeatEstimate: Duration = 1.second, - timeMachine: () ⇒ Long = fakeTimeGenerator(defaultFakeTimeIntervals)): AccrualFailureDetector = + clock: () ⇒ Long = fakeTimeGenerator(defaultFakeTimeIntervals)): AccrualFailureDetector = new AccrualFailureDetector(system, threshold, maxSampleSize, minStdDeviation, acceptableLostDuration, firstHeartbeatEstimate = firstHeartbeatEstimate, - timeMachine = timeMachine) + clock = clock) "use good enough cumulative distribution function" in { val fd = createFailureDetector() @@ -85,7 +85,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi based on guess when only one heartbeat" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 1000) val fd = createFailureDetector(firstHeartbeatEstimate = 1.seconds, - timeMachine = fakeTimeGenerator(timeInterval)) + clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.phi(conn) must be(0.3 plusOrMinus 0.2) @@ -95,7 +95,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi value using first interval after second heartbeat" in { val timeInterval = List[Long](0, 100, 100, 100) - val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.phi(conn) must be > (0.0) @@ -105,7 +105,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after a series of successful heartbeats" in { val timeInterval = 
List[Long](0, 1000, 100, 100) - val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.heartbeat(conn) @@ -116,7 +116,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) - val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.heartbeat(conn) @@ -129,7 +129,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after explicit removal of connection and receiving heartbeat again" in { val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) - val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -153,7 +153,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead if heartbeat are missed" in { val timeInterval = List[Long](0, 1000, 100, 100, 7000) val ft = fakeTimeGenerator(timeInterval) - val fd = createFailureDetector(threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 fd.heartbeat(conn) //1000 @@ -165,7 +165,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { val timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100) - val fd = createFailureDetector(threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 fd.heartbeat(conn) //1000 @@ -181,7 +181,7 @@ 
class AccrualFailureDetectorSpec extends AkkaSpec(""" "accept some configured missing heartbeats" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000) - val fd = createFailureDetector(acceptableLostDuration = 3.seconds, timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.heartbeat(conn) @@ -194,7 +194,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "fail after configured acceptable missing heartbeats" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000) - val fd = createFailureDetector(acceptableLostDuration = 3.seconds, timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) fd.heartbeat(conn) @@ -209,7 +209,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "use maxSampleSize heartbeats" in { val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) - val fd = createFailureDetector(maxSampleSize = 3, timeMachine = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(maxSampleSize = 3, clock = fakeTimeGenerator(timeInterval)) // 100 ms interval fd.heartbeat(conn) //0 diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index ab8ffcf157..91fab3aea3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -20,7 +20,7 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorMaxSampleSize must be(1000) FailureDetectorImplementationClass must be(None) FailureDetectorMinStdDeviation must be(100 millis) - FailureDetectorAcceptableLostHeartbeats must be(3.0 plusOrMinus 0.0001) + FailureDetectorAcceptableHeartbeatPause must be(3 
seconds) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) From 4b2316a56b843c5ceac9084f7ac58304f56305dc Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Jun 2012 11:06:47 +0200 Subject: [PATCH 3/3] Rename GossipingAccrualFailureDetectorSpec --- ...> ClusterAccrualFailureDetectorSpec.scala} | 24 ++++++++++--------- .../scala/akka/cluster/TransitionSpec.scala | 4 ++-- 2 files changed, 15 insertions(+), 13 deletions(-) rename akka-cluster/src/multi-jvm/scala/akka/cluster/{GossipingAccrualFailureDetectorSpec.scala => ClusterAccrualFailureDetectorSpec.scala} (61%) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala similarity index 61% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index b52695dcf1..d5d41b52aa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.util.duration._ import akka.testkit._ -object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { +object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -19,22 +19,22 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class GossipingWithAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class GossipingWithAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec with 
AccrualFailureDetectorStrategy -class GossipingWithAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -abstract class GossipingAccrualFailureDetectorSpec - extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) +abstract class ClusterAccrualFailureDetectorSpec + extends MultiNodeSpec(ClusterAccrualFailureDetectorMultiJvmSpec) with MultiNodeClusterSpec { - import GossipingAccrualFailureDetectorMultiJvmSpec._ + import ClusterAccrualFailureDetectorMultiJvmSpec._ - "A Gossip-driven Failure Detector" must { + "A heartbeat driven Failure Detector" must { - "receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in { + "receive heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) - 5.seconds.dilated.sleep // let them gossip + 5.seconds.dilated.sleep // let them heartbeat cluster.failureDetector.isAvailable(first) must be(true) cluster.failureDetector.isAvailable(second) must be(true) cluster.failureDetector.isAvailable(third) must be(true) @@ -47,9 +47,11 @@ abstract class GossipingAccrualFailureDetectorSpec testConductor.shutdown(third, 0) } + enterBarrier("third-shutdown") + runOn(first, second) { // remaning nodes should detect failure... 
- awaitCond(!cluster.failureDetector.isAvailable(third), 10.seconds) + awaitCond(!cluster.failureDetector.isAvailable(third), 15.seconds) // other connections still ok cluster.failureDetector.isAvailable(first) must be(true) cluster.failureDetector.isAvailable(second) must be(true) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 6330772ef6..ce31d8fe0e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -396,7 +396,7 @@ abstract class TransitionSpec seenLatestGossip must be(Set(fifth)) } - testConductor.enter("after-second-unavailble") + enterBarrier("after-second-unavailble") // spread the word val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth) @@ -414,7 +414,7 @@ abstract class TransitionSpec awaitMemberStatus(second, Down) } - testConductor.enter("after-second-down") + enterBarrier("after-second-down") // spread the word val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth)