=rem #17567 Adjust parameters for DeadlineFailureDetector
To be more aligned with PhiAccrualFailureDetector, the DeadlineFailureDetector should trigger after heartbeat-interval + acceptable-heartbeat-pause.
This commit is contained in:
parent
122fdedd08
commit
96c84a1df6
5 changed files with 38 additions and 14 deletions
|
|
@ -97,6 +97,9 @@ akka {
|
||||||
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
||||||
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable
|
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable
|
||||||
# members.
|
# members.
|
||||||
|
# The default PhiAccrualFailureDetector will trigger if there are no heartbeats within
|
||||||
|
# the duration heartbeat-interval + acceptable-heartbeat-pause + threshold_adjustment,
|
||||||
|
# i.e. around 5.5 seconds with default settings.
|
||||||
failure-detector {
|
failure-detector {
|
||||||
|
|
||||||
# FQCN of the failure detector implementation.
|
# FQCN of the failure detector implementation.
|
||||||
|
|
|
||||||
|
|
@ -155,6 +155,9 @@ akka {
|
||||||
# Settings for the failure detector to monitor connections.
|
# Settings for the failure detector to monitor connections.
|
||||||
# For TCP it is not important to have fast failure detection, since
|
# For TCP it is not important to have fast failure detection, since
|
||||||
# most connection failures are captured by TCP itself.
|
# most connection failures are captured by TCP itself.
|
||||||
|
# The default DeadlineFailureDetector will trigger if there are no heartbeats within
|
||||||
|
# the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 20 seconds
|
||||||
|
# with the default settings.
|
||||||
transport-failure-detector {
|
transport-failure-detector {
|
||||||
|
|
||||||
# FQCN of the failure detector implementation.
|
# FQCN of the failure detector implementation.
|
||||||
|
|
@ -171,11 +174,14 @@ akka {
|
||||||
# A margin to the `heartbeat-interval` is important to be able to survive sudden,
|
# A margin to the `heartbeat-interval` is important to be able to survive sudden,
|
||||||
# occasional, pauses in heartbeat arrivals, due to for example garbage collection or
|
# occasional, pauses in heartbeat arrivals, due to for example garbage collection or
|
||||||
# network drop.
|
# network drop.
|
||||||
acceptable-heartbeat-pause = 20 s
|
acceptable-heartbeat-pause = 16 s
|
||||||
}
|
}
|
||||||
|
|
||||||
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
||||||
# [Hayashibara et al]) used for remote death watch.
|
# [Hayashibara et al]) used for remote death watch.
|
||||||
|
# The default PhiAccrualFailureDetector will trigger if there are no heartbeats within
|
||||||
|
# the duration heartbeat-interval + acceptable-heartbeat-pause + threshold_adjustment,
|
||||||
|
# i.e. around 12.5 seconds with default settings.
|
||||||
watch-failure-detector {
|
watch-failure-detector {
|
||||||
|
|
||||||
# FQCN of the failure detector implementation.
|
# FQCN of the failure detector implementation.
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
*/
|
*/
|
||||||
package akka.remote
|
package akka.remote
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
@ -14,14 +15,20 @@ import akka.util.Helpers.ConfigOps
|
||||||
* Implementation of failure detector using an absolute timeout of missing heartbeats
|
* Implementation of failure detector using an absolute timeout of missing heartbeats
|
||||||
* to trigger unavailability.
|
* to trigger unavailability.
|
||||||
*
|
*
|
||||||
|
* [[#isAvailable]] will return `false` if there is no [[#heartbeat]] within the duration
|
||||||
|
* `heartbeatInterval + acceptableHeartbeatPause`.
|
||||||
|
*
|
||||||
* @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed
|
* @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed
|
||||||
* heartbeats that will be accepted before considering it to be an anomaly.
|
* heartbeats that will be accepted before considering it to be an anomaly.
|
||||||
*
|
*
|
||||||
|
* @param heartbeatInterval Expected heartbeat interval
|
||||||
|
*
|
||||||
* @param clock The clock, returning current time in milliseconds, but can be faked for testing
|
* @param clock The clock, returning current time in milliseconds, but can be faked for testing
|
||||||
* purposes. It is only used for measuring intervals (duration).
|
* purposes. It is only used for measuring intervals (duration).
|
||||||
*/
|
*/
|
||||||
class DeadlineFailureDetector(
|
class DeadlineFailureDetector(
|
||||||
val acceptableHeartbeatPause: FiniteDuration)(
|
val acceptableHeartbeatPause: FiniteDuration,
|
||||||
|
val heartbeatInterval: FiniteDuration)(
|
||||||
implicit clock: Clock) extends FailureDetector {
|
implicit clock: Clock) extends FailureDetector {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -29,18 +36,26 @@ class DeadlineFailureDetector(
|
||||||
* Expecting config properties named `acceptable-heartbeat-pause`.
|
* Expecting config properties named `acceptable-heartbeat-pause` and `heartbeat-interval`.
|
||||||
*/
|
*/
|
||||||
def this(config: Config, ev: EventStream) =
|
def this(config: Config, ev: EventStream) =
|
||||||
this(acceptableHeartbeatPause = config.getMillisDuration("acceptable-heartbeat-pause"))
|
this(
|
||||||
|
acceptableHeartbeatPause = config.getMillisDuration("acceptable-heartbeat-pause"),
|
||||||
|
heartbeatInterval = config.getMillisDuration("heartbeat-interval"))
|
||||||
|
|
||||||
require(acceptableHeartbeatPause >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0")
|
// for backwards compatibility with 2.3.x
|
||||||
|
@deprecated("Use constructor with acceptableHeartbeatPause and heartbeatInterval", "2.4")
|
||||||
|
def this(acceptableHeartbeatPause: FiniteDuration)(implicit clock: Clock) =
|
||||||
|
this(acceptableHeartbeatPause, heartbeatInterval = 1.millis)(clock)
|
||||||
|
|
||||||
private val acceptableHeartbeatPauseMillis = acceptableHeartbeatPause.toMillis
|
require(acceptableHeartbeatPause >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0 s")
|
||||||
|
require(heartbeatInterval > Duration.Zero, "failure-detector.heartbeat-interval must be > 0 s")
|
||||||
|
|
||||||
|
private val deadlineMillis = acceptableHeartbeatPause.toMillis + heartbeatInterval.toMillis
|
||||||
@volatile private var heartbeatTimestamp = 0L //not used until active (first heartbeat)
|
@volatile private var heartbeatTimestamp = 0L //not used until active (first heartbeat)
|
||||||
@volatile private var active = false
|
@volatile private var active = false
|
||||||
|
|
||||||
override def isAvailable: Boolean = isAvailable(clock())
|
override def isAvailable: Boolean = isAvailable(clock())
|
||||||
|
|
||||||
private def isAvailable(timestamp: Long): Boolean =
|
private def isAvailable(timestamp: Long): Boolean =
|
||||||
if (active) (heartbeatTimestamp + acceptableHeartbeatPauseMillis) > timestamp
|
if (active) (heartbeatTimestamp + deadlineMillis) > timestamp
|
||||||
else true // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
|
else true // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
|
||||||
|
|
||||||
override def isMonitoring: Boolean = active
|
override def isMonitoring: Boolean = active
|
||||||
|
|
|
||||||
|
|
@ -26,11 +26,11 @@ class DeadlineFailureDetectorSpec extends AkkaSpec {
|
||||||
def createFailureDetector(
|
def createFailureDetector(
|
||||||
acceptableLostDuration: FiniteDuration,
|
acceptableLostDuration: FiniteDuration,
|
||||||
clock: Clock = FailureDetector.defaultClock) =
|
clock: Clock = FailureDetector.defaultClock) =
|
||||||
new DeadlineFailureDetector(acceptableLostDuration)(clock = clock)
|
new DeadlineFailureDetector(acceptableLostDuration, heartbeatInterval = 1.second)(clock = clock)
|
||||||
|
|
||||||
"mark node as monitored after a series of successful heartbeats" in {
|
"mark node as monitored after a series of successful heartbeats" in {
|
||||||
val timeInterval = List[Long](0, 1000, 100, 100)
|
val timeInterval = List[Long](0, 1000, 100, 100)
|
||||||
val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval))
|
val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval))
|
||||||
fd.isMonitoring should ===(false)
|
fd.isMonitoring should ===(false)
|
||||||
|
|
||||||
fd.heartbeat()
|
fd.heartbeat()
|
||||||
|
|
@ -43,7 +43,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec {
|
||||||
|
|
||||||
"mark node as dead if heartbeat are missed" in {
|
"mark node as dead if heartbeat are missed" in {
|
||||||
val timeInterval = List[Long](0, 1000, 100, 100, 7000)
|
val timeInterval = List[Long](0, 1000, 100, 100, 7000)
|
||||||
val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval))
|
val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval))
|
||||||
|
|
||||||
fd.heartbeat() //0
|
fd.heartbeat() //0
|
||||||
fd.heartbeat() //1000
|
fd.heartbeat() //1000
|
||||||
|
|
@ -57,7 +57,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec {
|
||||||
// 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger unreachable again
|
// 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger unreachable again
|
||||||
val regularIntervals = 0L +: Vector.fill(999)(1000L)
|
val regularIntervals = 0L +: Vector.fill(999)(1000L)
|
||||||
val timeIntervals = regularIntervals :+ (5 * 60 * 1000L) :+ 100L :+ 900L :+ 100L :+ 7000L :+ 100L :+ 900L :+ 100L :+ 900L
|
val timeIntervals = regularIntervals :+ (5 * 60 * 1000L) :+ 100L :+ 900L :+ 100L :+ 7000L :+ 100L :+ 900L :+ 100L :+ 900L
|
||||||
val fd = createFailureDetector(acceptableLostDuration = 7.seconds, clock = fakeTimeGenerator(timeIntervals))
|
val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeIntervals))
|
||||||
|
|
||||||
for (_ ← 0 until 1000) fd.heartbeat()
|
for (_ ← 0 until 1000) fd.heartbeat()
|
||||||
fd.isAvailable should ===(false) // after the long pause
|
fd.isAvailable should ===(false) // after the long pause
|
||||||
|
|
@ -73,7 +73,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec {
|
||||||
|
|
||||||
"accept some configured missing heartbeats" in {
|
"accept some configured missing heartbeats" in {
|
||||||
val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000)
|
val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000)
|
||||||
val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval))
|
val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval))
|
||||||
|
|
||||||
fd.heartbeat()
|
fd.heartbeat()
|
||||||
fd.heartbeat()
|
fd.heartbeat()
|
||||||
|
|
@ -86,7 +86,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec {
|
||||||
|
|
||||||
"fail after configured acceptable missing heartbeats" in {
|
"fail after configured acceptable missing heartbeats" in {
|
||||||
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000)
|
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000)
|
||||||
val fd = createFailureDetector(acceptableLostDuration = 5.seconds, clock = fakeTimeGenerator(timeInterval))
|
val fd = createFailureDetector(acceptableLostDuration = 4.seconds, clock = fakeTimeGenerator(timeInterval))
|
||||||
|
|
||||||
fd.heartbeat()
|
fd.heartbeat()
|
||||||
fd.heartbeat()
|
fd.heartbeat()
|
||||||
|
|
@ -101,4 +101,4 @@ class DeadlineFailureDetectorSpec extends AkkaSpec {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ class RemoteConfigSpec extends AkkaSpec(
|
||||||
|
|
||||||
TransportFailureDetectorImplementationClass should ===(classOf[DeadlineFailureDetector].getName)
|
TransportFailureDetectorImplementationClass should ===(classOf[DeadlineFailureDetector].getName)
|
||||||
TransportHeartBeatInterval should ===(4.seconds)
|
TransportHeartBeatInterval should ===(4.seconds)
|
||||||
TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(20 seconds)
|
TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(16.seconds)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue