From 398e3d0bd30caac87dfe00425a49ea984ae58807 Mon Sep 17 00:00:00 2001 From: fady zohdy Date: Fri, 16 Jun 2017 13:14:49 +0200 Subject: [PATCH] add heartbeat delays warning log to RemoteWatcher #17389 (#23135) * add heartbeat delays warning log to RemoteWatcher #17389 * using eventStream for logging * add backward compatibility constructor * make eventstream Option --- .../remote/PhiAccrualFailureDetector.scala | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index 5dca505f49..b523fa4f17 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -3,6 +3,8 @@ */ package akka.remote +import akka.event.Logging.Warning +import akka.event.jul.Logger import akka.remote.FailureDetector.Clock import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec @@ -33,23 +35,18 @@ import akka.util.Helpers.ConfigOps * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event * of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect * actual crashes - * * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of * inter-arrival times. - * * @param minStdDeviation Minimum standard deviation to use for the normal distribution used when calculating phi. * Too low standard deviation might result in too much sensitivity for sudden, but normal, deviations * in heartbeat inter arrival times. - * * @param acceptableHeartbeatPause Duration corresponding to number of potentially lost/delayed * heartbeats that will be accepted before considering it to be an anomaly. 
* This margin is important to be able to survive sudden, occasional, pauses in heartbeat * arrivals, due to for example garbage collect or network drop. - * * @param firstHeartbeatEstimate Bootstrap the stats with heartbeats that corresponds to * to this duration, with a with rather high standard deviation (since environment is unknown * in the beginning) - * * @param clock The clock, returning current time in milliseconds, but can be faked for testing * purposes. It is only used for measuring intervals (duration). */ @@ -58,23 +55,38 @@ class PhiAccrualFailureDetector( val maxSampleSize: Int, val minStdDeviation: FiniteDuration, val acceptableHeartbeatPause: FiniteDuration, - val firstHeartbeatEstimate: FiniteDuration)( + val firstHeartbeatEstimate: FiniteDuration, + eventStream: Option[EventStream])( implicit clock: Clock) extends FailureDetector { + /** + * Constructor without eventStream to support backwards compatibility + */ + def this( + threshold: Double, + maxSampleSize: Int, + minStdDeviation: FiniteDuration, + acceptableHeartbeatPause: FiniteDuration, + firstHeartbeatEstimate: FiniteDuration)(implicit clock: Clock) = + this( + threshold, maxSampleSize, minStdDeviation, acceptableHeartbeatPause, firstHeartbeatEstimate, None + )(clock) + /** * Constructor that reads parameters from config. * Expecting config properties named `threshold`, `max-sample-size`, * `min-std-deviation`, `acceptable-heartbeat-pause` and * `heartbeat-interval`. 
*/ - def this(config: Config, ev: EventStream) = + def this(config: Config, ev: Option[EventStream]) = this( threshold = config.getDouble("threshold"), maxSampleSize = config.getInt("max-sample-size"), minStdDeviation = config.getMillisDuration("min-std-deviation"), acceptableHeartbeatPause = config.getMillisDuration("acceptable-heartbeat-pause"), - firstHeartbeatEstimate = config.getMillisDuration("heartbeat-interval")) + firstHeartbeatEstimate = config.getMillisDuration("heartbeat-interval"), + ev) require(threshold > 0.0, "failure-detector.threshold must be > 0") require(maxSampleSize > 0, "failure-detector.max-sample-size must be > 0") @@ -122,8 +134,11 @@ class PhiAccrualFailureDetector( // this is a known connection val interval = timestamp - latestTimestamp // don't use the first heartbeat after failure for the history, since a long pause will skew the stats - if (isAvailable(timestamp)) oldState.history :+ interval - else oldState.history + if (isAvailable(timestamp)) { + if (interval >= (acceptableHeartbeatPauseMillis / 2) && eventStream.isDefined) + eventStream.get.publish(Warning(this.toString, getClass, s"heartbeat interval is growing too large: $interval millis")) + oldState.history :+ interval + } else oldState.history } val newState = oldState.copy(history = newHistory, timestamp = Some(timestamp)) // record new timestamp