Failure detector refactoring, see #2690

* The failure detector was previously copied, with some refactoring, to
  akka-remote; this change builds on that and removes the duplicate
  failure detector from akka-cluster
* Adjustments to reference.conf
* Refactoring of FailureDetectorPuppet
Patrik Nordwall 2013-01-29 11:55:33 +01:00
parent 6198480c34
commit 157a25bcde
16 changed files with 183 additions and 750 deletions
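
Concretely, both akka-cluster and akka-remote now instantiate their failure detector reflectively from the `implementation-class` setting, and the implementation is expected to be an `akka.remote.FailureDetector` with a single `com.typesafe.config.Config` constructor (the old cluster detector instead took an `ActorSystem` and `ClusterSettings`). A rough sketch of what a custom detector plugged in through that contract could look like (the class name and its fixed-timeout logic are invented for illustration, not part of this commit):

    import akka.remote.FailureDetector
    import com.typesafe.config.Config

    // Hypothetical detector: considers the resource available until a fixed pause,
    // read from its own config block, has elapsed since the last heartbeat.
    class FixedTimeoutFailureDetector(config: Config) extends FailureDetector {
      private val pauseMillis: Long = config.getMilliseconds("acceptable-heartbeat-pause")
      @volatile private var lastHeartbeat: Option[Long] = None

      override def isAvailable: Boolean = lastHeartbeat match {
        case None    ⇒ true // no heartbeat yet: treated as healthy, like the phi detector
        case Some(t) ⇒ (System.currentTimeMillis - t) <= pauseMillis
      }

      override def isMonitoring: Boolean = lastHeartbeat.isDefined

      override def heartbeat(): Unit = lastHeartbeat = Some(System.currentTimeMillis)
    }

Such a class would be referenced from `akka.cluster.failure-detector.implementation-class` (or the corresponding `akka.remote` key), just as `akka.remote.PhiAccrualFailureDetector` is in the hunks below.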

View file

@@ -69,28 +69,29 @@ akka {
# exceeded the conflicting gossip messages are dropped and will reappear later.
max-gossip-merge-rate = 5.0
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable members.
failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.cluster.FailureDetector and
# have constructor with akka.actor.ActorSystem and
# akka.cluster.ClusterSettings parameters
implementation-class = "akka.cluster.AccrualFailureDetector"
# It must implement akka.remote.FailureDetector and have
# a constructor with a com.typesafe.config.Config parameter.
implementation-class = "akka.remote.PhiAccrualFailureDetector"
# how often should the node send out heartbeats?
heartbeat-interval = 1s
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 1 s
# Number of member nodes that each member will send heartbeat messages to,
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 5
# defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes
# Defines the failure detector threshold.
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes.
threshold = 8.0
# Number of the samples of inter-heartbeat arrival times to adaptively
# calculate the failure timeout for connections.
max-sample-size = 1000
# Minimum standard deviation to use for the normal distribution in
# AccrualFailureDetector. Too low standard deviation might result in
# too much sensitivity for sudden, but normal, deviations in heartbeat
@@ -99,15 +100,14 @@ akka {
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# It is a factor of heartbeat-interval.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.
acceptable-heartbeat-pause = 3s
acceptable-heartbeat-pause = 3 s
# Number of samples to use for calculation of mean and standard deviation of
# inter-arrival times.
max-sample-size = 1000
# Number of member nodes that each member will send heartbeat messages to,
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 5
# When a node stops sending heartbeats to another node it will end that
# with this number of EndHeartbeat messages, which will remove the

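Since the detector settings now live under `akka.cluster.failure-detector` and are interpreted by `akka.remote.PhiAccrualFailureDetector`, tuning is an ordinary configuration override. A minimal sketch, with arbitrary values (as the comments above explain, a higher threshold and a longer acceptable pause mean fewer false suspicions at the cost of slower detection):

    import com.typesafe.config.ConfigFactory

    // Illustrative override of the cluster failure detector settings shown above.
    val tuned = ConfigFactory.parseString("""
      akka.cluster.failure-detector {
        implementation-class = "akka.remote.PhiAccrualFailureDetector"
        heartbeat-interval = 1 s
        threshold = 10.0
        acceptable-heartbeat-pause = 5 s
        monitored-by-nr-of-members = 5
      }
      """).withFallback(ConfigFactory.load())
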
View file

@@ -1,290 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
import akka.event.Logging
import scala.collection.immutable
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._
object AccrualFailureDetector {
private def realClock: () ⇒ Long = () ⇒ NANOSECONDS.toMillis(System.nanoTime)
}
/**
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
* [http://ddg.jaist.ac.jp/pub/HDY+04.pdf]
*
* The suspicion level of failure is given by a value called φ (phi).
* The basic idea of the φ failure detector is to express the value of φ on a scale that
* is dynamically adjusted to reflect current network conditions. A configurable
* threshold is used to decide if φ is considered to be a failure.
*
* The value of φ is calculated as:
*
* {{{
* φ = -log10(1 - F(timeSinceLastHeartbeat))
* }}}
* where F is the cumulative distribution function of a normal distribution with mean
* and standard deviation estimated from historical heartbeat inter-arrival times.
*
*
* @param system Belongs to the [[akka.actor.ActorSystem]]. Used for logging.
*
* @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event
* of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect
* actual crashes
*
* @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
* inter-arrival times.
*
* @param minStdDeviation Minimum standard deviation to use for the normal distribution used when calculating phi.
* Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations
* in heartbeat inter arrival times.
*
* @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed
* heartbeats that will be accepted before considering it to be an anomaly.
* This margin is important to be able to survive sudden, occasional, pauses in heartbeat
* arrivals, due to for example garbage collect or network drop.
*
* @param firstHeartbeatEstimate Bootstrap the stats with heartbeats that correspond to
* this duration, with a rather high standard deviation (since the environment is unknown
* in the beginning)
*
* @param clock The clock, returning current time in milliseconds, but can be faked for testing
* purposes. It is only used for measuring intervals (duration).
*
*/
class AccrualFailureDetector(
val system: ActorSystem,
val threshold: Double,
val maxSampleSize: Int,
val minStdDeviation: Duration,
val acceptableHeartbeatPause: Duration,
val firstHeartbeatEstimate: Duration,
val clock: () ⇒ Long = AccrualFailureDetector.realClock) extends FailureDetector {
import AccrualFailureDetector._
/**
* Constructor that picks configuration from the settings.
*/
def this(
system: ActorSystem,
settings: ClusterSettings) =
this(
system,
threshold = settings.FailureDetectorThreshold,
maxSampleSize = settings.FailureDetectorMaxSampleSize,
minStdDeviation = settings.FailureDetectorMinStdDeviation,
acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause,
firstHeartbeatEstimate = settings.HeartbeatInterval,
clock = AccrualFailureDetector.realClock)
private val log = Logging(system, "FailureDetector")
// guess statistics for first heartbeat,
// important so that connections with only one heartbeat becomes unavailable
private val firstHeartbeat: HeartbeatHistory = {
// bootstrap with 2 entries with rather high standard deviation
val mean = firstHeartbeatEstimate.toMillis
val stdDeviation = mean / 4
HeartbeatHistory(maxSampleSize) :+ (mean - stdDeviation) :+ (mean + stdDeviation)
}
private val acceptableHeartbeatPauseMillis = acceptableHeartbeatPause.toMillis
/**
* Implement using optimistic lockless concurrency, all state is represented
* by this immutable case class and managed by an AtomicReference.
*/
private case class State(
version: Long = 0L,
history: Map[Address, HeartbeatHistory] = Map.empty,
timestamps: Map[Address, Long] = Map.empty[Address, Long])
private val state = new AtomicReference[State](State())
override def isAvailable(connection: Address): Boolean = phi(connection) < threshold
override def isMonitoring(connection: Address): Boolean = state.get.timestamps.get(connection).nonEmpty
/**
* Records a heartbeat for a connection.
*/
@tailrec
final def heartbeat(connection: Address) {
if (isMonitoring(connection))
log.debug("Heartbeat from connection [{}] ", connection)
else
log.info("First heartbeat from connection [{}] ", connection)
val timestamp = clock()
val oldState = state.get
val newHistory = oldState.timestamps.get(connection) match {
case None ⇒
// this is heartbeat from a new connection
// add starter records for this new connection
firstHeartbeat
case Some(latestTimestamp) ⇒
// this is a known connection
val interval = timestamp - latestTimestamp
oldState.history(connection) :+ interval
}
val newState = oldState copy (version = oldState.version + 1,
history = oldState.history + (connection -> newHistory),
timestamps = oldState.timestamps + (connection -> timestamp)) // record new timestamp
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
}
/**
* The suspicion level of the accrual failure detector.
*
* If a connection does not have any records in failure detector then it is
* considered healthy.
*/
def phi(connection: Address): Double = {
val oldState = state.get
val oldTimestamp = oldState.timestamps.get(connection)
if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timeDiff = clock() - oldTimestamp.get
val history = oldState.history(connection)
val mean = history.mean
val stdDeviation = ensureValidStdDeviation(history.stdDeviation)
val φ = phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation)
// FIXME change to debug log level, when failure detector is stable
if (φ > 1.0 && timeDiff < (acceptableHeartbeatPauseMillis + 5000))
log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]",
φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")")
φ
}
}
private[cluster] def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double = {
val cdf = cumulativeDistributionFunction(timeDiff, mean, stdDeviation)
-math.log10(1.0 - cdf)
}
private val minStdDeviationMillis = minStdDeviation.toMillis
private def ensureValidStdDeviation(stdDeviation: Double): Double = math.max(stdDeviation, minStdDeviationMillis)
/**
* Cumulative distribution function for N(mean, stdDeviation) normal distribution.
* This is an approximation defined in β Mathematics Handbook.
*/
private[cluster] def cumulativeDistributionFunction(x: Double, mean: Double, stdDeviation: Double): Double = {
val y = (x - mean) / stdDeviation
// Cumulative distribution function for N(0, 1)
1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
}
/**
* Removes the heartbeat management for a connection.
*/
@tailrec
final def remove(connection: Address): Unit = {
if (isMonitoring(connection))
log.info("Remove heartbeat connection [{}] ", connection)
val oldState = state.get
if (oldState.history.contains(connection)) {
val newState = oldState copy (version = oldState.version + 1,
history = oldState.history - connection,
timestamps = oldState.timestamps - connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) remove(connection) // recur
}
}
def reset(): Unit = {
@tailrec
def doReset(): Unit = {
val oldState = state.get
val newState = oldState.copy(version = oldState.version + 1, history = Map.empty, timestamps = Map.empty)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) doReset() // recur
}
log.debug("Resetting failure detector")
doReset()
}
}
private[cluster] object HeartbeatHistory {
/**
* Create an empty HeartbeatHistory, without any history.
* Can only be used as starting point for appending intervals.
* The stats (mean, variance, stdDeviation) are not defined for
* empty HeartbeatHistory, i.e. throws ArithmeticException.
*/
def apply(maxSampleSize: Int): HeartbeatHistory = HeartbeatHistory(
maxSampleSize = maxSampleSize,
intervals = immutable.IndexedSeq.empty,
intervalSum = 0L,
squaredIntervalSum = 0L)
}
/**
* Holds the heartbeat statistics for a specific node Address.
* It is capped by the number of samples specified in `maxSampleSize`.
*
* The stats (mean, variance, stdDeviation) are not defined for
* empty HeartbeatHistory, i.e. throws ArithmeticException.
*/
private[cluster] case class HeartbeatHistory private (
maxSampleSize: Int,
intervals: immutable.IndexedSeq[Long],
intervalSum: Long,
squaredIntervalSum: Long) {
if (maxSampleSize < 1)
throw new IllegalArgumentException("maxSampleSize must be >= 1, got [%s]" format maxSampleSize)
if (intervalSum < 0L)
throw new IllegalArgumentException("intervalSum must be >= 0, got [%s]" format intervalSum)
if (squaredIntervalSum < 0L)
throw new IllegalArgumentException("squaredIntervalSum must be >= 0, got [%s]" format squaredIntervalSum)
def mean: Double = intervalSum.toDouble / intervals.size
def variance: Double = (squaredIntervalSum.toDouble / intervals.size) - (mean * mean)
def stdDeviation: Double = math.sqrt(variance)
@tailrec
final def :+(interval: Long): HeartbeatHistory = {
if (intervals.size < maxSampleSize)
HeartbeatHistory(
maxSampleSize,
intervals = intervals :+ interval,
intervalSum = intervalSum + interval,
squaredIntervalSum = squaredIntervalSum + pow2(interval))
else
dropOldest :+ interval // recur
}
private def dropOldest: HeartbeatHistory = HeartbeatHistory(
maxSampleSize,
intervals = intervals drop 1,
intervalSum = intervalSum - intervals.head,
squaredIntervalSum = squaredIntervalSum - pow2(intervals.head))
private def pow2(x: Long) = x * x
}
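
To make the formula above concrete, the phi value can be reproduced in isolation. A minimal sketch using the same logistic approximation of the normal CDF as the class above; the sample inputs match the expectations in AccrualFailureDetectorSpec further down (mean 1000 ms, standard deviation 100 ms):

    // Standalone sketch of the accrual calculation defined above.
    def cdf(x: Double, mean: Double, stdDeviation: Double): Double = {
      val y = (x - mean) / stdDeviation
      // approximation of the cumulative distribution function for N(0, 1)
      1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
    }

    def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double =
      -math.log10(1.0 - cdf(timeDiff, mean, stdDeviation))

    phi(timeDiff = 1000, mean = 1000.0, stdDeviation = 100.0) // ≈ 0.3, far below the default threshold of 8.0
    phi(timeDiff = 1600, mean = 1000.0, stdDeviation = 100.0) // ≈ 10.8, the connection would be considered unavailable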

View file

@@ -24,6 +24,9 @@ import java.util.concurrent.atomic.AtomicReference
import akka.util.internal.HashedWheelTimer
import scala.concurrent.{ ExecutionContext, Await }
import com.typesafe.config.ConfigFactory
import akka.remote.DefaultFailureDetectorRegistry
import akka.remote.FailureDetector
import com.typesafe.config.Config
/**
* Cluster Extension Id and factory for creating Cluster extension.
@@ -73,12 +76,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
log.info("Cluster Node [{}] - is starting up...", selfAddress)
val failureDetector: FailureDetector = {
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, List(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({
case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
}).get
val failureDetector: FailureDetectorRegistry[Address] = {
def createFailureDetector(): FailureDetector = {
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, List(classOf[Config] -> settings.FailureDetectorConfig)).recover({
case e ⇒ throw new ConfigurationException(
s"Could not create custom cluster failure detector [$fqcn] due to: ${e.toString}", e)
}).get
}
new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
}
// ========================================================

View file

@@ -16,70 +16,60 @@ import scala.concurrent.duration.FiniteDuration
import akka.japi.Util.immutableSeq
class ClusterSettings(val config: Config, val systemName: String) {
import config._
final val FailureDetectorThreshold: Double = {
getDouble("akka.cluster.failure-detector.threshold")
} requiring (_ > 0.0, "failure-detector.threshold must be > 0")
final val FailureDetectorMaxSampleSize: Int = {
getInt("akka.cluster.failure-detector.max-sample-size")
} requiring (_ > 0, "failure-detector.max-sample-size must be > 0")
final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class")
final val FailureDetectorMinStdDeviation: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.min-std-deviation must be > 0")
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
} requiring (_ >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0")
private val cc = config.getConfig("akka.cluster")
final val FailureDetectorConfig: Config = cc.getConfig("failure-detector")
final val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class")
final val HeartbeatInterval: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
final val HeartbeatRequestDelay: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.grace-period"), MILLISECONDS)
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.grace-period"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0")
final val HeartbeatExpectedResponseAfter: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.expected-response-after"), MILLISECONDS)
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.expected-response-after"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after > 0")
final val HeartbeatRequestTimeToLive: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.time-to-live"), MILLISECONDS)
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0")
final val NumberOfEndHeartbeats: Int = {
getInt("akka.cluster.failure-detector.nr-of-end-heartbeats")
FailureDetectorConfig.getInt("nr-of-end-heartbeats")
} requiring (_ > 0, "failure-detector.nr-of-end-heartbeats must be > 0")
final val MonitoredByNrOfMembers: Int = {
getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
FailureDetectorConfig.getInt("monitored-by-nr-of-members")
} requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0")
final val SeedNodes: immutable.IndexedSeq[Address] =
immutableSeq(getStringList("akka.cluster.seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector
final val SeedNodeTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS)
final val PeriodicTasksInitialDelay: FiniteDuration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
final val LeaderActionsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
final val UnreachableNodesReaperInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS)
final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join")
final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down")
immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector
final val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS)
final val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS)
final val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS)
final val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS)
final val PublishStatsInterval: FiniteDuration = Duration(cc.getMilliseconds("publish-stats-interval"), MILLISECONDS)
final val AutoJoin: Boolean = cc.getBoolean("auto-join")
final val AutoDown: Boolean = cc.getBoolean("auto-down")
final val MinNrOfMembers: Int = {
getInt("akka.cluster.min-nr-of-members")
cc.getInt("min-nr-of-members")
} requiring (_ > 0, "min-nr-of-members must be > 0")
final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled")
final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match {
final val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
final val UseDispatcher: String = cc.getString("use-dispatcher") match {
case "" Dispatchers.DefaultDispatcherId
case id id
}
final val GossipDifferentViewProbability: Double = getDouble("akka.cluster.gossip-different-view-probability")
final val MaxGossipMergeRate: Double = getDouble("akka.cluster.max-gossip-merge-rate")
final val SchedulerTickDuration: FiniteDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel")
final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled")
final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class")
final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability")
final val MaxGossipMergeRate: Double = cc.getDouble("max-gossip-merge-rate")
final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled")
final val MetricsCollectorClass: String = cc.getString("metrics.collector-class")
final val MetricsInterval: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.metrics.collect-interval"), MILLISECONDS)
Duration(cc.getMilliseconds("metrics.collect-interval"), MILLISECONDS)
} requiring (_ > Duration.Zero, "metrics.collect-interval must be > 0")
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
final val MetricsGossipInterval: FiniteDuration = Duration(cc.getMilliseconds("metrics.gossip-interval"), MILLISECONDS)
final val MetricsMovingAverageHalfLife: FiniteDuration = {
Duration(getMilliseconds("akka.cluster.metrics.moving-average-half-life"), MILLISECONDS)
Duration(cc.getMilliseconds("metrics.moving-average-half-life"), MILLISECONDS)
} requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0")
}

View file

@@ -1,39 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.Address
/**
* Interface for Akka failure detectors.
*/
trait FailureDetector {
/**
* Returns true if the connection is considered to be up and healthy and returns false otherwise.
*/
def isAvailable(connection: Address): Boolean
/**
* Returns true if the failure detector has received any heartbeats and started monitoring
* of the resource.
*/
def isMonitoring(connection: Address): Boolean
/**
* Records a heartbeat for a connection.
*/
def heartbeat(connection: Address): Unit
/**
* Removes the heartbeat management for a connection.
*/
def remove(connection: Address): Unit
/**
* Removes all connections and starts over.
*/
def reset(): Unit
}

View file

@@ -4,10 +4,8 @@
package akka.cluster
import language.implicitConversions
import org.scalatest.Suite
import org.scalatest.exceptions.TestFailedException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.remote.testconductor.RoleName
@@ -19,6 +17,7 @@ import akka.event.Logging.ErrorLevel
import scala.concurrent.duration._
import scala.collection.immutable
import java.util.concurrent.ConcurrentHashMap
import akka.remote.DefaultFailureDetectorRegistry
object MultiNodeClusterSpec {
@@ -258,20 +257,32 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
* [[akka.cluster.FailureDetectorPuppet]] is used as
* failure detector.
*/
def markNodeAsAvailable(address: Address): Unit = cluster.failureDetector match {
case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsAvailable(address)
case _ ⇒
}
def markNodeAsAvailable(address: Address): Unit =
failureDetectorPuppet(address) foreach (_.markNodeAsAvailable())
/**
* Marks a node as unavailable in the failure detector if
* [[akka.cluster.FailureDetectorPuppet]] is used as
* failure detector.
*/
def markNodeAsUnavailable(address: Address): Unit = cluster.failureDetector match {
case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsUnavailable(address)
case _ ⇒
def markNodeAsUnavailable(address: Address): Unit = {
if (isFailureDetectorPuppet) {
// before marking it as unavailable there must be at least one heartbeat
// to create the FailureDetectorPuppet in the FailureDetectorRegistry
cluster.failureDetector.heartbeat(address)
failureDetectorPuppet(address) foreach (_.markNodeAsUnavailable())
}
}
private def isFailureDetectorPuppet: Boolean =
cluster.settings.FailureDetectorImplementationClass == classOf[FailureDetectorPuppet].getName
private def failureDetectorPuppet(address: Address): Option[FailureDetectorPuppet] =
cluster.failureDetector match {
case reg: DefaultFailureDetectorRegistry[Address] ⇒
reg.failureDetector(address) collect { case p: FailureDetectorPuppet ⇒ p }
case _ ⇒ None
}
}

View file

@@ -27,6 +27,8 @@ import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.StandardMetrics.Cpu
import akka.cluster.StandardMetrics.HeapMemory
import akka.remote.DefaultFailureDetectorRegistry
import akka.remote.PhiAccrualFailureDetector
import akka.remote.RemoteScope
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
@@ -328,12 +330,19 @@ object StressMultiJvmSpec extends MultiNodeConfig {
*/
class PhiObserver extends Actor with ActorLogging {
val cluster = Cluster(context.system)
val fd = cluster.failureDetector.asInstanceOf[AccrualFailureDetector]
var reportTo: Option[ActorRef] = None
val emptyPhiByNode = Map.empty[Address, PhiValue].withDefault(address ⇒ PhiValue(address, 0, 0, 0.0))
var phiByNode = emptyPhiByNode
var nodes = Set.empty[Address]
def phi(address: Address): Double = cluster.failureDetector match {
case reg: DefaultFailureDetectorRegistry[Address] ⇒ reg.failureDetector(address) match {
case Some(fd: PhiAccrualFailureDetector) ⇒ fd.phi
case _ ⇒ 0.0
}
case _ ⇒ 0.0
}
import context.dispatcher
val checkPhiTask = context.system.scheduler.schedule(
1.second, 1.second, self, PhiTick)
@@ -350,8 +359,8 @@ object StressMultiJvmSpec extends MultiNodeConfig {
case PhiTick ⇒
nodes foreach { node ⇒
val previous = phiByNode(node)
val φ = fd.phi(node)
if (φ > 0 || fd.isMonitoring(node)) {
val φ = phi(node)
if (φ > 0 || cluster.failureDetector.isMonitoring(node)) {
val aboveOne = if (!φ.isInfinite && φ > 1.0) 1 else 0
phiByNode += node -> PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1,
math.max(previous.max, φ))

View file

@@ -1,266 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.Address
import akka.testkit._
import akka.testkit.TestEvent._
import scala.collection.immutable
import scala.concurrent.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class AccrualFailureDetectorSpec extends AkkaSpec("""
actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.loglevel = "INFO"
""") {
override def atStartup(): Unit = {
super.atStartup()
if (!log.isDebugEnabled) {
system.eventStream.publish(Mute(EventFilter.info(pattern = ".*Phi value.*")))
}
}
"An AccrualFailureDetector" must {
val conn = Address("akka.tcp", "", "localhost", 2552)
val conn2 = Address("akka.tcp", "", "localhost", 2553)
def fakeTimeGenerator(timeIntervals: immutable.Seq[Long]): () ⇒ Long = {
var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c))
def timeGenerator(): Long = {
val currentTime = times.head
times = times.tail
currentTime
}
timeGenerator
}
val defaultFakeTimeIntervals = Vector.fill(20)(1000L)
def createFailureDetector(
threshold: Double = 8.0,
maxSampleSize: Int = 1000,
minStdDeviation: Duration = 10.millis,
acceptableLostDuration: Duration = Duration.Zero,
firstHeartbeatEstimate: Duration = 1.second,
clock: () ⇒ Long = fakeTimeGenerator(defaultFakeTimeIntervals)): AccrualFailureDetector =
new AccrualFailureDetector(system,
threshold,
maxSampleSize,
minStdDeviation,
acceptableLostDuration,
firstHeartbeatEstimate = firstHeartbeatEstimate,
clock = clock)
"use good enough cumulative distribution function" in {
val fd = createFailureDetector()
fd.cumulativeDistributionFunction(0.0, 0, 1) must be(0.5 plusOrMinus (0.001))
fd.cumulativeDistributionFunction(0.6, 0, 1) must be(0.7257 plusOrMinus (0.001))
fd.cumulativeDistributionFunction(1.5, 0, 1) must be(0.9332 plusOrMinus (0.001))
fd.cumulativeDistributionFunction(2.0, 0, 1) must be(0.97725 plusOrMinus (0.01))
fd.cumulativeDistributionFunction(2.5, 0, 1) must be(0.9379 plusOrMinus (0.1))
fd.cumulativeDistributionFunction(3.5, 0, 1) must be(0.99977 plusOrMinus (0.1))
fd.cumulativeDistributionFunction(4.0, 0, 1) must be(0.99997 plusOrMinus (0.1))
for (x :: y :: Nil ← (0.0 to 4.0 by 0.1).toList.sliding(2)) {
fd.cumulativeDistributionFunction(x, 0, 1) must be < (
fd.cumulativeDistributionFunction(y, 0, 1))
}
fd.cumulativeDistributionFunction(2.2, 2.0, 0.3) must be(0.7475 plusOrMinus (0.001))
}
"return realistic phi values" in {
val fd = createFailureDetector()
val test = immutable.TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3)
for ((timeDiff, expectedPhi) ← test) {
fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1))
}
// larger stdDeviation results => lower phi
fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 500.0) must be < (
fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 100.0))
}
"return phi value of 0.0 on startup for each address, when no heartbeats" in {
val fd = createFailureDetector()
fd.phi(conn) must be(0.0)
fd.phi(conn2) must be(0.0)
}
"return phi based on guess when only one heartbeat" in {
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000)
val fd = createFailureDetector(firstHeartbeatEstimate = 1.seconds,
clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.phi(conn) must be(0.3 plusOrMinus 0.2)
fd.phi(conn) must be(4.5 plusOrMinus 0.3)
fd.phi(conn) must be > (15.0)
}
"return phi value using first interval after second heartbeat" in {
val timeInterval = List[Long](0, 100, 100, 100)
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.phi(conn) must be > (0.0)
fd.heartbeat(conn)
fd.phi(conn) must be > (0.0)
}
"mark node as available after a series of successful heartbeats" in {
val timeInterval = List[Long](0, 1000, 100, 100)
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
}
"mark node as available after explicit removal of connection" in {
val timeInterval = List[Long](0, 1000, 100, 100, 100)
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
fd.remove(conn)
fd.isAvailable(conn) must be(true)
}
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //2200
fd.remove(conn)
fd.isAvailable(conn) must be(true) //3300
// it receives heartbeat from an explicitly removed node
fd.heartbeat(conn) //4400
fd.heartbeat(conn) //5500
fd.heartbeat(conn) //6600
fd.isAvailable(conn) must be(true) //6700
}
"mark node as dead if heartbeat are missed" in {
val timeInterval = List[Long](0, 1000, 100, 100, 7000)
val ft = fakeTimeGenerator(timeInterval)
val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //1200
fd.isAvailable(conn) must be(false) //8200
}
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100)
val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //1200
fd.isAvailable(conn) must be(false) //8200
fd.heartbeat(conn) //8300
fd.heartbeat(conn) //9300
fd.heartbeat(conn) //9400
fd.isAvailable(conn) must be(true) //9500
}
"accept some configured missing heartbeats" in {
val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000)
val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
}
"fail after configured acceptable missing heartbeats" in {
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000)
val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(false)
}
"use maxSampleSize heartbeats" in {
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
val fd = createFailureDetector(maxSampleSize = 3, clock = fakeTimeGenerator(timeInterval))
// 100 ms interval
fd.heartbeat(conn) //0
fd.heartbeat(conn) //100
fd.heartbeat(conn) //200
fd.heartbeat(conn) //300
val phi1 = fd.phi(conn) //400
// 1000 ms interval, should become same phi when 100 ms intervals have been dropped
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //2000
fd.heartbeat(conn) //3000
fd.heartbeat(conn) //4000
val phi2 = fd.phi(conn) //5000
phi2 must be(phi1.plusOrMinus(0.001))
}
}
"Statistics for heartbeats" must {
"calculate correct mean and variance" in {
val samples = Seq(100, 200, 125, 340, 130)
val stats = (HeartbeatHistory(maxSampleSize = 20) /: samples) { (stats, value) ⇒ stats :+ value }
stats.mean must be(179.0 plusOrMinus 0.00001)
stats.variance must be(7584.0 plusOrMinus 0.00001)
}
"have 0.0 variance for one sample" in {
(HeartbeatHistory(600) :+ 1000L).variance must be(0.0 plusOrMinus 0.00001)
}
"be capped by the specified maxSampleSize" in {
val history3 = HeartbeatHistory(maxSampleSize = 3) :+ 100 :+ 110 :+ 90
history3.mean must be(100.0 plusOrMinus 0.00001)
history3.variance must be(66.6666667 plusOrMinus 0.00001)
val history4 = history3 :+ 140
history4.mean must be(113.333333 plusOrMinus 0.00001)
history4.variance must be(422.222222 plusOrMinus 0.00001)
val history5 = history4 :+ 80
history5.mean must be(103.333333 plusOrMinus 0.00001)
history5.variance must be(688.88888889 plusOrMinus 0.00001)
}
}
}

View file

@@ -5,10 +5,10 @@
package akka.cluster
import language.postfixOps
import akka.testkit.AkkaSpec
import akka.dispatch.Dispatchers
import scala.concurrent.duration._
import akka.remote.PhiAccrualFailureDetector
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterConfigSpec extends AkkaSpec {
@@ -18,11 +18,11 @@ class ClusterConfigSpec extends AkkaSpec {
"be able to parse generic cluster config elements" in {
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
FailureDetectorThreshold must be(8.0 plusOrMinus 0.0001)
FailureDetectorMaxSampleSize must be(1000)
FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName)
FailureDetectorMinStdDeviation must be(100 millis)
FailureDetectorAcceptableHeartbeatPause must be(3 seconds)
FailureDetectorConfig.getDouble("threshold") must be(8.0 plusOrMinus 0.0001)
FailureDetectorConfig.getInt("max-sample-size") must be(1000)
Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis)
Duration(FailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) must be(3 seconds)
FailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName)
SeedNodes must be(Seq.empty[String])
SeedNodeTimeout must be(5 seconds)
PeriodicTasksInitialDelay must be(1 seconds)

View file

@@ -4,64 +4,35 @@
package akka.cluster
import akka.actor.{ Address, ActorSystem }
import akka.event.{ Logging, LogSource }
import java.util.concurrent.atomic.AtomicReference
import akka.remote.testkit.MultiNodeConfig
import akka.remote.FailureDetector
import com.typesafe.config.Config
/**
* User controllable "puppet" failure detector.
*/
class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector {
import java.util.concurrent.ConcurrentHashMap
class FailureDetectorPuppet(config: Config) extends FailureDetector {
trait Status
object Up extends Status
object Down extends Status
object Unknown extends Status
implicit private val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
private val status: AtomicReference[Status] = new AtomicReference(Unknown)
def markNodeAsUnavailable(): Unit = status.set(Down)
def markNodeAsAvailable(): Unit = status.set(Up)
override def isAvailable: Boolean = status.get match {
case Unknown | Up ⇒ true
case Down ⇒ false
}
private val log = Logging(system, this)
override def isMonitoring: Boolean = status.get != Unknown
private val connections = new ConcurrentHashMap[Address, Status]
override def heartbeat(): Unit = status.compareAndSet(Unknown, Up)
def markNodeAsUnavailable(connection: Address): this.type = {
connections.put(connection, Down)
this
}
def markNodeAsAvailable(connection: Address): this.type = {
connections.put(connection, Up)
this
}
def isAvailable(connection: Address): Boolean = connections.get(connection) match {
case null ⇒
log.debug("Adding cluster node [{}]", connection)
connections.put(connection, Up)
true
case Up ⇒
log.debug("isAvailable: Cluster node IS available [{}]", connection)
true
case Down ⇒
log.debug("isAvailable: Cluster node IS NOT available [{}]", connection)
false
}
override def isMonitoring(connection: Address): Boolean = connections.contains(connection)
def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection)
def remove(connection: Address): Unit = {
log.debug("Removing cluster node [{}]", connection)
connections.remove(connection)
}
def reset(): Unit = {
log.debug("Resetting failure detector")
connections.clear()
}
}
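
As the implementation above implies, each puppet instance is a small three-state machine (Unknown, Up, Down) that ignores the contents of its Config parameter. A short walkthrough, assuming an instance is created directly (in the tests the registry creates it reflectively on the first heartbeat):

    import com.typesafe.config.ConfigFactory

    val puppet = new FailureDetectorPuppet(ConfigFactory.empty())
    puppet.isAvailable             // true, Unknown is treated as healthy
    puppet.isMonitoring            // false, no heartbeat recorded yet
    puppet.heartbeat()             // Unknown -> Up
    puppet.isMonitoring            // true
    puppet.markNodeAsUnavailable() // force Down
    puppet.isAvailable             // false until markNodeAsAvailable() is called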

View file

@@ -130,17 +130,23 @@ akka {
### Failure detection and recovery
# how often keep-alive heartbeat messages should be sent to connections.
heartbeat-interval = 1 s
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
# [Hayashibara et al]) used by the remoting subsystem to detect failed connections.
failure-detector {
# defines the failure detector threshold
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a constructor with a com.typesafe.config.Config parameter.
implementation-class = "akka.remote.PhiAccrualFailureDetector"
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 1 s
# Defines the failure detector threshold.
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes
# actual crashes.
threshold = 7.0
# Number of the samples of inter-heartbeat arrival times to adaptively
@@ -155,7 +161,6 @@ akka {
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# It is a factor of heartbeat-interval.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.

View file

@@ -16,7 +16,7 @@ import java.util.concurrent.locks.{ ReentrantLock, Lock }
* By-name parameter that returns the failure detector instance to be used by a newly registered resource
*
*/
class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetector) extends FailureDetectorRegistry[A] {
class DefaultFailureDetectorRegistry[A](detectorFactory: () ⇒ FailureDetector) extends FailureDetectorRegistry[A] {
private val resourceToFailureDetector = new AtomicReference[Map[A, FailureDetector]](Map())
private final val failureDetectorCreationLock: Lock = new ReentrantLock
@@ -73,5 +73,13 @@ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetec
if (!resourceToFailureDetector.compareAndSet(oldTable, Map.empty[A, FailureDetector])) reset() // recur
}
/**
* INTERNAL API
* Get the underlying FailureDetector for a resource.
*/
private[akka] def failureDetector(resource: A): Option[FailureDetector] =
resourceToFailureDetector.get.get(resource)
}
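
The registry holds one FailureDetector per resource (per Address in the cluster), creating it lazily via the factory on the first heartbeat; the new `failureDetector(resource)` accessor is what lets the tests above reach the underlying puppet. A rough usage sketch, relying on the Config constructor added to PhiAccrualFailureDetector in the next file (the address and values are illustrative):

    import akka.actor.Address
    import akka.remote.{ DefaultFailureDetectorRegistry, PhiAccrualFailureDetector }
    import com.typesafe.config.ConfigFactory

    val fdConfig = ConfigFactory.parseString("""
      threshold = 8.0
      max-sample-size = 1000
      min-std-deviation = 100 ms
      acceptable-heartbeat-pause = 3 s
      heartbeat-interval = 1 s
      """)

    val registry = new DefaultFailureDetectorRegistry[Address](() ⇒ new PhiAccrualFailureDetector(fdConfig))

    val node = Address("akka.tcp", "sys", "host1", 2552)
    registry.heartbeat(node)    // the first heartbeat creates the detector for this address
    registry.isMonitoring(node) // true from now on
    registry.isAvailable(node)  // true while phi stays below the threshold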

View file

@@ -5,9 +5,12 @@ package akka.remote
import akka.remote.FailureDetector.Clock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable
import com.typesafe.config.Config
/**
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
@@ -57,6 +60,26 @@ class PhiAccrualFailureDetector(
val firstHeartbeatEstimate: FiniteDuration)(
implicit clock: Clock) extends FailureDetector {
/**
* Constructor that reads parameters from config.
* Expecting config properties named `threshold`, `max-sample-size`,
* `min-std-deviation`, `acceptable-heartbeat-pause` and
* `heartbeat-interval`.
*/
def this(config: Config) =
this(
threshold = config.getDouble("threshold"),
maxSampleSize = config.getInt("max-sample-size"),
minStdDeviation = Duration(config.getMilliseconds("min-std-deviation"), MILLISECONDS),
acceptableHeartbeatPause = Duration(config.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS),
firstHeartbeatEstimate = Duration(config.getMilliseconds("heartbeat-interval"), MILLISECONDS))
require(threshold > 0.0, "failure-detector.threshold must be > 0")
require(maxSampleSize > 0, "failure-detector.max-sample-size must be > 0")
require(minStdDeviation > Duration.Zero, "failure-detector.min-std-deviation must be > 0")
require(acceptableHeartbeatPause >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0")
require(firstHeartbeatEstimate > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
// guess statistics for first heartbeat,
// important so that connections with only one heartbeat becomes unavailable
private val firstHeartbeat: HeartbeatHistory = {

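The new auxiliary constructor maps the config keys one-to-one onto the primary constructor parameters and fails fast through the `require` checks above if a value is out of range. A minimal sketch using the akka.remote defaults from reference.conf (note that the remoting threshold default is 7.0, versus 8.0 for the cluster):

    import akka.remote.PhiAccrualFailureDetector
    import com.typesafe.config.ConfigFactory

    val fd = new PhiAccrualFailureDetector(ConfigFactory.parseString("""
      threshold = 7.0
      max-sample-size = 100
      min-std-deviation = 100 ms
      acceptable-heartbeat-pause = 3 s
      heartbeat-interval = 1 s
      """))

    fd.isAvailable // true before any heartbeat: an unmonitored resource is treated as healthy
    fd.heartbeat() // records the first heartbeat; stats are bootstrapped from heartbeat-interval
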
View file

@@ -21,6 +21,7 @@ import scala.util.control.NonFatal
import scala.util.{ Success, Failure }
import scala.collection.immutable
import akka.remote.transport.ActorTransportAdapter._
import akka.ConfigurationException
class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace {
def this(msg: String) = this(msg, null)
@@ -30,18 +31,15 @@ private[remote] class AkkaProtocolSettings(config: Config) {
import config._
val FailureDetectorThreshold: Double = getDouble("akka.remote.failure-detector.threshold")
val FailureDetectorConfig: Config = getConfig("akka.remote.failure-detector")
val FailureDetectorMaxSampleSize: Int = getInt("akka.remote.failure-detector.max-sample-size")
val FailureDetectorStdDeviation: FiniteDuration =
Duration(getMilliseconds("akka.remote.failure-detector.min-std-deviation"), MILLISECONDS)
val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class")
val AcceptableHeartBeatPause: FiniteDuration =
Duration(getMilliseconds("akka.remote.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
Duration(FailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS)
val HeartBeatInterval: FiniteDuration =
Duration(getMilliseconds("akka.remote.heartbeat-interval"), MILLISECONDS)
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
val WaitActivityEnabled: Boolean = getBoolean("akka.remote.wait-activity-enabled")
@@ -143,12 +141,14 @@ private[transport] class AkkaProtocolManager(
failureDetector)), actorNameFor(remoteAddress)) // Why don't we watch this one?
}
private def createFailureDetector(): FailureDetector = new PhiAccrualFailureDetector(
settings.FailureDetectorThreshold,
settings.FailureDetectorMaxSampleSize,
settings.FailureDetectorStdDeviation,
settings.AcceptableHeartBeatPause,
settings.HeartBeatInterval)
private def createFailureDetector(): FailureDetector = {
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[FailureDetector](
fqcn, List(classOf[Config] -> settings.FailureDetectorConfig)).recover({
case e ⇒ throw new ConfigurationException(
s"Could not create custom remote failure detector [$fqcn] due to: ${e.toString}", e)
}).get
}
override def postStop() {
wrappedTransport.shutdown()

View file

@@ -47,13 +47,16 @@ class RemoteConfigSpec extends AkkaSpec(
import settings._
WaitActivityEnabled must be(true)
FailureDetectorThreshold must be === 7
FailureDetectorMaxSampleSize must be === 100
FailureDetectorStdDeviation must be === 100.milliseconds
FailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName)
AcceptableHeartBeatPause must be === 3.seconds
HeartBeatInterval must be === 1.seconds
RequireCookie must be(false)
SecureCookie must be === ""
FailureDetectorConfig.getDouble("threshold") must be(7.0 plusOrMinus 0.0001)
FailureDetectorConfig.getInt("max-sample-size") must be(100)
Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis)
}
"contain correct configuration values in reference.conf" ignore {

View file

@@ -36,14 +36,14 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
akka.remote {
failure-detector {
implementation-class = "akka.remote.PhiAccrualFailureDetector"
threshold = 7.0
max-sample-size = 100
min-std-deviation = 100 ms
acceptable-heartbeat-pause = 3 s
heartbeat-interval = 0.1 s
}
heartbeat-interval = 0.1 s
wait-activity-enabled = on
backoff-interval = 1 s