Merge pull request #550 from akka/wip-2066-phi-patriknw

Improve phi in AccrualFailureDetector, see #2066
This commit is contained in:
patriknw 2012-06-20 02:24:42 -07:00
commit 09e92b6bd9
7 changed files with 363 additions and 151 deletions

View file

@ -43,7 +43,21 @@ akka {
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes
threshold = 8
threshold = 8.0
# Minimum standard deviation to use for the normal distribution in
# AccrualFailureDetector. Too low standard deviation might result in
# too much sensitivity for sudden, but normal, deviations in heartbeat
# inter arrival times.
min-std-deviation = 100 ms
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# It is a factor of heartbeat-interval.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.
acceptable-heartbeat-pause = 3s
implementation-class = ""

View file

@ -7,50 +7,103 @@ package akka.cluster
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
import akka.remote.RemoteActorRefProvider
import akka.event.Logging
import scala.collection.immutable.Map
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.util.Duration
import akka.util.duration._
object AccrualFailureDetector {
private def realClock: () Long = () NANOSECONDS.toMillis(System.nanoTime)
}
/**
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
* [http://ddg.jaist.ac.jp/pub/HDY+04.pdf]
* <p/>
* A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event
*
* The suspicion level of failure is given by a value called φ (phi).
* The basic idea of the φ failure detector is to express the value of φ on a scale that
* is dynamically adjusted to reflect current network conditions. A configurable
* threshold is used to decide if φ is considered to be a failure.
*
* The value of φ is calculated as:
*
* {{{
* φ = -log10(1 - F(timeSinceLastHeartbeat)
* }}}
* where F is the cumulative distribution function of a normal distribution with mean
* and standard deviation estimated from historical heartbeat inter-arrival times.
*
*
* @param system Belongs to the [[akka.actor.ActorSystem]]. Used for logging.
*
* @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event
* of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect
* actual crashes
* <p/>
* Default threshold is 8, but can be configured in the Akka config.
*
* @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
* inter-arrival times.
*
* @param minStdDeviation Minimum standard deviation to use for the normal distribution used when calculating phi.
* Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations
* in heartbeat inter arrival times.
*
* @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed
* heartbeats that will be accepted before considering it to be an anomaly.
* This margin is important to be able to survive sudden, occasional, pauses in heartbeat
* arrivals, due to for example garbage collect or network drop.
*
* @param firstHeartbeatEstimate Bootstrap the stats with heartbeats that corresponds to
* to this duration, with a with rather high standard deviation (since environment is unknown
* in the beginning)
*
* @clock The clock, returning current time in milliseconds, but can be faked for testing
* purposes. It is only used for measuring intervals (duration).
*
*/
class AccrualFailureDetector(
val system: ActorSystem,
val threshold: Int = 8,
val maxSampleSize: Int = 1000,
val timeMachine: () Long = System.currentTimeMillis) extends FailureDetector {
val threshold: Double,
val maxSampleSize: Int,
val minStdDeviation: Duration,
val acceptableHeartbeatPause: Duration,
val firstHeartbeatEstimate: Duration,
val clock: () Long) extends FailureDetector {
import AccrualFailureDetector._
/**
* Constructor that picks configuration from the settings.
*/
def this(
system: ActorSystem,
settings: ClusterSettings,
timeMachine: () Long = System.currentTimeMillis) =
clock: () Long = AccrualFailureDetector.realClock) =
this(
system,
settings.FailureDetectorThreshold,
settings.FailureDetectorMaxSampleSize,
timeMachine)
private final val PhiFactor = 1.0 / math.log(10.0)
settings.FailureDetectorAcceptableHeartbeatPause,
settings.FailureDetectorMinStdDeviation,
// we use a conservative estimate for the first heartbeat because
// gossip needs to spread back to the joining node before the
// first real heartbeat is sent. Initial heartbeat is added when joining.
// FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed
settings.GossipInterval * 3 + settings.HeartbeatInterval,
clock)
private val log = Logging(system, "FailureDetector")
/**
* Holds the failure statistics for a specific node Address.
*/
private case class FailureStats(mean: Double = 0.0, variance: Double = 0.0, deviation: Double = 0.0)
// guess statistics for first heartbeat,
// important so that connections with only one heartbeat becomes unavailble
private val failureStatsFirstHeartbeat = FailureStats(mean = 1000.0)
// important so that connections with only one heartbeat becomes unavailable
private val firstHeartbeat: HeartbeatHistory = {
// bootstrap with 2 entries with rather high standard deviation
val mean = firstHeartbeatEstimate.toMillis
val stdDeviation = mean / 4
HeartbeatHistory(maxSampleSize) :+ (mean - stdDeviation) :+ (mean + stdDeviation)
}
private val acceptableHeartbeatPauseMillis = acceptableHeartbeatPause.toMillis
/**
* Implement using optimistic lockless concurrency, all state is represented
@ -58,8 +111,7 @@ class AccrualFailureDetector(
*/
private case class State(
version: Long = 0L,
failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats],
intervalHistory: Map[Address, IndexedSeq[Long]] = Map.empty[Address, IndexedSeq[Long]],
history: Map[Address, HeartbeatHistory] = Map.empty,
timestamps: Map[Address, Long] = Map.empty[Address, Long],
explicitRemovals: Set[Address] = Set.empty[Address])
@ -78,95 +130,76 @@ class AccrualFailureDetector(
final def heartbeat(connection: Address) {
log.debug("Heartbeat from connection [{}] ", connection)
val timestamp = clock()
val oldState = state.get
val latestTimestamp = oldState.timestamps.get(connection)
if (latestTimestamp.isEmpty) {
val newHistory = oldState.timestamps.get(connection) match {
case None
// this is heartbeat from a new connection
// add starter records for this new connection
val newState = oldState copy (
version = oldState.version + 1,
failureStats = oldState.failureStats + (connection -> failureStatsFirstHeartbeat),
intervalHistory = oldState.intervalHistory + (connection -> IndexedSeq.empty[Long]),
timestamps = oldState.timestamps + (connection -> timeMachine()),
explicitRemovals = oldState.explicitRemovals - connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
} else {
firstHeartbeat
case Some(latestTimestamp)
// this is a known connection
val timestamp = timeMachine()
val interval = timestamp - latestTimestamp.get
val newIntervalsForConnection = (oldState.intervalHistory.get(connection) match {
case Some(history) if history.size >= maxSampleSize
// reached max history, drop first interval
history drop 1
case Some(history) history
case _ IndexedSeq.empty[Long]
}) :+ interval
val newFailureStats = {
val newMean: Double = newIntervalsForConnection.sum.toDouble / newIntervalsForConnection.size
val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse {
throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")
}
val deviationSum = (0.0d /: newIntervalsForConnection) { (mean, interval)
mean + interval.toDouble - newMean
}
val newVariance: Double = deviationSum / newIntervalsForConnection.size
val newDeviation: Double = math.sqrt(newVariance)
val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
oldState.failureStats + (connection -> newFailureStats)
val interval = timestamp - latestTimestamp
oldState.history(connection) :+ interval
}
val newState = oldState copy (version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection),
history = oldState.history + (connection -> newHistory),
timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp,
explicitRemovals = oldState.explicitRemovals - connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
}
}
/**
* Calculates how likely it is that the connection has failed.
* <p/>
* The suspicion level of the accrual failure detector.
*
* If a connection does not have any records in failure detector then it is
* considered healthy.
* <p/>
* Implementations of 'Cumulative Distribution Function' for Exponential Distribution.
* For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597].
*/
def phi(connection: Address): Double = {
val oldState = state.get
val oldTimestamp = oldState.timestamps.get(connection)
val phi =
// if connection has been removed explicitly
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timestampDiff = timeMachine() - oldTimestamp.get
val timeDiff = clock() - oldTimestamp.get
val mean = oldState.failureStats.get(connection) match {
case Some(FailureStats(mean, _, _)) mean
case _ throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history")
val history = oldState.history(connection)
val mean = history.mean
val stdDeviation = ensureValidStdDeviation(history.stdDeviation)
val φ = phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation)
// FIXME change to debug log level, when failure detector is stable
if (φ > 1.0) log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]",
φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")")
φ
}
}
if (mean == 0.0) 0.0
else PhiFactor * timestampDiff / mean
private[cluster] def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double = {
val cdf = cumulativeDistributionFunction(timeDiff, mean, stdDeviation)
-math.log10(1.0 - cdf)
}
log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
phi
private val minStdDeviationMillis = minStdDeviation.toMillis
private def ensureValidStdDeviation(stdDeviation: Double): Double = math.max(stdDeviation, minStdDeviationMillis)
/**
* Cumulative distribution function for N(mean, stdDeviation) normal distribution.
* This is an approximation defined in β Mathematics Handbook.
*/
private[cluster] def cumulativeDistributionFunction(x: Double, mean: Double, stdDeviation: Double): Double = {
val y = (x - mean) / stdDeviation
// Cumulative distribution function for N(0, 1)
1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
}
/**
@ -177,10 +210,9 @@ class AccrualFailureDetector(
log.debug("Remove connection [{}] ", connection)
val oldState = state.get
if (oldState.failureStats.contains(connection)) {
if (oldState.history.contains(connection)) {
val newState = oldState copy (version = oldState.version + 1,
failureStats = oldState.failureStats - connection,
intervalHistory = oldState.intervalHistory - connection,
history = oldState.history - connection,
timestamps = oldState.timestamps - connection,
explicitRemovals = oldState.explicitRemovals + connection)
@ -189,3 +221,66 @@ class AccrualFailureDetector(
}
}
}
private[cluster] object HeartbeatHistory {
/**
* Create an empty HeartbeatHistory, without any history.
* Can only be used as starting point for appending intervals.
* The stats (mean, variance, stdDeviation) are not defined for
* for empty HeartbeatHistory, i.e. throws AritmeticException.
*/
def apply(maxSampleSize: Int): HeartbeatHistory = HeartbeatHistory(
maxSampleSize = maxSampleSize,
intervals = IndexedSeq.empty,
intervalSum = 0L,
squaredIntervalSum = 0L)
}
/**
* Holds the heartbeat statistics for a specific node Address.
* It is capped by the number of samples specified in `maxSampleSize`.
*
* The stats (mean, variance, stdDeviation) are not defined for
* for empty HeartbeatHistory, i.e. throws AritmeticException.
*/
private[cluster] case class HeartbeatHistory private (
maxSampleSize: Int,
intervals: IndexedSeq[Long],
intervalSum: Long,
squaredIntervalSum: Long) {
if (maxSampleSize < 1)
throw new IllegalArgumentException("maxSampleSize must be >= 1, got [%s]" format maxSampleSize)
if (intervalSum < 0L)
throw new IllegalArgumentException("intervalSum must be >= 0, got [%s]" format intervalSum)
if (squaredIntervalSum < 0L)
throw new IllegalArgumentException("squaredIntervalSum must be >= 0, got [%s]" format squaredIntervalSum)
def mean: Double = intervalSum.toDouble / intervals.size
def variance: Double = (squaredIntervalSum.toDouble / intervals.size) - (mean * mean)
def stdDeviation: Double = math.sqrt(variance)
@tailrec
final def :+(interval: Long): HeartbeatHistory = {
if (intervals.size < maxSampleSize)
HeartbeatHistory(
maxSampleSize,
intervals = intervals :+ interval,
intervalSum = intervalSum + interval,
squaredIntervalSum = squaredIntervalSum + pow2(interval))
else
dropOldest :+ interval // recur
}
private def dropOldest: HeartbeatHistory = HeartbeatHistory(
maxSampleSize,
intervals = intervals drop 1,
intervalSum = intervalSum - intervals.head,
squaredIntervalSum = squaredIntervalSum - pow2(intervals.head))
private def pow2(x: Long) = x * x
}

