Improve phi in AccrualFailureDetector, see #2066
* Implementation of phi according to the paper * Config properties and documentation, min-std-deviation, * acceptable-lost-heartbeats * Restructure code, HeartbeatHistory is responsible for stats from historical heartbeats * Correct and efficient calculation of mean and standard deviation * More tests
This commit is contained in:
parent
9862afab84
commit
410fd6ca58
5 changed files with 335 additions and 137 deletions
|
|
@ -43,7 +43,21 @@ akka {
|
||||||
# a quick detection in the event of a real crash. Conversely, a high
|
# a quick detection in the event of a real crash. Conversely, a high
|
||||||
# threshold generates fewer mistakes but needs more time to detect
|
# threshold generates fewer mistakes but needs more time to detect
|
||||||
# actual crashes
|
# actual crashes
|
||||||
threshold = 8
|
threshold = 8.0
|
||||||
|
|
||||||
|
# Minimum standard deviation to use for the normal distribution in
|
||||||
|
# AccrualFailureDetector. Too low standard deviation might result in
|
||||||
|
# too much sensitivity for sudden, but normal, deviations in heartbeat
|
||||||
|
# inter arrival times.
|
||||||
|
min-std-deviation = 100 ms
|
||||||
|
|
||||||
|
# Number of potentially lost/delayed heartbeats that will be
|
||||||
|
# accepted before considering it to be an anomaly.
|
||||||
|
# It is a factor of heartbeat-interval.
|
||||||
|
# This margin is important to be able to survive sudden, occasional,
|
||||||
|
# pauses in heartbeat arrivals, due to for example garbage collect or
|
||||||
|
# network drop.
|
||||||
|
acceptable-lost-heartbeats = 3.0
|
||||||
|
|
||||||
implementation-class = ""
|
implementation-class = ""
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,50 +7,89 @@ package akka.cluster
|
||||||
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
|
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
|
||||||
import akka.remote.RemoteActorRefProvider
|
import akka.remote.RemoteActorRefProvider
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
|
|
||||||
import scala.collection.immutable.Map
|
import scala.collection.immutable.Map
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
import java.util.concurrent.TimeUnit.NANOSECONDS
|
||||||
|
import akka.util.Duration
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
|
object AccrualFailureDetector {
|
||||||
|
private def realTimeMachine: () ⇒ Long = () ⇒ NANOSECONDS.toMillis(System.nanoTime)
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
|
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
|
||||||
* [http://ddg.jaist.ac.jp/pub/HDY+04.pdf]
|
* [http://ddg.jaist.ac.jp/pub/HDY+04.pdf]
|
||||||
* <p/>
|
*
|
||||||
* A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event
|
* @param system Belongs to the [[akka.actor.ActorSystem]]. Used for logging.
|
||||||
|
*
|
||||||
|
* @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event
|
||||||
* of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect
|
* of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect
|
||||||
* actual crashes
|
* actual crashes
|
||||||
* <p/>
|
*
|
||||||
* Default threshold is 8, but can be configured in the Akka config.
|
* @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
|
||||||
|
* inter-arrival times.
|
||||||
|
*
|
||||||
|
* @param minStdDeviation Minimum standard deviation to use for the normal distribution used when calculating phi.
|
||||||
|
* Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations
|
||||||
|
* in heartbeat inter arrival times.
|
||||||
|
*
|
||||||
|
* @param acceptableLostDuration Duration corresponding to number of potentially lost/delayed
|
||||||
|
* heartbeats that will be accepted before considering it to be an anomaly.
|
||||||
|
* This margin is important to be able to survive sudden, occasional, pauses in heartbeat
|
||||||
|
* arrivals, due to for example garbage collect or network drop.
|
||||||
|
*
|
||||||
|
* @param firstHeartbeatEstimate Bootstrap the stats with heartbeats that corresponds to
|
||||||
|
* to this duration, with a with rather high standard deviation (since environment is unknown
|
||||||
|
* in the beginning)
|
||||||
|
*
|
||||||
|
* @timeMachine The clock, returning time in milliseconds, but can be faked for testing
|
||||||
|
* purposes. It is only used for measuring intervals (duration).
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
class AccrualFailureDetector(
|
class AccrualFailureDetector(
|
||||||
val system: ActorSystem,
|
val system: ActorSystem,
|
||||||
val threshold: Int = 8,
|
val threshold: Double,
|
||||||
val maxSampleSize: Int = 1000,
|
val maxSampleSize: Int,
|
||||||
val timeMachine: () ⇒ Long = System.currentTimeMillis) extends FailureDetector {
|
val minStdDeviation: Duration,
|
||||||
|
val acceptableLostDuration: Duration,
|
||||||
|
val firstHeartbeatEstimate: Duration,
|
||||||
|
val timeMachine: () ⇒ Long) extends FailureDetector {
|
||||||
|
|
||||||
|
import AccrualFailureDetector._
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that picks configuration from the settings.
|
||||||
|
*/
|
||||||
def this(
|
def this(
|
||||||
system: ActorSystem,
|
system: ActorSystem,
|
||||||
settings: ClusterSettings,
|
settings: ClusterSettings,
|
||||||
timeMachine: () ⇒ Long = System.currentTimeMillis) =
|
timeMachine: () ⇒ Long = AccrualFailureDetector.realTimeMachine) =
|
||||||
this(
|
this(
|
||||||
system,
|
system,
|
||||||
settings.FailureDetectorThreshold,
|
settings.FailureDetectorThreshold,
|
||||||
settings.FailureDetectorMaxSampleSize,
|
settings.FailureDetectorMaxSampleSize,
|
||||||
|
settings.HeartbeatInterval * settings.FailureDetectorAcceptableLostHeartbeats,
|
||||||
|
settings.FailureDetectorMinStdDeviation,
|
||||||
|
// we use a conservative estimate for the first heartbeat because
|
||||||
|
// gossip needs to spread back to the joining node before the
|
||||||
|
// first real heartbeat is sent. Initial heartbeat is added when joining.
|
||||||
|
// FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed
|
||||||
|
settings.GossipInterval * 3 + settings.HeartbeatInterval,
|
||||||
timeMachine)
|
timeMachine)
|
||||||
|
|
||||||
private final val PhiFactor = 1.0 / math.log(10.0)
|
|
||||||
|
|
||||||
private val log = Logging(system, "FailureDetector")
|
private val log = Logging(system, "FailureDetector")
|
||||||
|
|
||||||
/**
|
|
||||||
* Holds the failure statistics for a specific node Address.
|
|
||||||
*/
|
|
||||||
private case class FailureStats(mean: Double = 0.0, variance: Double = 0.0, deviation: Double = 0.0)
|
|
||||||
|
|
||||||
// guess statistics for first heartbeat,
|
// guess statistics for first heartbeat,
|
||||||
// important so that connections with only one heartbeat becomes unavailble
|
// important so that connections with only one heartbeat becomes unavailable
|
||||||
private val failureStatsFirstHeartbeat = FailureStats(mean = 1000.0)
|
private val firstHeartbeat: HeartbeatHistory = {
|
||||||
|
// bootstrap with 2 entries with rather high standard deviation
|
||||||
|
val mean = firstHeartbeatEstimate.toMillis
|
||||||
|
val stdDeviation = mean / 4
|
||||||
|
HeartbeatHistory(maxSampleSize) :+ (mean - stdDeviation) :+ (mean + stdDeviation)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val acceptableLostMillis = acceptableLostDuration.toMillis
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implement using optimistic lockless concurrency, all state is represented
|
* Implement using optimistic lockless concurrency, all state is represented
|
||||||
|
|
@ -58,8 +97,7 @@ class AccrualFailureDetector(
|
||||||
*/
|
*/
|
||||||
private case class State(
|
private case class State(
|
||||||
version: Long = 0L,
|
version: Long = 0L,
|
||||||
failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats],
|
history: Map[Address, HeartbeatHistory] = Map.empty,
|
||||||
intervalHistory: Map[Address, IndexedSeq[Long]] = Map.empty[Address, IndexedSeq[Long]],
|
|
||||||
timestamps: Map[Address, Long] = Map.empty[Address, Long],
|
timestamps: Map[Address, Long] = Map.empty[Address, Long],
|
||||||
explicitRemovals: Set[Address] = Set.empty[Address])
|
explicitRemovals: Set[Address] = Set.empty[Address])
|
||||||
|
|
||||||
|
|
@ -78,96 +116,88 @@ class AccrualFailureDetector(
|
||||||
final def heartbeat(connection: Address) {
|
final def heartbeat(connection: Address) {
|
||||||
log.debug("Heartbeat from connection [{}] ", connection)
|
log.debug("Heartbeat from connection [{}] ", connection)
|
||||||
|
|
||||||
|
val timestamp = timeMachine()
|
||||||
val oldState = state.get
|
val oldState = state.get
|
||||||
val latestTimestamp = oldState.timestamps.get(connection)
|
|
||||||
|
|
||||||
if (latestTimestamp.isEmpty) {
|
val newHistory = oldState.timestamps.get(connection) match {
|
||||||
|
case None ⇒
|
||||||
// this is heartbeat from a new connection
|
// this is heartbeat from a new connection
|
||||||
// add starter records for this new connection
|
// add starter records for this new connection
|
||||||
val newState = oldState copy (
|
firstHeartbeat
|
||||||
version = oldState.version + 1,
|
case (Some(latestTimestamp)) ⇒
|
||||||
failureStats = oldState.failureStats + (connection -> failureStatsFirstHeartbeat),
|
|
||||||
intervalHistory = oldState.intervalHistory + (connection -> IndexedSeq.empty[Long]),
|
|
||||||
timestamps = oldState.timestamps + (connection -> timeMachine()),
|
|
||||||
explicitRemovals = oldState.explicitRemovals - connection)
|
|
||||||
|
|
||||||
// if we won the race then update else try again
|
|
||||||
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// this is a known connection
|
// this is a known connection
|
||||||
val timestamp = timeMachine()
|
val interval = timestamp - latestTimestamp
|
||||||
val interval = timestamp - latestTimestamp.get
|
oldState.history(connection) :+ interval
|
||||||
|
|
||||||
val newIntervalsForConnection = (oldState.intervalHistory.get(connection) match {
|
|
||||||
case Some(history) if history.size >= maxSampleSize ⇒
|
|
||||||
// reached max history, drop first interval
|
|
||||||
history drop 1
|
|
||||||
case Some(history) ⇒ history
|
|
||||||
case _ ⇒ IndexedSeq.empty[Long]
|
|
||||||
}) :+ interval
|
|
||||||
|
|
||||||
val newFailureStats = {
|
|
||||||
val newMean: Double = newIntervalsForConnection.sum.toDouble / newIntervalsForConnection.size
|
|
||||||
|
|
||||||
val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse {
|
|
||||||
throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")
|
|
||||||
}
|
|
||||||
|
|
||||||
val deviationSum = (0.0d /: newIntervalsForConnection) { (mean, interval) ⇒
|
|
||||||
mean + interval.toDouble - newMean
|
|
||||||
}
|
|
||||||
|
|
||||||
val newVariance: Double = deviationSum / newIntervalsForConnection.size
|
|
||||||
val newDeviation: Double = math.sqrt(newVariance)
|
|
||||||
|
|
||||||
val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
|
|
||||||
oldState.failureStats + (connection -> newFailureStats)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val newState = oldState copy (version = oldState.version + 1,
|
val newState = oldState copy (version = oldState.version + 1,
|
||||||
failureStats = newFailureStats,
|
history = oldState.history + (connection -> newHistory),
|
||||||
intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection),
|
|
||||||
timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp,
|
timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp,
|
||||||
explicitRemovals = oldState.explicitRemovals - connection)
|
explicitRemovals = oldState.explicitRemovals - connection)
|
||||||
|
|
||||||
// if we won the race then update else try again
|
// if we won the race then update else try again
|
||||||
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
|
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculates how likely it is that the connection has failed.
|
* The suspicion level of accrual failure detector is given by a value called φ (phi).
|
||||||
* <p/>
|
* The basic idea of the φ failure detector is to express the value of φ on a scale that
|
||||||
|
* is dynamically adjusted to reflect current network conditions.
|
||||||
|
*
|
||||||
|
* The value of φ is calculated as:
|
||||||
|
*
|
||||||
|
* {{{
|
||||||
|
* φ = -log10(1 - F(timeSinceLastHeartbeat)
|
||||||
|
* }}}
|
||||||
|
* where F is the cumulative distribution function of a normal distribution with mean
|
||||||
|
* and standard deviation estimated from historical heartbeat inter-arrival times.
|
||||||
|
*
|
||||||
* If a connection does not have any records in failure detector then it is
|
* If a connection does not have any records in failure detector then it is
|
||||||
* considered healthy.
|
* considered healthy.
|
||||||
* <p/>
|
*
|
||||||
* Implementations of 'Cumulative Distribution Function' for Exponential Distribution.
|
|
||||||
* For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597].
|
|
||||||
*/
|
*/
|
||||||
def phi(connection: Address): Double = {
|
def phi(connection: Address): Double = {
|
||||||
val oldState = state.get
|
val oldState = state.get
|
||||||
val oldTimestamp = oldState.timestamps.get(connection)
|
val oldTimestamp = oldState.timestamps.get(connection)
|
||||||
|
|
||||||
val phi =
|
|
||||||
// if connection has been removed explicitly
|
// if connection has been removed explicitly
|
||||||
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
|
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
|
||||||
else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
|
else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
|
||||||
else {
|
else {
|
||||||
val timestampDiff = timeMachine() - oldTimestamp.get
|
val timeDiff = timeMachine() - oldTimestamp.get
|
||||||
|
|
||||||
val mean = oldState.failureStats.get(connection) match {
|
val history = oldState.history(connection)
|
||||||
case Some(FailureStats(mean, _, _)) ⇒ mean
|
val mean = history.mean
|
||||||
case _ ⇒ throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history")
|
val stdDeviation = ensureValidStdDeviation(history.stdDeviation)
|
||||||
}
|
|
||||||
|
|
||||||
if (mean == 0.0) 0.0
|
val φ = phi(timeDiff, mean + acceptableLostMillis, stdDeviation)
|
||||||
else PhiFactor * timestampDiff / mean
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME change to debug log level, when failure detector is stable
|
// FIXME change to debug log level, when failure detector is stable
|
||||||
log.info("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
|
if (φ > 1.0)
|
||||||
phi
|
log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]",
|
||||||
|
φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")")
|
||||||
|
|
||||||
|
φ
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private[cluster] def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double = {
|
||||||
|
val cdf = cumulativeDistributionFunction(timeDiff, mean, stdDeviation)
|
||||||
|
-math.log10(1.0 - cdf)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val minStdDeviationMillis = minStdDeviation.toMillis
|
||||||
|
|
||||||
|
private def ensureValidStdDeviation(stdDeviation: Double): Double = math.max(stdDeviation, minStdDeviationMillis)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cumulative distribution function for N(mean, stdDeviation) normal distribution.
|
||||||
|
* This is an approximation defined in β Mathematics Handbook.
|
||||||
|
*/
|
||||||
|
private[cluster] def cumulativeDistributionFunction(x: Double, mean: Double, stdDeviation: Double): Double = {
|
||||||
|
val y = (x - mean) / stdDeviation
|
||||||
|
// Cumulative distribution function for N(0, 1)
|
||||||
|
1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -178,10 +208,9 @@ class AccrualFailureDetector(
|
||||||
log.debug("Remove connection [{}] ", connection)
|
log.debug("Remove connection [{}] ", connection)
|
||||||
val oldState = state.get
|
val oldState = state.get
|
||||||
|
|
||||||
if (oldState.failureStats.contains(connection)) {
|
if (oldState.history.contains(connection)) {
|
||||||
val newState = oldState copy (version = oldState.version + 1,
|
val newState = oldState copy (version = oldState.version + 1,
|
||||||
failureStats = oldState.failureStats - connection,
|
history = oldState.history - connection,
|
||||||
intervalHistory = oldState.intervalHistory - connection,
|
|
||||||
timestamps = oldState.timestamps - connection,
|
timestamps = oldState.timestamps - connection,
|
||||||
explicitRemovals = oldState.explicitRemovals + connection)
|
explicitRemovals = oldState.explicitRemovals + connection)
|
||||||
|
|
||||||
|
|
@ -190,3 +219,59 @@ class AccrualFailureDetector(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private[cluster] object HeartbeatHistory {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an empty HeartbeatHistory, without any history.
|
||||||
|
* Can only be used as starting point for appending intervals.
|
||||||
|
* The stats (mean, variance, stdDeviation) are not defined for
|
||||||
|
* for empty HeartbeatHistory, i.e. throws AritmeticException.
|
||||||
|
*/
|
||||||
|
def apply(maxSampleSize: Int): HeartbeatHistory = HeartbeatHistory(
|
||||||
|
maxSampleSize = maxSampleSize,
|
||||||
|
intervals = IndexedSeq.empty,
|
||||||
|
intervalSum = 0L,
|
||||||
|
interval2Sum = 0L)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds the heartbeat statistics for a specific node Address.
|
||||||
|
* It is capped by the number of samples specified in `maxSampleSize`.
|
||||||
|
*
|
||||||
|
* The stats (mean, variance, stdDeviation) are not defined for
|
||||||
|
* for empty HeartbeatHistory, i.e. throws AritmeticException.
|
||||||
|
*/
|
||||||
|
private[cluster] case class HeartbeatHistory private (
|
||||||
|
maxSampleSize: Int,
|
||||||
|
intervals: IndexedSeq[Long],
|
||||||
|
intervalSum: Long,
|
||||||
|
interval2Sum: Long) {
|
||||||
|
|
||||||
|
def mean: Double = intervalSum.toDouble / intervals.size
|
||||||
|
|
||||||
|
def variance: Double = (interval2Sum.toDouble / intervals.size) - (mean * mean)
|
||||||
|
|
||||||
|
def stdDeviation: Double = math.sqrt(variance)
|
||||||
|
|
||||||
|
@tailrec
|
||||||
|
final def :+(interval: Long): HeartbeatHistory = {
|
||||||
|
if (intervals.size < maxSampleSize)
|
||||||
|
HeartbeatHistory(
|
||||||
|
maxSampleSize,
|
||||||
|
intervals = intervals :+ interval,
|
||||||
|
intervalSum = intervalSum + interval,
|
||||||
|
interval2Sum = interval2Sum + pow2(interval))
|
||||||
|
else
|
||||||
|
dropOldest :+ interval // recur
|
||||||
|
}
|
||||||
|
|
||||||
|
private def dropOldest: HeartbeatHistory = HeartbeatHistory(
|
||||||
|
maxSampleSize,
|
||||||
|
intervals = intervals drop 1,
|
||||||
|
intervalSum = intervalSum - intervals.head,
|
||||||
|
interval2Sum = interval2Sum - pow2(intervals.head))
|
||||||
|
|
||||||
|
private def pow2(x: Long) = x * x
|
||||||
|
}
|
||||||
|
|
@ -13,12 +13,16 @@ import akka.actor.AddressFromURIString
|
||||||
|
|
||||||
class ClusterSettings(val config: Config, val systemName: String) {
|
class ClusterSettings(val config: Config, val systemName: String) {
|
||||||
import config._
|
import config._
|
||||||
final val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
|
|
||||||
|
final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
|
||||||
final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
|
final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
|
||||||
final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match {
|
final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match {
|
||||||
case "" ⇒ None
|
case "" ⇒ None
|
||||||
case fqcn ⇒ Some(fqcn)
|
case fqcn ⇒ Some(fqcn)
|
||||||
}
|
}
|
||||||
|
final val FailureDetectorMinStdDeviation: Duration = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
|
||||||
|
final val FailureDetectorAcceptableLostHeartbeats: Double = getDouble("akka.cluster.failure-detector.acceptable-lost-heartbeats")
|
||||||
|
|
||||||
final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
|
final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
|
||||||
case "" ⇒ None
|
case "" ⇒ None
|
||||||
case AddressFromURIString(addr) ⇒ Some(addr)
|
case AddressFromURIString(addr) ⇒ Some(addr)
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,9 @@ package akka.cluster
|
||||||
|
|
||||||
import akka.actor.Address
|
import akka.actor.Address
|
||||||
import akka.testkit.{ LongRunningTest, AkkaSpec }
|
import akka.testkit.{ LongRunningTest, AkkaSpec }
|
||||||
|
import scala.collection.immutable.TreeMap
|
||||||
|
import akka.util.duration._
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class AccrualFailureDetectorSpec extends AkkaSpec("""
|
class AccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
|
|
@ -27,33 +30,72 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
timeGenerator
|
timeGenerator
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val defaultFakeTimeIntervals = Vector.fill(20)(1000L)
|
||||||
|
def createFailureDetector(
|
||||||
|
threshold: Double = 8.0,
|
||||||
|
maxSampleSize: Int = 1000,
|
||||||
|
minStdDeviation: Duration = 10.millis,
|
||||||
|
acceptableLostDuration: Duration = Duration.Zero,
|
||||||
|
firstHeartbeatEstimate: Duration = 1.second,
|
||||||
|
timeMachine: () ⇒ Long = fakeTimeGenerator(defaultFakeTimeIntervals)): AccrualFailureDetector =
|
||||||
|
new AccrualFailureDetector(system,
|
||||||
|
threshold,
|
||||||
|
maxSampleSize,
|
||||||
|
minStdDeviation,
|
||||||
|
acceptableLostDuration,
|
||||||
|
firstHeartbeatEstimate = firstHeartbeatEstimate,
|
||||||
|
timeMachine = timeMachine)
|
||||||
|
|
||||||
|
"use good enough cumulative distribution function" in {
|
||||||
|
val fd = createFailureDetector()
|
||||||
|
fd.cumulativeDistributionFunction(0.0, 0, 1) must be(0.5 plusOrMinus (0.001))
|
||||||
|
fd.cumulativeDistributionFunction(0.6, 0, 1) must be(0.7257 plusOrMinus (0.001))
|
||||||
|
fd.cumulativeDistributionFunction(1.5, 0, 1) must be(0.9332 plusOrMinus (0.001))
|
||||||
|
fd.cumulativeDistributionFunction(2.0, 0, 1) must be(0.97725 plusOrMinus (0.01))
|
||||||
|
fd.cumulativeDistributionFunction(2.5, 0, 1) must be(0.9379 plusOrMinus (0.1))
|
||||||
|
fd.cumulativeDistributionFunction(3.5, 0, 1) must be(0.99977 plusOrMinus (0.1))
|
||||||
|
fd.cumulativeDistributionFunction(4.0, 0, 1) must be(0.99997 plusOrMinus (0.1))
|
||||||
|
|
||||||
|
for (x :: y :: Nil ← (0.0 to 4.0 by 0.1).toList.sliding(2)) {
|
||||||
|
fd.cumulativeDistributionFunction(x, 0, 1) must be < (
|
||||||
|
fd.cumulativeDistributionFunction(y, 0, 1))
|
||||||
|
}
|
||||||
|
|
||||||
|
fd.cumulativeDistributionFunction(2.2, 2.0, 0.3) must be(0.7475 plusOrMinus (0.001))
|
||||||
|
}
|
||||||
|
|
||||||
|
"return realistic phi values" in {
|
||||||
|
val fd = createFailureDetector()
|
||||||
|
val test = TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3)
|
||||||
|
for ((timeDiff, expectedPhi) ← test) {
|
||||||
|
fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1))
|
||||||
|
}
|
||||||
|
|
||||||
|
// larger stdDeviation results => lower phi
|
||||||
|
fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 500.0) must be < (
|
||||||
|
fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 100.0))
|
||||||
|
}
|
||||||
|
|
||||||
"return phi value of 0.0 on startup for each address, when no heartbeats" in {
|
"return phi value of 0.0 on startup for each address, when no heartbeats" in {
|
||||||
val fd = new AccrualFailureDetector(system)
|
val fd = createFailureDetector()
|
||||||
fd.phi(conn) must be(0.0)
|
fd.phi(conn) must be(0.0)
|
||||||
fd.phi(conn2) must be(0.0)
|
fd.phi(conn2) must be(0.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
"return phi based on guess when only one heartbeat" in {
|
"return phi based on guess when only one heartbeat" in {
|
||||||
// 1 second ticks
|
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000)
|
||||||
val timeInterval = Vector.fill(30)(1000L)
|
val fd = createFailureDetector(firstHeartbeatEstimate = 1.seconds,
|
||||||
val fd = new AccrualFailureDetector(system,
|
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
|
|
||||||
fd.heartbeat(conn)
|
fd.heartbeat(conn)
|
||||||
fd.phi(conn) must be > (0.0)
|
fd.phi(conn) must be(0.3 plusOrMinus 0.2)
|
||||||
// let time go
|
fd.phi(conn) must be(4.5 plusOrMinus 0.3)
|
||||||
for (n ← 2 to 8)
|
fd.phi(conn) must be > (15.0)
|
||||||
fd.phi(conn) must be < (4.0)
|
|
||||||
for (n ← 9 to 18)
|
|
||||||
fd.phi(conn) must be < (8.0)
|
|
||||||
|
|
||||||
fd.phi(conn) must be > (8.0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"return phi value using first interval after second heartbeat" in {
|
"return phi value using first interval after second heartbeat" in {
|
||||||
val timeInterval = List[Long](0, 100, 100, 100)
|
val timeInterval = List[Long](0, 100, 100, 100)
|
||||||
val fd = new AccrualFailureDetector(system,
|
val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
|
||||||
|
|
||||||
fd.heartbeat(conn)
|
fd.heartbeat(conn)
|
||||||
fd.phi(conn) must be > (0.0)
|
fd.phi(conn) must be > (0.0)
|
||||||
|
|
@ -63,8 +105,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
|
|
||||||
"mark node as available after a series of successful heartbeats" in {
|
"mark node as available after a series of successful heartbeats" in {
|
||||||
val timeInterval = List[Long](0, 1000, 100, 100)
|
val timeInterval = List[Long](0, 1000, 100, 100)
|
||||||
val fd = new AccrualFailureDetector(system,
|
val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
|
||||||
|
|
||||||
fd.heartbeat(conn)
|
fd.heartbeat(conn)
|
||||||
fd.heartbeat(conn)
|
fd.heartbeat(conn)
|
||||||
|
|
@ -75,8 +116,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
|
|
||||||
"mark node as dead after explicit removal of connection" in {
|
"mark node as dead after explicit removal of connection" in {
|
||||||
val timeInterval = List[Long](0, 1000, 100, 100, 100)
|
val timeInterval = List[Long](0, 1000, 100, 100, 100)
|
||||||
val fd = new AccrualFailureDetector(system,
|
val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
|
||||||
|
|
||||||
fd.heartbeat(conn)
|
fd.heartbeat(conn)
|
||||||
fd.heartbeat(conn)
|
fd.heartbeat(conn)
|
||||||
|
|
@ -89,8 +129,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
|
|
||||||
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
|
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
|
||||||
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
|
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
|
||||||
val fd = new AccrualFailureDetector(system,
|
val fd = createFailureDetector(timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
|
||||||
|
|
||||||
fd.heartbeat(conn) //0
|
fd.heartbeat(conn) //0
|
||||||
|
|
||||||
|
|
@ -112,40 +151,65 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
|
|
||||||
"mark node as dead if heartbeat are missed" in {
|
"mark node as dead if heartbeat are missed" in {
|
||||||
val timeInterval = List[Long](0, 1000, 100, 100, 5000)
|
val timeInterval = List[Long](0, 1000, 100, 100, 7000)
|
||||||
val ft = fakeTimeGenerator(timeInterval)
|
val ft = fakeTimeGenerator(timeInterval)
|
||||||
val fd = new AccrualFailureDetector(system, threshold = 3,
|
val fd = createFailureDetector(threshold = 3, timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
|
||||||
|
|
||||||
fd.heartbeat(conn) //0
|
fd.heartbeat(conn) //0
|
||||||
fd.heartbeat(conn) //1000
|
fd.heartbeat(conn) //1000
|
||||||
fd.heartbeat(conn) //1100
|
fd.heartbeat(conn) //1100
|
||||||
|
|
||||||
fd.isAvailable(conn) must be(true) //1200
|
fd.isAvailable(conn) must be(true) //1200
|
||||||
fd.isAvailable(conn) must be(false) //6200
|
fd.isAvailable(conn) must be(false) //8200
|
||||||
}
|
}
|
||||||
|
|
||||||
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
|
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
|
||||||
val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100)
|
val timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100)
|
||||||
val fd = new AccrualFailureDetector(system, threshold = 3,
|
val fd = createFailureDetector(threshold = 3, timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
|
||||||
|
|
||||||
fd.heartbeat(conn) //0
|
fd.heartbeat(conn) //0
|
||||||
fd.heartbeat(conn) //1000
|
fd.heartbeat(conn) //1000
|
||||||
fd.heartbeat(conn) //1100
|
fd.heartbeat(conn) //1100
|
||||||
fd.isAvailable(conn) must be(true) //1200
|
fd.isAvailable(conn) must be(true) //1200
|
||||||
fd.isAvailable(conn) must be(false) //6200
|
fd.isAvailable(conn) must be(false) //8200
|
||||||
fd.heartbeat(conn) //6300
|
fd.heartbeat(conn) //8300
|
||||||
fd.heartbeat(conn) //7300
|
fd.heartbeat(conn) //9300
|
||||||
fd.heartbeat(conn) //7400
|
fd.heartbeat(conn) //9400
|
||||||
|
|
||||||
fd.isAvailable(conn) must be(true) //7500
|
fd.isAvailable(conn) must be(true) //9500
|
||||||
|
}
|
||||||
|
|
||||||
|
"accept some configured missing heartbeats" in {
|
||||||
|
val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000)
|
||||||
|
val fd = createFailureDetector(acceptableLostDuration = 3.seconds, timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
|
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.isAvailable(conn) must be(true)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.isAvailable(conn) must be(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail after configured acceptable missing heartbeats" in {
|
||||||
|
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000)
|
||||||
|
val fd = createFailureDetector(acceptableLostDuration = 3.seconds, timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
|
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.isAvailable(conn) must be(true)
|
||||||
|
fd.heartbeat(conn)
|
||||||
|
fd.isAvailable(conn) must be(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
"use maxSampleSize heartbeats" in {
|
"use maxSampleSize heartbeats" in {
|
||||||
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
|
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
|
||||||
val fd = new AccrualFailureDetector(system, maxSampleSize = 3,
|
val fd = createFailureDetector(maxSampleSize = 3, timeMachine = fakeTimeGenerator(timeInterval))
|
||||||
timeMachine = fakeTimeGenerator(timeInterval))
|
|
||||||
|
|
||||||
// 100 ms interval
|
// 100 ms interval
|
||||||
fd.heartbeat(conn) //0
|
fd.heartbeat(conn) //0
|
||||||
|
|
@ -163,4 +227,33 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"Statistics for heartbeats" must {
|
||||||
|
|
||||||
|
"calculate correct mean and variance" in {
|
||||||
|
val samples = Seq(100, 200, 125, 340, 130)
|
||||||
|
val stats = (HeartbeatHistory(maxSampleSize = 20) /: samples) { (stats, value) ⇒ stats :+ value }
|
||||||
|
stats.mean must be(179.0 plusOrMinus 0.00001)
|
||||||
|
stats.variance must be(7584.0 plusOrMinus 0.00001)
|
||||||
|
}
|
||||||
|
|
||||||
|
"have 0.0 variance for one sample" in {
|
||||||
|
(HeartbeatHistory(600) :+ 1000L).variance must be(0.0 plusOrMinus 0.00001)
|
||||||
|
}
|
||||||
|
|
||||||
|
"be capped by the specified maxSampleSize" in {
|
||||||
|
val history3 = HeartbeatHistory(maxSampleSize = 3) :+ 100 :+ 110 :+ 90
|
||||||
|
history3.mean must be(100.0 plusOrMinus 0.00001)
|
||||||
|
history3.variance must be(66.6666667 plusOrMinus 0.00001)
|
||||||
|
|
||||||
|
val history4 = history3 :+ 140
|
||||||
|
history4.mean must be(113.333333 plusOrMinus 0.00001)
|
||||||
|
history4.variance must be(422.222222 plusOrMinus 0.00001)
|
||||||
|
|
||||||
|
val history5 = history4 :+ 80
|
||||||
|
history5.mean must be(103.333333 plusOrMinus 0.00001)
|
||||||
|
history5.variance must be(688.88888889 plusOrMinus 0.00001)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,9 +16,11 @@ class ClusterConfigSpec extends AkkaSpec {
|
||||||
"be able to parse generic cluster config elements" in {
|
"be able to parse generic cluster config elements" in {
|
||||||
val settings = new ClusterSettings(system.settings.config, system.name)
|
val settings = new ClusterSettings(system.settings.config, system.name)
|
||||||
import settings._
|
import settings._
|
||||||
FailureDetectorThreshold must be(8)
|
FailureDetectorThreshold must be(8.0 plusOrMinus 0.0001)
|
||||||
FailureDetectorMaxSampleSize must be(1000)
|
FailureDetectorMaxSampleSize must be(1000)
|
||||||
FailureDetectorImplementationClass must be(None)
|
FailureDetectorImplementationClass must be(None)
|
||||||
|
FailureDetectorMinStdDeviation must be(100 millis)
|
||||||
|
FailureDetectorAcceptableLostHeartbeats must be(3.0 plusOrMinus 0.0001)
|
||||||
NodeToJoin must be(None)
|
NodeToJoin must be(None)
|
||||||
PeriodicTasksInitialDelay must be(1 seconds)
|
PeriodicTasksInitialDelay must be(1 seconds)
|
||||||
GossipInterval must be(1 second)
|
GossipInterval must be(1 second)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue