From 96c84a1df6acd99462ed7eb2439178217c51a7f0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 May 2015 07:57:14 +0200 Subject: [PATCH] =rem #17567 Adjust parameters for DeadlineFailureDetector To be more aligned with PhiAccrualFailureDetector the DeadlineFailureDetector should trigger after heartbeat-interval + acceptable-heartbeat-pause --- .../src/main/resources/reference.conf | 3 +++ akka-remote/src/main/resources/reference.conf | 8 +++++- .../akka/remote/DeadlineFailureDetector.scala | 25 +++++++++++++++---- .../remote/DeadlineFailureDetectorSpec.scala | 14 +++++------ .../scala/akka/remote/RemoteConfigSpec.scala | 2 +- 5 files changed, 38 insertions(+), 14 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index d94ab0da5b..1bf7f8ec23 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -97,6 +97,9 @@ akka { # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used by the cluster subsystem to detect unreachable # members. + # The default PhiAccrualFailureDetector will trigger if there are no heartbeats within + # the duration heartbeat-interval + acceptable-heartbeat-pause + threshold_adjustment, + # i.e. around 5.5 seconds with default settings. failure-detector { # FQCN of the failure detector implementation. diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 85c553c78d..353cb98580 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -155,6 +155,9 @@ akka { # Settings for the failure detector to monitor connections. # For TCP it is not important to have fast failure detection, since # most connection failures are captured by TCP itself. + # The default DeadlineFailureDetector will trigger if there are no heartbeats within + # the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 20 seconds + # with the default settings. transport-failure-detector { # FQCN of the failure detector implementation. @@ -171,11 +174,14 @@ akka { # A margin to the `heartbeat-interval` is important to be able to survive sudden, # occasional, pauses in heartbeat arrivals, due to for example garbage collect or # network drop. - acceptable-heartbeat-pause = 20 s + acceptable-heartbeat-pause = 16 s } # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used for remote death watch. + # The default PhiAccrualFailureDetector will trigger if there are no heartbeats within + # the duration heartbeat-interval + acceptable-heartbeat-pause + threshold_adjustment, + # i.e. around 12.5 seconds with default settings. watch-failure-detector { # FQCN of the failure detector implementation. diff --git a/akka-remote/src/main/scala/akka/remote/DeadlineFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/DeadlineFailureDetector.scala index 5ddf5757a6..a8e2c83bf5 100644 --- a/akka-remote/src/main/scala/akka/remote/DeadlineFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/DeadlineFailureDetector.scala @@ -3,6 +3,7 @@ */ package akka.remote +import scala.concurrent.duration._ import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import com.typesafe.config.Config @@ -14,14 +15,20 @@ import akka.util.Helpers.ConfigOps * Implementation of failure detector using an absolute timeout of missing heartbeats * to trigger unavailability. * + * [[#isAvailable]] will return `false` if there is no [[#heartbeat]] within the duration + * `heartbeatInterval + acceptableHeartbeatPause`. + * * @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed * heartbeats that will be accepted before considering it to be an anomaly. * + * @param heartbeatInterval Expected heartbeat interval + * * @param clock The clock, returning current time in milliseconds, but can be faked for testing * purposes. It is only used for measuring intervals (duration). */ class DeadlineFailureDetector( - val acceptableHeartbeatPause: FiniteDuration)( + val acceptableHeartbeatPause: FiniteDuration, + val heartbeatInterval: FiniteDuration)( implicit clock: Clock) extends FailureDetector { /** @@ -29,18 +36,26 @@ class DeadlineFailureDetector( * Expecting config properties named `acceptable-heartbeat-pause`. */ def this(config: Config, ev: EventStream) = - this(acceptableHeartbeatPause = config.getMillisDuration("acceptable-heartbeat-pause")) + this( + acceptableHeartbeatPause = config.getMillisDuration("acceptable-heartbeat-pause"), + heartbeatInterval = config.getMillisDuration("heartbeat-interval")) - require(acceptableHeartbeatPause >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0") + // for backwards compatibility with 2.3.x + @deprecated("Use constructor with acceptableHeartbeatPause and heartbeatInterval", "2.4") + def this(acceptableHeartbeatPause: FiniteDuration)(implicit clock: Clock) = + this(acceptableHeartbeatPause, heartbeatInterval = 1.millis)(clock) - private val acceptableHeartbeatPauseMillis = acceptableHeartbeatPause.toMillis + require(acceptableHeartbeatPause >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0 s") + require(heartbeatInterval > Duration.Zero, "failure-detector.heartbeat-interval must be > 0 s") + + private val deadlineMillis = acceptableHeartbeatPause.toMillis + heartbeatInterval.toMillis @volatile private var heartbeatTimestamp = 0L //not used until active (first heartbeat) @volatile private var active = false override def isAvailable: Boolean = isAvailable(clock()) private def isAvailable(timestamp: Long): Boolean = - if (active) (heartbeatTimestamp + acceptableHeartbeatPauseMillis) > timestamp + if (active) (heartbeatTimestamp + deadlineMillis) > timestamp else true // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections override def isMonitoring: Boolean = active diff --git a/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala index c88a42abfc..81d2567bfe 100644 --- a/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala @@ -26,11 +26,11 @@ class DeadlineFailureDetectorSpec extends AkkaSpec { def createFailureDetector( acceptableLostDuration: FiniteDuration, clock: Clock = FailureDetector.defaultClock) = - new DeadlineFailureDetector(acceptableLostDuration)(clock = clock) + new DeadlineFailureDetector(acceptableLostDuration, heartbeatInterval = 1.second)(clock = clock) "mark node as monitored after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) - val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval)) fd.isMonitoring should ===(false) fd.heartbeat() @@ -43,7 +43,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec { "mark node as dead if heartbeat are missed" in { val timeInterval = List[Long](0, 1000, 100, 100, 7000) - val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() //0 fd.heartbeat() //1000 @@ -57,7 +57,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec { // 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger unreachable again val regularIntervals = 0L +: Vector.fill(999)(1000L) val timeIntervals = regularIntervals :+ (5 * 60 * 1000L) :+ 100L :+ 900L :+ 100L :+ 7000L :+ 100L :+ 900L :+ 100L :+ 900L - val fd = createFailureDetector(acceptableLostDuration = 7.seconds, clock = fakeTimeGenerator(timeIntervals)) + val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeIntervals)) for (_ ← 0 until 1000) fd.heartbeat() fd.isAvailable should ===(false) // after the long pause @@ -73,7 +73,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec { "accept some configured missing heartbeats" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000) - val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() fd.heartbeat() @@ -86,7 +86,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec { "fail after configured acceptable missing heartbeats" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000) - val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval)) + val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() fd.heartbeat() @@ -101,4 +101,4 @@ class DeadlineFailureDetectorSpec extends AkkaSpec { } -} \ No newline at end of file +} diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 35542dc193..e5e66652f4 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -71,7 +71,7 @@ class RemoteConfigSpec extends AkkaSpec( TransportFailureDetectorImplementationClass should ===(classOf[DeadlineFailureDetector].getName) TransportHeartBeatInterval should ===(4.seconds) - TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(20 seconds) + TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(16.seconds) }