View file

@ -13,12 +13,18 @@ import akka.actor.AddressFromURIString
class ClusterSettings(val config: Config, val systemName: String) {
import config._
final val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match {
case "" None
case fqcn Some(fqcn)
}
final val FailureDetectorMinStdDeviation: Duration =
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
final val FailureDetectorAcceptableHeartbeatPause: Duration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
case "" None
case AddressFromURIString(addr) Some(addr)

View file

@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeSpec
import akka.util.duration._
import akka.testkit._
object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
@ -19,22 +19,22 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class GossipingWithAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
class GossipingWithAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
class GossipingWithAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
abstract class GossipingAccrualFailureDetectorSpec
extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec)
abstract class ClusterAccrualFailureDetectorSpec
extends MultiNodeSpec(ClusterAccrualFailureDetectorMultiJvmSpec)
with MultiNodeClusterSpec {
import GossipingAccrualFailureDetectorMultiJvmSpec._
import ClusterAccrualFailureDetectorMultiJvmSpec._
"A Gossip-driven Failure Detector" must {
"A heartbeat driven Failure Detector" must {
"receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
"receive heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
5.seconds.dilated.sleep // let them gossip
5.seconds.dilated.sleep // let them heartbeat
cluster.failureDetector.isAvailable(first) must be(true)
cluster.failureDetector.isAvailable(second) must be(true)
cluster.failureDetector.isAvailable(third) must be(true)
@ -47,9 +47,11 @@ abstract class GossipingAccrualFailureDetectorSpec
testConductor.shutdown(third, 0)
}
enterBarrier("third-shutdown")
runOn(first, second) {
// remaning nodes should detect failure...
awaitCond(!cluster.failureDetector.isAvailable(third), 10.seconds)
awaitCond(!cluster.failureDetector.isAvailable(third), 15.seconds)
// other connections still ok
cluster.failureDetector.isAvailable(first) must be(true)
cluster.failureDetector.isAvailable(second) must be(true)

View file

@ -395,7 +395,7 @@ abstract class TransitionSpec
seenLatestGossip must be(Set(fifth))
}
testConductor.enter("after-second-unavailble")
enterBarrier("after-second-unavailble")
// spread the word
val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth)
@ -413,7 +413,7 @@ abstract class TransitionSpec
awaitMemberStatus(second, Down)
}
testConductor.enter("after-second-down")
enterBarrier("after-second-down")
// spread the word
val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth)

