add heartbeat delays warning log to remoteWatcher #17389 (#23135)

* add heartbeat delays warning log to remoteWatcher #17389

* using eventStream for logging

* add backward compatibility constructor

* make eventstream Option
This commit is contained in:
fady zohdy 2017-06-16 13:14:49 +02:00 committed by Patrik Nordwall
parent e9332de648
commit 398e3d0bd3

View file

@ -3,6 +3,8 @@
*/
package akka.remote
import akka.event.Logging.Warning
import akka.event.jul.Logger
import akka.remote.FailureDetector.Clock
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
@ -33,23 +35,18 @@ import akka.util.Helpers.ConfigOps
* @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event
* of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect
* actual crashes
*
* @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
* inter-arrival times.
*
* @param minStdDeviation Minimum standard deviation to use for the normal distribution used when calculating phi.
* Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations
* in heartbeat inter arrival times.
*
* @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed
* heartbeats that will be accepted before considering it to be an anomaly.
* This margin is important to be able to survive sudden, occasional, pauses in heartbeat
* arrivals due to, for example, garbage collection or a network drop.
*
* @param firstHeartbeatEstimate Bootstrap the stats with heartbeats that correspond
* to this duration, with a rather high standard deviation (since the environment is unknown
* in the beginning)
*
* @param clock The clock, returning current time in milliseconds, but can be faked for testing
* purposes. It is only used for measuring intervals (duration).
*/
@ -58,23 +55,38 @@ class PhiAccrualFailureDetector(
val maxSampleSize: Int,
val minStdDeviation: FiniteDuration,
val acceptableHeartbeatPause: FiniteDuration,
val firstHeartbeatEstimate: FiniteDuration)(
val firstHeartbeatEstimate: FiniteDuration,
eventStream: Option[EventStream])(
implicit
clock: Clock) extends FailureDetector {
/**
 * Backwards-compatible constructor for callers that do not supply an event stream.
 *
 * Forwards every argument unchanged to the primary constructor and passes
 * `None` as the `eventStream`, so the heartbeat-interval warning, which is only
 * published when an event stream is defined, is not emitted by instances
 * created through this constructor.
 */
def this(
  threshold: Double,
  maxSampleSize: Int,
  minStdDeviation: FiniteDuration,
  acceptableHeartbeatPause: FiniteDuration,
  firstHeartbeatEstimate: FiniteDuration)(implicit clock: Clock) =
  this(
    threshold = threshold,
    maxSampleSize = maxSampleSize,
    minStdDeviation = minStdDeviation,
    acceptableHeartbeatPause = acceptableHeartbeatPause,
    firstHeartbeatEstimate = firstHeartbeatEstimate,
    eventStream = None)(clock)
/**
* Constructor that reads parameters from config.
* Expecting config properties named `threshold`, `max-sample-size`,
* `min-std-deviation`, `acceptable-heartbeat-pause` and
* `heartbeat-interval`.
*/
def this(config: Config, ev: EventStream) =
def this(config: Config, ev: Option[EventStream]) =
this(
threshold = config.getDouble("threshold"),
maxSampleSize = config.getInt("max-sample-size"),
minStdDeviation = config.getMillisDuration("min-std-deviation"),
acceptableHeartbeatPause = config.getMillisDuration("acceptable-heartbeat-pause"),
firstHeartbeatEstimate = config.getMillisDuration("heartbeat-interval"))
firstHeartbeatEstimate = config.getMillisDuration("heartbeat-interval"),
ev)
require(threshold > 0.0, "failure-detector.threshold must be > 0")
require(maxSampleSize > 0, "failure-detector.max-sample-size must be > 0")
@ -122,8 +134,11 @@ class PhiAccrualFailureDetector(
// this is a known connection
val interval = timestamp - latestTimestamp
// don't use the first heartbeat after failure for the history, since a long pause will skew the stats
if (isAvailable(timestamp)) oldState.history :+ interval
else oldState.history
if (isAvailable(timestamp)) {
if (interval >= (acceptableHeartbeatPauseMillis / 2) && eventStream.isDefined)
eventStream.get.publish(Warning(this.toString, getClass, s"heartbeat interval is growing too large: $interval millis"))
oldState.history :+ interval
} else oldState.history
}
val newState = oldState.copy(history = newHistory, timestamp = Some(timestamp)) // record new timestamp