diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 6a1f7ac3c3..f9e1f203b4 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -69,28 +69,29 @@ akka { # exceeded the conflicting gossip messages are dropped and will reappear later. max-gossip-merge-rate = 5.0 + # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf + # [Hayashibara et al]) used by the cluster subsystem to detect unreachable members. failure-detector { # FQCN of the failure detector implementation. - # It must implement akka.cluster.FailureDetector and - # have constructor with akka.actor.ActorSystem and - # akka.cluster.ClusterSettings parameters - implementation-class = "akka.cluster.AccrualFailureDetector" + # It must implement akka.remote.FailureDetector and have + # a constructor with a com.typesafe.config.Config parameter. + implementation-class = "akka.remote.PhiAccrualFailureDetector" - # how often should the node send out heartbeats? - heartbeat-interval = 1s + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 1 s - # Number of member nodes that each member will send heartbeat messages to, - # i.e. each node will be monitored by this number of other nodes. - monitored-by-nr-of-members = 5 - - # defines the failure detector threshold - # A low threshold is prone to generate many wrong suspicions but ensures - # a quick detection in the event of a real crash. Conversely, a high - # threshold generates fewer mistakes but needs more time to detect - # actual crashes + # Defines the failure detector threshold. + # A low threshold is prone to generate many wrong suspicions but ensures + # a quick detection in the event of a real crash. Conversely, a high + # threshold generates fewer mistakes but needs more time to detect + # actual crashes. threshold = 8.0 + # Number of the samples of inter-heartbeat arrival times to adaptively + # calculate the failure timeout for connections. + max-sample-size = 1000 + # Minimum standard deviation to use for the normal distribution in # AccrualFailureDetector. Too low standard deviation might result in # too much sensitivity for sudden, but normal, deviations in heartbeat @@ -99,15 +100,14 @@ akka { # Number of potentially lost/delayed heartbeats that will be # accepted before considering it to be an anomaly. - # It is a factor of heartbeat-interval. # This margin is important to be able to survive sudden, occasional, # pauses in heartbeat arrivals, due to for example garbage collect or # network drop. - acceptable-heartbeat-pause = 3s + acceptable-heartbeat-pause = 3 s - # Number of samples to use for calculation of mean and standard deviation of - # inter-arrival times. - max-sample-size = 1000 + # Number of member nodes that each member will send heartbeat messages to, + # i.e. each node will be monitored by this number of other nodes. + monitored-by-nr-of-members = 5 # When a node stops sending heartbeats to another node it will end that # with this number of EndHeartbeat messages, which will remove the diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala deleted file mode 100644 index 92c876b22d..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. 
- */ - -package akka.cluster - -import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } -import akka.event.Logging -import scala.collection.immutable -import scala.annotation.tailrec -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.TimeUnit.NANOSECONDS -import scala.concurrent.duration._ - -object AccrualFailureDetector { - private def realClock: () ⇒ Long = () ⇒ NANOSECONDS.toMillis(System.nanoTime) -} -/** - * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: - * [http://ddg.jaist.ac.jp/pub/HDY+04.pdf] - * - * The suspicion level of failure is given by a value called φ (phi). - * The basic idea of the φ failure detector is to express the value of φ on a scale that - * is dynamically adjusted to reflect current network conditions. A configurable - * threshold is used to decide if φ is considered to be a failure. - * - * The value of φ is calculated as: - * - * {{{ - * φ = -log10(1 - F(timeSinceLastHeartbeat) - * }}} - * where F is the cumulative distribution function of a normal distribution with mean - * and standard deviation estimated from historical heartbeat inter-arrival times. - * - * - * @param system Belongs to the [[akka.actor.ActorSystem]]. Used for logging. - * - * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event - * of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect - * actual crashes - * - * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of - * inter-arrival times. - * - * @param minStdDeviation Minimum standard deviation to use for the normal distribution used when calculating phi. - * Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations - * in heartbeat inter arrival times. - * - * @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed - * heartbeats that will be accepted before considering it to be an anomaly. - * This margin is important to be able to survive sudden, occasional, pauses in heartbeat - * arrivals, due to for example garbage collect or network drop. - * - * @param firstHeartbeatEstimate Bootstrap the stats with heartbeats that corresponds to - * to this duration, with a with rather high standard deviation (since environment is unknown - * in the beginning) - * - * @param clock The clock, returning current time in milliseconds, but can be faked for testing - * purposes. It is only used for measuring intervals (duration). - * - */ -class AccrualFailureDetector( - val system: ActorSystem, - val threshold: Double, - val maxSampleSize: Int, - val minStdDeviation: Duration, - val acceptableHeartbeatPause: Duration, - val firstHeartbeatEstimate: Duration, - val clock: () ⇒ Long = AccrualFailureDetector.realClock) extends FailureDetector { - - import AccrualFailureDetector._ - - /** - * Constructor that picks configuration from the settings. 
- */ - def this( - system: ActorSystem, - settings: ClusterSettings) = - this( - system, - threshold = settings.FailureDetectorThreshold, - maxSampleSize = settings.FailureDetectorMaxSampleSize, - minStdDeviation = settings.FailureDetectorMinStdDeviation, - acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause, - firstHeartbeatEstimate = settings.HeartbeatInterval, - clock = AccrualFailureDetector.realClock) - - private val log = Logging(system, "FailureDetector") - - // guess statistics for first heartbeat, - // important so that connections with only one heartbeat becomes unavailable - private val firstHeartbeat: HeartbeatHistory = { - // bootstrap with 2 entries with rather high standard deviation - val mean = firstHeartbeatEstimate.toMillis - val stdDeviation = mean / 4 - HeartbeatHistory(maxSampleSize) :+ (mean - stdDeviation) :+ (mean + stdDeviation) - } - - private val acceptableHeartbeatPauseMillis = acceptableHeartbeatPause.toMillis - - /** - * Implement using optimistic lockless concurrency, all state is represented - * by this immutable case class and managed by an AtomicReference. - */ - private case class State( - version: Long = 0L, - history: Map[Address, HeartbeatHistory] = Map.empty, - timestamps: Map[Address, Long] = Map.empty[Address, Long]) - - private val state = new AtomicReference[State](State()) - - override def isAvailable(connection: Address): Boolean = phi(connection) < threshold - - override def isMonitoring(connection: Address): Boolean = state.get.timestamps.get(connection).nonEmpty - - /** - * Records a heartbeat for a connection. - */ - @tailrec - final def heartbeat(connection: Address) { - if (isMonitoring(connection)) - log.debug("Heartbeat from connection [{}] ", connection) - else - log.info("First heartbeat from connection [{}] ", connection) - - val timestamp = clock() - val oldState = state.get - - val newHistory = oldState.timestamps.get(connection) match { - case None ⇒ - // this is heartbeat from a new connection - // add starter records for this new connection - firstHeartbeat - case Some(latestTimestamp) ⇒ - // this is a known connection - val interval = timestamp - latestTimestamp - oldState.history(connection) :+ interval - } - - val newState = oldState copy (version = oldState.version + 1, - history = oldState.history + (connection -> newHistory), - timestamps = oldState.timestamps + (connection -> timestamp)) // record new timestamp - - // if we won the race then update else try again - if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur - } - - /** - * The suspicion level of the accrual failure detector. - * - * If a connection does not have any records in failure detector then it is - * considered healthy. - */ - def phi(connection: Address): Double = { - val oldState = state.get - val oldTimestamp = oldState.timestamps.get(connection) - - if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. 
with zero heartbeats, as healthy connections - else { - val timeDiff = clock() - oldTimestamp.get - - val history = oldState.history(connection) - val mean = history.mean - val stdDeviation = ensureValidStdDeviation(history.stdDeviation) - - val φ = phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation) - - // FIXME change to debug log level, when failure detector is stable - if (φ > 1.0 && timeDiff < (acceptableHeartbeatPauseMillis + 5000)) - log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]", - φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")") - - φ - } - } - - private[cluster] def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double = { - val cdf = cumulativeDistributionFunction(timeDiff, mean, stdDeviation) - -math.log10(1.0 - cdf) - } - - private val minStdDeviationMillis = minStdDeviation.toMillis - - private def ensureValidStdDeviation(stdDeviation: Double): Double = math.max(stdDeviation, minStdDeviationMillis) - - /** - * Cumulative distribution function for N(mean, stdDeviation) normal distribution. - * This is an approximation defined in β Mathematics Handbook. - */ - private[cluster] def cumulativeDistributionFunction(x: Double, mean: Double, stdDeviation: Double): Double = { - val y = (x - mean) / stdDeviation - // Cumulative distribution function for N(0, 1) - 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y))) - } - - /** - * Removes the heartbeat management for a connection. - */ - @tailrec - final def remove(connection: Address): Unit = { - if (isMonitoring(connection)) - log.info("Remove heartbeat connection [{}] ", connection) - - val oldState = state.get - - if (oldState.history.contains(connection)) { - val newState = oldState copy (version = oldState.version + 1, - history = oldState.history - connection, - timestamps = oldState.timestamps - connection) - - // if we won the race then update else try again - if (!state.compareAndSet(oldState, newState)) remove(connection) // recur - } - } - - def reset(): Unit = { - @tailrec - def doReset(): Unit = { - val oldState = state.get - val newState = oldState.copy(version = oldState.version + 1, history = Map.empty, timestamps = Map.empty) - // if we won the race then update else try again - if (!state.compareAndSet(oldState, newState)) doReset() // recur - } - log.debug("Resetting failure detector") - doReset() - } -} - -private[cluster] object HeartbeatHistory { - - /** - * Create an empty HeartbeatHistory, without any history. - * Can only be used as starting point for appending intervals. - * The stats (mean, variance, stdDeviation) are not defined for - * for empty HeartbeatHistory, i.e. throws AritmeticException. - */ - def apply(maxSampleSize: Int): HeartbeatHistory = HeartbeatHistory( - maxSampleSize = maxSampleSize, - intervals = immutable.IndexedSeq.empty, - intervalSum = 0L, - squaredIntervalSum = 0L) - -} - -/** - * Holds the heartbeat statistics for a specific node Address. - * It is capped by the number of samples specified in `maxSampleSize`. - * - * The stats (mean, variance, stdDeviation) are not defined for - * for empty HeartbeatHistory, i.e. throws AritmeticException. 
- */ -private[cluster] case class HeartbeatHistory private ( - maxSampleSize: Int, - intervals: immutable.IndexedSeq[Long], - intervalSum: Long, - squaredIntervalSum: Long) { - - if (maxSampleSize < 1) - throw new IllegalArgumentException("maxSampleSize must be >= 1, got [%s]" format maxSampleSize) - if (intervalSum < 0L) - throw new IllegalArgumentException("intervalSum must be >= 0, got [%s]" format intervalSum) - if (squaredIntervalSum < 0L) - throw new IllegalArgumentException("squaredIntervalSum must be >= 0, got [%s]" format squaredIntervalSum) - - def mean: Double = intervalSum.toDouble / intervals.size - - def variance: Double = (squaredIntervalSum.toDouble / intervals.size) - (mean * mean) - - def stdDeviation: Double = math.sqrt(variance) - - @tailrec - final def :+(interval: Long): HeartbeatHistory = { - if (intervals.size < maxSampleSize) - HeartbeatHistory( - maxSampleSize, - intervals = intervals :+ interval, - intervalSum = intervalSum + interval, - squaredIntervalSum = squaredIntervalSum + pow2(interval)) - else - dropOldest :+ interval // recur - } - - private def dropOldest: HeartbeatHistory = HeartbeatHistory( - maxSampleSize, - intervals = intervals drop 1, - intervalSum = intervalSum - intervals.head, - squaredIntervalSum = squaredIntervalSum - pow2(intervals.head)) - - private def pow2(x: Long) = x * x -} diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index e8bd2e21fc..5d9f3e7146 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -24,6 +24,9 @@ import java.util.concurrent.atomic.AtomicReference import akka.util.internal.HashedWheelTimer import scala.concurrent.{ ExecutionContext, Await } import com.typesafe.config.ConfigFactory +import akka.remote.DefaultFailureDetectorRegistry +import akka.remote.FailureDetector +import com.typesafe.config.Config /** * Cluster Extension Id and factory for creating Cluster extension. 
@@ -73,12 +76,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { log.info("Cluster Node [{}] - is starting up...", selfAddress) - val failureDetector: FailureDetector = { - import settings.{ FailureDetectorImplementationClass ⇒ fqcn } - system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, List(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ - case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) - }).get + val failureDetector: FailureDetectorRegistry[Address] = { + def createFailureDetector(): FailureDetector = { + import settings.{ FailureDetectorImplementationClass ⇒ fqcn } + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, List(classOf[Config] -> settings.FailureDetectorConfig)).recover({ + case e ⇒ throw new ConfigurationException( + s"Could not create custom cluster failure detector [$fqcn] due to: ${e.toString}", e) + }).get + } + + new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) } // ======================================================== diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 2020c67993..e4b36d6f5a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -16,70 +16,60 @@ import scala.concurrent.duration.FiniteDuration import akka.japi.Util.immutableSeq class ClusterSettings(val config: Config, val systemName: String) { - import config._ - final val FailureDetectorThreshold: Double = { - getDouble("akka.cluster.failure-detector.threshold") - } requiring (_ > 0.0, "failure-detector.threshold must be > 0") - final val FailureDetectorMaxSampleSize: Int = { - getInt("akka.cluster.failure-detector.max-sample-size") - } requiring (_ > 0, "failure-detector.max-sample-size must be > 0") - final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class") - final val FailureDetectorMinStdDeviation: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) - } requiring (_ > Duration.Zero, "failure-detector.min-std-deviation must be > 0") - final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - } requiring (_ >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0") + private val cc = config.getConfig("akka.cluster") + + final val FailureDetectorConfig: Config = cc.getConfig("failure-detector") + final val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class") final val HeartbeatInterval: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0") final val HeartbeatRequestDelay: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.grace-period"), MILLISECONDS) + Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.grace-period"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0") final val 
HeartbeatExpectedResponseAfter: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.expected-response-after"), MILLISECONDS) + Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.expected-response-after"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after > 0") final val HeartbeatRequestTimeToLive: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.time-to-live"), MILLISECONDS) + Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0") final val NumberOfEndHeartbeats: Int = { - getInt("akka.cluster.failure-detector.nr-of-end-heartbeats") + FailureDetectorConfig.getInt("nr-of-end-heartbeats") } requiring (_ > 0, "failure-detector.nr-of-end-heartbeats must be > 0") final val MonitoredByNrOfMembers: Int = { - getInt("akka.cluster.failure-detector.monitored-by-nr-of-members") + FailureDetectorConfig.getInt("monitored-by-nr-of-members") } requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0") final val SeedNodes: immutable.IndexedSeq[Address] = - immutableSeq(getStringList("akka.cluster.seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector - final val SeedNodeTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS) - final val PeriodicTasksInitialDelay: FiniteDuration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - final val GossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) - final val LeaderActionsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) - final val UnreachableNodesReaperInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) - final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS) - final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") - final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") + immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector + final val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS) + final val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS) + final val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS) + final val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS) + final val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) + final val PublishStatsInterval: FiniteDuration = Duration(cc.getMilliseconds("publish-stats-interval"), MILLISECONDS) + final val AutoJoin: Boolean = cc.getBoolean("auto-join") + final val AutoDown: Boolean = cc.getBoolean("auto-down") final val MinNrOfMembers: Int = { - getInt("akka.cluster.min-nr-of-members") + cc.getInt("min-nr-of-members") } requiring (_ > 0, "min-nr-of-members must be > 0") - final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled") - final val UseDispatcher: String = 
getString("akka.cluster.use-dispatcher") match { + final val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") + final val UseDispatcher: String = cc.getString("use-dispatcher") match { case "" ⇒ Dispatchers.DefaultDispatcherId case id ⇒ id } - final val GossipDifferentViewProbability: Double = getDouble("akka.cluster.gossip-different-view-probability") - final val MaxGossipMergeRate: Double = getDouble("akka.cluster.max-gossip-merge-rate") - final val SchedulerTickDuration: FiniteDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) - final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") - final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled") - final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class") + final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability") + final val MaxGossipMergeRate: Double = cc.getDouble("max-gossip-merge-rate") + final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS) + final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel") + final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled") + final val MetricsCollectorClass: String = cc.getString("metrics.collector-class") final val MetricsInterval: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.metrics.collect-interval"), MILLISECONDS) + Duration(cc.getMilliseconds("metrics.collect-interval"), MILLISECONDS) } requiring (_ > Duration.Zero, "metrics.collect-interval must be > 0") - final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS) + final val MetricsGossipInterval: FiniteDuration = Duration(cc.getMilliseconds("metrics.gossip-interval"), MILLISECONDS) final val MetricsMovingAverageHalfLife: FiniteDuration = { - Duration(getMilliseconds("akka.cluster.metrics.moving-average-half-life"), MILLISECONDS) + Duration(cc.getMilliseconds("metrics.moving-average-half-life"), MILLISECONDS) } requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0") } diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala deleted file mode 100644 index dbb17ac80a..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package akka.cluster - -import akka.actor.Address - -/** - * Interface for Akka failure detectors. - */ -trait FailureDetector { - - /** - * Returns true if the connection is considered to be up and healthy and returns false otherwise. - */ - def isAvailable(connection: Address): Boolean - - /** - * Returns true if the failure detector has received any heartbeats and started monitoring - * of the resource. - */ - def isMonitoring(connection: Address): Boolean - - /** - * Records a heartbeat for a connection. - */ - def heartbeat(connection: Address): Unit - - /** - * Removes the heartbeat management for a connection. - */ - def remove(connection: Address): Unit - - /** - * Removes all connections and starts over. 
- */ - def reset(): Unit -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 1027a14279..7acd102444 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -4,10 +4,8 @@ package akka.cluster import language.implicitConversions - import org.scalatest.Suite import org.scalatest.exceptions.TestFailedException - import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.remote.testconductor.RoleName @@ -19,6 +17,7 @@ import akka.event.Logging.ErrorLevel import scala.concurrent.duration._ import scala.collection.immutable import java.util.concurrent.ConcurrentHashMap +import akka.remote.DefaultFailureDetectorRegistry object MultiNodeClusterSpec { @@ -258,20 +257,32 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS * [[akka.cluster.FailureDetectorPuppet]] is used as * failure detector. */ - def markNodeAsAvailable(address: Address): Unit = cluster.failureDetector match { - case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsAvailable(address) - case _ ⇒ - } + def markNodeAsAvailable(address: Address): Unit = + failureDetectorPuppet(address) foreach (_.markNodeAsAvailable()) /** * Marks a node as unavailable in the failure detector if * [[akka.cluster.FailureDetectorPuppet]] is used as * failure detector. */ - def markNodeAsUnavailable(address: Address): Unit = cluster.failureDetector match { - case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsUnavailable(address) - case _ ⇒ + def markNodeAsUnavailable(address: Address): Unit = { + if (isFailureDetectorPuppet) { + // before marking it as unavailble there must be at least one heartbeat + // to create the FailureDetectorPuppet in the FailureDetectorRegistry + cluster.failureDetector.heartbeat(address) + failureDetectorPuppet(address) foreach (_.markNodeAsUnavailable()) + } } + private def isFailureDetectorPuppet: Boolean = + cluster.settings.FailureDetectorImplementationClass == classOf[FailureDetectorPuppet].getName + + private def failureDetectorPuppet(address: Address): Option[FailureDetectorPuppet] = + cluster.failureDetector match { + case reg: DefaultFailureDetectorRegistry[Address] ⇒ + reg.failureDetector(address) collect { case p: FailureDetectorPuppet ⇒ p } + case _ ⇒ None + } + } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 16a163c026..d9dfa9b38f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -27,6 +27,8 @@ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.StandardMetrics.Cpu import akka.cluster.StandardMetrics.HeapMemory +import akka.remote.DefaultFailureDetectorRegistry +import akka.remote.PhiAccrualFailureDetector import akka.remote.RemoteScope import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -328,12 +330,19 @@ object StressMultiJvmSpec extends MultiNodeConfig { */ class PhiObserver extends Actor with ActorLogging { val cluster = Cluster(context.system) - val fd = cluster.failureDetector.asInstanceOf[AccrualFailureDetector] var reportTo: Option[ActorRef] = None val emptyPhiByNode = Map.empty[Address, PhiValue].withDefault(address ⇒ 
PhiValue(address, 0, 0, 0.0)) var phiByNode = emptyPhiByNode var nodes = Set.empty[Address] + def phi(address: Address): Double = cluster.failureDetector match { + case reg: DefaultFailureDetectorRegistry[Address] ⇒ reg.failureDetector(address) match { + case Some(fd: PhiAccrualFailureDetector) ⇒ fd.phi + case _ ⇒ 0.0 + } + case _ ⇒ 0.0 + } + import context.dispatcher val checkPhiTask = context.system.scheduler.schedule( 1.second, 1.second, self, PhiTick) @@ -350,8 +359,8 @@ object StressMultiJvmSpec extends MultiNodeConfig { case PhiTick ⇒ nodes foreach { node ⇒ val previous = phiByNode(node) - val φ = fd.phi(node) - if (φ > 0 || fd.isMonitoring(node)) { + val φ = phi(node) + if (φ > 0 || cluster.failureDetector.isMonitoring(node)) { val aboveOne = if (!φ.isInfinite && φ > 1.0) 1 else 0 phiByNode += node -> PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1, math.max(previous.max, φ)) diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala deleted file mode 100644 index 7f0b5bea14..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package akka.cluster - -import akka.actor.Address -import akka.testkit._ -import akka.testkit.TestEvent._ -import scala.collection.immutable -import scala.concurrent.duration._ - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class AccrualFailureDetectorSpec extends AkkaSpec(""" - actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.loglevel = "INFO" -""") { - - override def atStartup(): Unit = { - super.atStartup() - if (!log.isDebugEnabled) { - system.eventStream.publish(Mute(EventFilter.info(pattern = ".*Phi value.*"))) - } - } - - "An AccrualFailureDetector" must { - val conn = Address("akka.tcp", "", "localhost", 2552) - val conn2 = Address("akka.tcp", "", "localhost", 2553) - - def fakeTimeGenerator(timeIntervals: immutable.Seq[Long]): () ⇒ Long = { - var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c)) - def timeGenerator(): Long = { - val currentTime = times.head - times = times.tail - currentTime - } - timeGenerator - } - - val defaultFakeTimeIntervals = Vector.fill(20)(1000L) - def createFailureDetector( - threshold: Double = 8.0, - maxSampleSize: Int = 1000, - minStdDeviation: Duration = 10.millis, - acceptableLostDuration: Duration = Duration.Zero, - firstHeartbeatEstimate: Duration = 1.second, - clock: () ⇒ Long = fakeTimeGenerator(defaultFakeTimeIntervals)): AccrualFailureDetector = - new AccrualFailureDetector(system, - threshold, - maxSampleSize, - minStdDeviation, - acceptableLostDuration, - firstHeartbeatEstimate = firstHeartbeatEstimate, - clock = clock) - - "use good enough cumulative distribution function" in { - val fd = createFailureDetector() - fd.cumulativeDistributionFunction(0.0, 0, 1) must be(0.5 plusOrMinus (0.001)) - fd.cumulativeDistributionFunction(0.6, 0, 1) must be(0.7257 plusOrMinus (0.001)) - fd.cumulativeDistributionFunction(1.5, 0, 1) must be(0.9332 plusOrMinus (0.001)) - fd.cumulativeDistributionFunction(2.0, 0, 1) must be(0.97725 plusOrMinus (0.01)) - fd.cumulativeDistributionFunction(2.5, 0, 1) must be(0.9379 plusOrMinus (0.1)) - fd.cumulativeDistributionFunction(3.5, 0, 1) must be(0.99977 plusOrMinus (0.1)) - fd.cumulativeDistributionFunction(4.0, 0, 1) must be(0.99997 
plusOrMinus (0.1)) - - for (x :: y :: Nil ← (0.0 to 4.0 by 0.1).toList.sliding(2)) { - fd.cumulativeDistributionFunction(x, 0, 1) must be < ( - fd.cumulativeDistributionFunction(y, 0, 1)) - } - - fd.cumulativeDistributionFunction(2.2, 2.0, 0.3) must be(0.7475 plusOrMinus (0.001)) - } - - "return realistic phi values" in { - val fd = createFailureDetector() - val test = immutable.TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3) - for ((timeDiff, expectedPhi) ← test) { - fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1)) - } - - // larger stdDeviation results => lower phi - fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 500.0) must be < ( - fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 100.0)) - } - - "return phi value of 0.0 on startup for each address, when no heartbeats" in { - val fd = createFailureDetector() - fd.phi(conn) must be(0.0) - fd.phi(conn2) must be(0.0) - } - - "return phi based on guess when only one heartbeat" in { - val timeInterval = List[Long](0, 1000, 1000, 1000, 1000) - val fd = createFailureDetector(firstHeartbeatEstimate = 1.seconds, - clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) - fd.phi(conn) must be(0.3 plusOrMinus 0.2) - fd.phi(conn) must be(4.5 plusOrMinus 0.3) - fd.phi(conn) must be > (15.0) - } - - "return phi value using first interval after second heartbeat" in { - val timeInterval = List[Long](0, 100, 100, 100) - val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) - fd.phi(conn) must be > (0.0) - fd.heartbeat(conn) - fd.phi(conn) must be > (0.0) - } - - "mark node as available after a series of successful heartbeats" in { - val timeInterval = List[Long](0, 1000, 100, 100) - val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - - fd.isAvailable(conn) must be(true) - } - - "mark node as available after explicit removal of connection" in { - val timeInterval = List[Long](0, 1000, 100, 100, 100) - val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.isAvailable(conn) must be(true) - fd.remove(conn) - - fd.isAvailable(conn) must be(true) - } - - "mark node as available after explicit removal of connection and receiving heartbeat again" in { - val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) - val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) //0 - - fd.heartbeat(conn) //1000 - fd.heartbeat(conn) //1100 - - fd.isAvailable(conn) must be(true) //2200 - - fd.remove(conn) - - fd.isAvailable(conn) must be(true) //3300 - - // it receives heartbeat from an explicitly removed node - fd.heartbeat(conn) //4400 - fd.heartbeat(conn) //5500 - fd.heartbeat(conn) //6600 - - fd.isAvailable(conn) must be(true) //6700 - } - - "mark node as dead if heartbeat are missed" in { - val timeInterval = List[Long](0, 1000, 100, 100, 7000) - val ft = fakeTimeGenerator(timeInterval) - val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) //0 - fd.heartbeat(conn) //1000 - fd.heartbeat(conn) //1100 - - fd.isAvailable(conn) must be(true) //1200 - fd.isAvailable(conn) must be(false) //8200 - } - - "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { - val 
timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100) - val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) //0 - fd.heartbeat(conn) //1000 - fd.heartbeat(conn) //1100 - fd.isAvailable(conn) must be(true) //1200 - fd.isAvailable(conn) must be(false) //8200 - fd.heartbeat(conn) //8300 - fd.heartbeat(conn) //9300 - fd.heartbeat(conn) //9400 - - fd.isAvailable(conn) must be(true) //9500 - } - - "accept some configured missing heartbeats" in { - val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000) - val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.isAvailable(conn) must be(true) - fd.heartbeat(conn) - fd.isAvailable(conn) must be(true) - } - - "fail after configured acceptable missing heartbeats" in { - val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000) - val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval)) - - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.isAvailable(conn) must be(true) - fd.heartbeat(conn) - fd.isAvailable(conn) must be(false) - } - - "use maxSampleSize heartbeats" in { - val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) - val fd = createFailureDetector(maxSampleSize = 3, clock = fakeTimeGenerator(timeInterval)) - - // 100 ms interval - fd.heartbeat(conn) //0 - fd.heartbeat(conn) //100 - fd.heartbeat(conn) //200 - fd.heartbeat(conn) //300 - val phi1 = fd.phi(conn) //400 - // 1000 ms interval, should become same phi when 100 ms intervals have been dropped - fd.heartbeat(conn) //1000 - fd.heartbeat(conn) //2000 - fd.heartbeat(conn) //3000 - fd.heartbeat(conn) //4000 - val phi2 = fd.phi(conn) //5000 - phi2 must be(phi1.plusOrMinus(0.001)) - } - - } - - "Statistics for heartbeats" must { - - "calculate correct mean and variance" in { - val samples = Seq(100, 200, 125, 340, 130) - val stats = (HeartbeatHistory(maxSampleSize = 20) /: samples) { (stats, value) ⇒ stats :+ value } - stats.mean must be(179.0 plusOrMinus 0.00001) - stats.variance must be(7584.0 plusOrMinus 0.00001) - } - - "have 0.0 variance for one sample" in { - (HeartbeatHistory(600) :+ 1000L).variance must be(0.0 plusOrMinus 0.00001) - } - - "be capped by the specified maxSampleSize" in { - val history3 = HeartbeatHistory(maxSampleSize = 3) :+ 100 :+ 110 :+ 90 - history3.mean must be(100.0 plusOrMinus 0.00001) - history3.variance must be(66.6666667 plusOrMinus 0.00001) - - val history4 = history3 :+ 140 - history4.mean must be(113.333333 plusOrMinus 0.00001) - history4.variance must be(422.222222 plusOrMinus 0.00001) - - val history5 = history4 :+ 80 - history5.mean must be(103.333333 plusOrMinus 0.00001) - history5.variance must be(688.88888889 plusOrMinus 0.00001) - - } - } -} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 4479520833..39e273d345 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -5,10 +5,10 @@ package akka.cluster import language.postfixOps - import akka.testkit.AkkaSpec import akka.dispatch.Dispatchers import scala.concurrent.duration._ +import 
akka.remote.PhiAccrualFailureDetector @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ClusterConfigSpec extends AkkaSpec { @@ -18,11 +18,11 @@ class ClusterConfigSpec extends AkkaSpec { "be able to parse generic cluster config elements" in { val settings = new ClusterSettings(system.settings.config, system.name) import settings._ - FailureDetectorThreshold must be(8.0 plusOrMinus 0.0001) - FailureDetectorMaxSampleSize must be(1000) - FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName) - FailureDetectorMinStdDeviation must be(100 millis) - FailureDetectorAcceptableHeartbeatPause must be(3 seconds) + FailureDetectorConfig.getDouble("threshold") must be(8.0 plusOrMinus 0.0001) + FailureDetectorConfig.getInt("max-sample-size") must be(1000) + Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) + Duration(FailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) must be(3 seconds) + FailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) SeedNodes must be(Seq.empty[String]) SeedNodeTimeout must be(5 seconds) PeriodicTasksInitialDelay must be(1 seconds) diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index e212da2f74..40848f340c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -4,64 +4,35 @@ package akka.cluster -import akka.actor.{ Address, ActorSystem } -import akka.event.{ Logging, LogSource } +import java.util.concurrent.atomic.AtomicReference import akka.remote.testkit.MultiNodeConfig +import akka.remote.FailureDetector +import com.typesafe.config.Config /** * User controllable "puppet" failure detector. 
*/ -class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector { - import java.util.concurrent.ConcurrentHashMap +class FailureDetectorPuppet(config: Config) extends FailureDetector { trait Status object Up extends Status object Down extends Status + object Unknown extends Status - implicit private val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { - def genString(o: AnyRef): String = o.getClass.getName - override def getClazz(o: AnyRef): Class[_] = o.getClass + private val status: AtomicReference[Status] = new AtomicReference(Unknown) + + def markNodeAsUnavailable(): Unit = status.set(Down) + + def markNodeAsAvailable(): Unit = status.set(Up) + + override def isAvailable: Boolean = status.get match { + case Unknown | Up ⇒ true + case Down ⇒ false } - private val log = Logging(system, this) + override def isMonitoring: Boolean = status.get != Unknown - private val connections = new ConcurrentHashMap[Address, Status] + override def heartbeat(): Unit = status.compareAndSet(Unknown, Up) - def markNodeAsUnavailable(connection: Address): this.type = { - connections.put(connection, Down) - this - } - - def markNodeAsAvailable(connection: Address): this.type = { - connections.put(connection, Up) - this - } - - def isAvailable(connection: Address): Boolean = connections.get(connection) match { - case null ⇒ - log.debug("Adding cluster node [{}]", connection) - connections.put(connection, Up) - true - case Up ⇒ - log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) - true - case Down ⇒ - log.debug("isAvailable: Cluster node IS available [{}]", connection) - false - } - - override def isMonitoring(connection: Address): Boolean = connections.contains(connection) - - def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) - - def remove(connection: Address): Unit = { - log.debug("Removing cluster node [{}]", connection) - connections.remove(connection) - } - - def reset(): Unit = { - log.debug("Resetting failure detector") - connections.clear() - } } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 368706a8c7..bcbff6d439 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -130,17 +130,23 @@ akka { ### Failure detection and recovery - # how often should keep-alive heartbeat messages sent to connections. - heartbeat-interval = 1 s - # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used by the remoting subsystem to detect failed connections. failure-detector { - # defines the failure detector threshold + + # FQCN of the failure detector implementation. + # It must implement akka.remote.FailureDetector and have + # a constructor with a com.typesafe.config.Config parameter. + implementation-class = "akka.remote.PhiAccrualFailureDetector" + + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 1 s + + # Defines the failure detector threshold. # A low threshold is prone to generate many wrong suspicions but ensures # a quick detection in the event of a real crash. Conversely, a high # threshold generates fewer mistakes but needs more time to detect - # actual crashes + # actual crashes. 
threshold = 7.0 # Number of the samples of inter-heartbeat arrival times to adaptively @@ -155,7 +161,6 @@ akka { # Number of potentially lost/delayed heartbeats that will be # accepted before considering it to be an anomaly. - # It is a factor of heartbeat-interval. # This margin is important to be able to survive sudden, occasional, # pauses in heartbeat arrivals, due to for example garbage collect or # network drop. diff --git a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala index 7053e77603..064fd17587 100644 --- a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala +++ b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala @@ -16,7 +16,7 @@ import java.util.concurrent.locks.{ ReentrantLock, Lock } * By-name parameter that returns the failure detector instance to be used by a newly registered resource * */ -class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetector) extends FailureDetectorRegistry[A] { +class DefaultFailureDetectorRegistry[A](detectorFactory: () ⇒ FailureDetector) extends FailureDetectorRegistry[A] { private val resourceToFailureDetector = new AtomicReference[Map[A, FailureDetector]](Map()) private final val failureDetectorCreationLock: Lock = new ReentrantLock @@ -73,5 +73,13 @@ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetec if (!resourceToFailureDetector.compareAndSet(oldTable, Map.empty[A, FailureDetector])) reset() // recur } + + /** + * INTERNAL API + * Get the underlying FailureDetector for a resource. + */ + private[akka] def failureDetector(resource: A): Option[FailureDetector] = + resourceToFailureDetector.get.get(resource) + } diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index a4117ebeaa..c0d418da65 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -5,9 +5,12 @@ package akka.remote import akka.remote.FailureDetector.Clock import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.TimeUnit.MILLISECONDS import scala.annotation.tailrec +import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.collection.immutable +import com.typesafe.config.Config /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: @@ -57,6 +60,26 @@ class PhiAccrualFailureDetector( val firstHeartbeatEstimate: FiniteDuration)( implicit clock: Clock) extends FailureDetector { + /** + * Constructor that reads parameters from config. + * Expecting config properties named `threshold`, `max-sample-size`, + * `min-std-deviation`, `acceptable-heartbeat-pause` and + * `heartbeat-interval`. 
+ */ + def this(config: Config) = + this( + threshold = config.getDouble("threshold"), + maxSampleSize = config.getInt("max-sample-size"), + minStdDeviation = Duration(config.getMilliseconds("min-std-deviation"), MILLISECONDS), + acceptableHeartbeatPause = Duration(config.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS), + firstHeartbeatEstimate = Duration(config.getMilliseconds("heartbeat-interval"), MILLISECONDS)) + + require(threshold > 0.0, "failure-detector.threshold must be > 0") + require(maxSampleSize > 0, "failure-detector.max-sample-size must be > 0") + require(minStdDeviation > Duration.Zero, "failure-detector.min-std-deviation must be > 0") + require(acceptableHeartbeatPause >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0") + require(firstHeartbeatEstimate > Duration.Zero, "failure-detector.heartbeat-interval must be > 0") + // guess statistics for first heartbeat, // important so that connections with only one heartbeat becomes unavailable private val firstHeartbeat: HeartbeatHistory = { diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 85cb870727..cdb14055ba 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -21,6 +21,7 @@ import scala.util.control.NonFatal import scala.util.{ Success, Failure } import scala.collection.immutable import akka.remote.transport.ActorTransportAdapter._ +import akka.ConfigurationException class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace { def this(msg: String) = this(msg, null) @@ -30,18 +31,15 @@ private[remote] class AkkaProtocolSettings(config: Config) { import config._ - val FailureDetectorThreshold: Double = getDouble("akka.remote.failure-detector.threshold") + val FailureDetectorConfig: Config = getConfig("akka.remote.failure-detector") - val FailureDetectorMaxSampleSize: Int = getInt("akka.remote.failure-detector.max-sample-size") - - val FailureDetectorStdDeviation: FiniteDuration = - Duration(getMilliseconds("akka.remote.failure-detector.min-std-deviation"), MILLISECONDS) + val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class") val AcceptableHeartBeatPause: FiniteDuration = - Duration(getMilliseconds("akka.remote.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) + Duration(FailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) val HeartBeatInterval: FiniteDuration = - Duration(getMilliseconds("akka.remote.heartbeat-interval"), MILLISECONDS) + Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) val WaitActivityEnabled: Boolean = getBoolean("akka.remote.wait-activity-enabled") @@ -143,12 +141,14 @@ private[transport] class AkkaProtocolManager( failureDetector)), actorNameFor(remoteAddress)) // Why don't we watch this one? 
} - private def createFailureDetector(): FailureDetector = new PhiAccrualFailureDetector( - settings.FailureDetectorThreshold, - settings.FailureDetectorMaxSampleSize, - settings.FailureDetectorStdDeviation, - settings.AcceptableHeartBeatPause, - settings.HeartBeatInterval) + private def createFailureDetector(): FailureDetector = { + import settings.{ FailureDetectorImplementationClass ⇒ fqcn } + context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, List(classOf[Config] -> settings.FailureDetectorConfig)).recover({ + case e ⇒ throw new ConfigurationException( + s"Could not create custom remote failure detector [$fqcn] due to: ${e.toString}", e) + }).get + } override def postStop() { wrappedTransport.shutdown() diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 46d3341db2..23c301b6ed 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -47,13 +47,16 @@ class RemoteConfigSpec extends AkkaSpec( import settings._ WaitActivityEnabled must be(true) - FailureDetectorThreshold must be === 7 - FailureDetectorMaxSampleSize must be === 100 - FailureDetectorStdDeviation must be === 100.milliseconds + FailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) AcceptableHeartBeatPause must be === 3.seconds HeartBeatInterval must be === 1.seconds RequireCookie must be(false) SecureCookie must be === "" + + FailureDetectorConfig.getDouble("threshold") must be(7.0 plusOrMinus 0.0001) + FailureDetectorConfig.getInt("max-sample-size") must be(100) + Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) + } "contain correct configuration values in reference.conf" ignore { diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 6c382e5e10..7d6823d5a2 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -36,14 +36,14 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re akka.remote { failure-detector { + implementation-class = "akka.remote.PhiAccrualFailureDetector" threshold = 7.0 max-sample-size = 100 min-std-deviation = 100 ms acceptable-heartbeat-pause = 3 s + heartbeat-interval = 0.1 s } - heartbeat-interval = 0.1 s - wait-activity-enabled = on backoff-interval = 1 s
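
The changes above make the failure detector implementation pluggable: both akka.cluster and akka.remote now read `failure-detector.implementation-class`, reflectively instantiate that class with the `failure-detector` Config section as its single constructor argument, and wrap it in a DefaultFailureDetectorRegistry that creates one detector instance per monitored resource on its first heartbeat. Below is a minimal sketch of a custom detector written against that contract; the class name QuietPeriodFailureDetector and the `quiet-period` property are hypothetical illustrations — only the akka.remote.FailureDetector trait shape (isAvailable, isMonitoring, heartbeat()), as used by FailureDetectorPuppet above, is taken from this patch.

package example

import java.util.concurrent.atomic.AtomicLong
import com.typesafe.config.Config
import akka.remote.FailureDetector

// Hypothetical detector: a resource is considered available as long as the
// previous heartbeat arrived within the configured `quiet-period`.
class QuietPeriodFailureDetector(config: Config) extends FailureDetector {

  // `quiet-period` is an assumed property of this sketch, not one defined in the patch
  private val quietPeriodMillis: Long = config.getMilliseconds("quiet-period")

  // 0 means "no heartbeat recorded yet"
  private val lastHeartbeatMillis = new AtomicLong(0L)

  override def isMonitoring: Boolean = lastHeartbeatMillis.get != 0L

  // A resource without any recorded heartbeat is treated as healthy,
  // mirroring the behaviour of the phi accrual detector.
  override def isAvailable: Boolean = {
    val last = lastHeartbeatMillis.get
    last == 0L || (System.currentTimeMillis - last) <= quietPeriodMillis
  }

  override def heartbeat(): Unit = lastHeartbeatMillis.set(System.currentTimeMillis)
}

To use such a detector for cluster membership one would point akka.cluster.failure-detector.implementation-class at its FQCN and put any extra properties (here the assumed `quiet-period`) in the same config section; the remoting equivalent is akka.remote.failure-detector.implementation-class.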