View file

@ -6,6 +6,9 @@ package akka.cluster
import akka.actor.Address
import akka.testkit.{ LongRunningTest, AkkaSpec }
import scala.collection.immutable.TreeMap
import akka.util.duration._
import akka.util.Duration
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class AccrualFailureDetectorSpec extends AkkaSpec("""
@ -27,33 +30,72 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
timeGenerator
}
val defaultFakeTimeIntervals = Vector.fill(20)(1000L)
def createFailureDetector(
threshold: Double = 8.0,
maxSampleSize: Int = 1000,
minStdDeviation: Duration = 10.millis,
acceptableLostDuration: Duration = Duration.Zero,
firstHeartbeatEstimate: Duration = 1.second,
clock: () Long = fakeTimeGenerator(defaultFakeTimeIntervals)): AccrualFailureDetector =
new AccrualFailureDetector(system,
threshold,
maxSampleSize,
minStdDeviation,
acceptableLostDuration,
firstHeartbeatEstimate = firstHeartbeatEstimate,
clock = clock)
"use good enough cumulative distribution function" in {
val fd = createFailureDetector()
fd.cumulativeDistributionFunction(0.0, 0, 1) must be(0.5 plusOrMinus (0.001))
fd.cumulativeDistributionFunction(0.6, 0, 1) must be(0.7257 plusOrMinus (0.001))
fd.cumulativeDistributionFunction(1.5, 0, 1) must be(0.9332 plusOrMinus (0.001))
fd.cumulativeDistributionFunction(2.0, 0, 1) must be(0.97725 plusOrMinus (0.01))
fd.cumulativeDistributionFunction(2.5, 0, 1) must be(0.9379 plusOrMinus (0.1))
fd.cumulativeDistributionFunction(3.5, 0, 1) must be(0.99977 plusOrMinus (0.1))
fd.cumulativeDistributionFunction(4.0, 0, 1) must be(0.99997 plusOrMinus (0.1))
for (x :: y :: Nil (0.0 to 4.0 by 0.1).toList.sliding(2)) {
fd.cumulativeDistributionFunction(x, 0, 1) must be < (
fd.cumulativeDistributionFunction(y, 0, 1))
}
fd.cumulativeDistributionFunction(2.2, 2.0, 0.3) must be(0.7475 plusOrMinus (0.001))
}
"return realistic phi values" in {
val fd = createFailureDetector()
val test = TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3)
for ((timeDiff, expectedPhi) test) {
fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1))
}
// larger stdDeviation results => lower phi
fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 500.0) must be < (
fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 100.0))
}
"return phi value of 0.0 on startup for each address, when no heartbeats" in {
val fd = new AccrualFailureDetector(system)
val fd = createFailureDetector()
fd.phi(conn) must be(0.0)
fd.phi(conn2) must be(0.0)
}
"return phi based on guess when only one heartbeat" in {
// 1 second ticks
val timeInterval = Vector.fill(30)(1000L)
val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval))
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000)
val fd = createFailureDetector(firstHeartbeatEstimate = 1.seconds,
clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.phi(conn) must be > (0.0)
// let time go
for (n 2 to 8)
fd.phi(conn) must be < (4.0)
for (n 9 to 18)
fd.phi(conn) must be < (8.0)
fd.phi(conn) must be > (8.0)
fd.phi(conn) must be(0.3 plusOrMinus 0.2)
fd.phi(conn) must be(4.5 plusOrMinus 0.3)
fd.phi(conn) must be > (15.0)
}
"return phi value using first interval after second heartbeat" in {
val timeInterval = List[Long](0, 100, 100, 100)
val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval))
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.phi(conn) must be > (0.0)
@ -63,8 +105,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as available after a series of successful heartbeats" in {
val timeInterval = List[Long](0, 1000, 100, 100)
val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval))
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
@ -75,8 +116,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as dead after explicit removal of connection" in {
val timeInterval = List[Long](0, 1000, 100, 100, 100)
val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval))
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
@ -89,8 +129,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval))
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
@ -112,40 +151,65 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as dead if heartbeat are missed" in {
val timeInterval = List[Long](0, 1000, 100, 100, 5000)
val timeInterval = List[Long](0, 1000, 100, 100, 7000)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, threshold = 3,
timeMachine = fakeTimeGenerator(timeInterval))
val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //1200
fd.isAvailable(conn) must be(false) //6200
fd.isAvailable(conn) must be(false) //8200
}
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100)
val fd = new AccrualFailureDetector(system, threshold = 3,
timeMachine = fakeTimeGenerator(timeInterval))
val timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100)
val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //1200
fd.isAvailable(conn) must be(false) //6200
fd.heartbeat(conn) //6300
fd.heartbeat(conn) //7300
fd.heartbeat(conn) //7400
fd.isAvailable(conn) must be(false) //8200
fd.heartbeat(conn) //8300
fd.heartbeat(conn) //9300
fd.heartbeat(conn) //9400
fd.isAvailable(conn) must be(true) //7500
fd.isAvailable(conn) must be(true) //9500
}
"accept some configured missing heartbeats" in {
val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000)
val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
}
"fail after configured acceptable missing heartbeats" in {
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000)
val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(false)
}
"use maxSampleSize heartbeats" in {
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
val fd = new AccrualFailureDetector(system, maxSampleSize = 3,
timeMachine = fakeTimeGenerator(timeInterval))
val fd = createFailureDetector(maxSampleSize = 3, clock = fakeTimeGenerator(timeInterval))
// 100 ms interval
fd.heartbeat(conn) //0
@ -163,4 +227,33 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
}
"Statistics for heartbeats" must {
"calculate correct mean and variance" in {
val samples = Seq(100, 200, 125, 340, 130)
val stats = (HeartbeatHistory(maxSampleSize = 20) /: samples) { (stats, value) stats :+ value }
stats.mean must be(179.0 plusOrMinus 0.00001)
stats.variance must be(7584.0 plusOrMinus 0.00001)
}
"have 0.0 variance for one sample" in {
(HeartbeatHistory(600) :+ 1000L).variance must be(0.0 plusOrMinus 0.00001)
}
"be capped by the specified maxSampleSize" in {
val history3 = HeartbeatHistory(maxSampleSize = 3) :+ 100 :+ 110 :+ 90
history3.mean must be(100.0 plusOrMinus 0.00001)
history3.variance must be(66.6666667 plusOrMinus 0.00001)
val history4 = history3 :+ 140
history4.mean must be(113.333333 plusOrMinus 0.00001)
history4.variance must be(422.222222 plusOrMinus 0.00001)
val history5 = history4 :+ 80
history5.mean must be(103.333333 plusOrMinus 0.00001)
history5.variance must be(688.88888889 plusOrMinus 0.00001)
}
}
}

View file

@ -16,9 +16,11 @@ class ClusterConfigSpec extends AkkaSpec {
"be able to parse generic cluster config elements" in {
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
FailureDetectorThreshold must be(8)
FailureDetectorThreshold must be(8.0 plusOrMinus 0.0001)
FailureDetectorMaxSampleSize must be(1000)
FailureDetectorImplementationClass must be(None)
FailureDetectorMinStdDeviation must be(100 millis)
FailureDetectorAcceptableHeartbeatPause must be(3 seconds)
NodeToJoin must be(None)
PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second)