From dcae863f7fbe4de0b57c0634daeb4e99de0416a9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 8 Jun 2012 13:44:40 +0200 Subject: [PATCH] Use all heartbeats in failure detector, see #2182 * Failure detector didn't use hearbeat 1 and 2 * Included heartbeat 2 in ordinary stats * For heartbeat 1 use guess stats, important so that connections with only one heartbeat becomes unavailble, the guess corresponds to 1 second interval which results in phi > 8 after 18 seconds * Improved AccrualFailureDetectorSpec --- .../akka/cluster/AccrualFailureDetector.scala | 60 ++++++------- ...LeavingAndExitingAndBeingRemovedSpec.scala | 3 +- .../cluster/NodeLeavingAndExitingSpec.scala | 3 +- .../scala/akka/cluster/NodeShutdownSpec.scala | 2 +- .../cluster/AccrualFailureDetectorSpec.scala | 84 ++++++++++--------- 5 files changed, 78 insertions(+), 74 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 3caece392c..c86eb3361e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -36,7 +36,11 @@ class AccrualFailureDetector( /** * Holds the failure statistics for a specific node Address. */ - private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D) + private case class FailureStats(mean: Double = 0.0, variance: Double = 0.0, deviation: Double = 0.0) + + // guess statistics for first heartbeat, + // important so that connections with only one heartbeat becomes unavailble + private val failureStatsFirstHeartbeat = FailureStats(mean = 1000.0) /** * Implement using optimistic lockless concurrency, all state is represented @@ -72,7 +76,7 @@ class AccrualFailureDetector( // add starter records for this new connection val newState = oldState copy ( version = oldState.version + 1, - failureStats = oldState.failureStats + (connection -> FailureStats()), + failureStats = oldState.failureStats + (connection -> failureStatsFirstHeartbeat), intervalHistory = oldState.intervalHistory + (connection -> IndexedSeq.empty[Long]), timestamps = oldState.timestamps + (connection -> timeMachine()), explicitRemovals = oldState.explicitRemovals - connection) @@ -93,30 +97,25 @@ class AccrualFailureDetector( case _ ⇒ IndexedSeq.empty[Long] }) :+ interval - val newFailureStats = - if (newIntervalsForConnection.size > 1) { + val newFailureStats = { + val newMean: Double = newIntervalsForConnection.sum.toDouble / newIntervalsForConnection.size - val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble - - val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse { - throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history") - } - - val deviationSum = - newIntervalsForConnection - .map(_.toDouble) - .foldLeft(0.0D)((x, y) ⇒ x + (y - newMean)) - - val newVariance: Double = deviationSum / newIntervalsForConnection.size.toDouble - val newDeviation: Double = math.sqrt(newVariance) - - val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance) - oldState.failureStats + (connection -> newFailureStats) - - } else { - oldState.failureStats + val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse { + throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history") } + val deviationSum = + newIntervalsForConnection + .map(_.toDouble) + .foldLeft(0.0)((x, y) ⇒ x + (y - newMean)) + + val newVariance: Double = deviationSum / newIntervalsForConnection.size + val newDeviation: Double = math.sqrt(newVariance) + + val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance) + oldState.failureStats + (connection -> newFailureStats) + } + val newState = oldState copy (version = oldState.version + 1, failureStats = newFailureStats, intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection), @@ -132,8 +131,7 @@ class AccrualFailureDetector( * Calculates how likely it is that the connection has failed. *

* If a connection does not have any records in failure detector then it is - * considered dead. This is true either if the heartbeat have not started - * yet or the connection have been explicitly removed. + * considered healthy. *

* Implementations of 'Cumulative Distribution Function' for Exponential Distribution. * For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597]. @@ -145,21 +143,22 @@ class AccrualFailureDetector( val phi = // if connection has been removed explicitly if (oldState.explicitRemovals.contains(connection)) Double.MaxValue - else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections + else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections else { val timestampDiff = timeMachine() - oldTimestamp.get val mean = oldState.failureStats.get(connection) match { case Some(FailureStats(mean, _, _)) ⇒ mean - case _ ⇒ throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history") + case _ ⇒ + if (!oldState.intervalHistory.contains(connection)) 1000.0 + else throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history") } - if (mean == 0.0D) 0.0D + if (mean == 0.0) 0.0 else PhiFactor * timestampDiff / mean } - // only log if PHI value is starting to get interesting - if (phi > 0.0D) log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) + log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) phi } @@ -168,6 +167,7 @@ class AccrualFailureDetector( */ @tailrec final def remove(connection: Address) { + log.debug("Node [{}] - Remove connection [{}] ", address, connection) val oldState = state.get if (oldState.failureStats.contains(connection)) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index d85016c714..8e274be311 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -36,8 +36,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec "A node that is LEAVING a non-singleton cluster" must { - // FIXME make it work and remove ignore - "be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest ignore { + "be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 2909362fa7..79fff4770f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -42,8 +42,7 @@ abstract class NodeLeavingAndExitingSpec "A node that is LEAVING a non-singleton cluster" must { - // FIXME make it work and remove ignore - "be moved to EXITING by the leader" taggedAs LongRunningTest ignore { + "be moved to EXITING by the leader" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala index 37d4b4571e..4dc90a5b89 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -13,7 +13,7 @@ object NodeShutdownMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = true). + commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.cluster { auto-down = on diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 173ce799f8..1cf62daf1c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -17,7 +17,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" val conn = Address("akka", "", "localhost", 2552) val conn2 = Address("akka", "", "localhost", 2553) - def fakeTimeGenerator(timeIntervals: List[Long]): () ⇒ Long = { + def fakeTimeGenerator(timeIntervals: Seq[Long]): () ⇒ Long = { var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c)) def timeGenerator(): Long = { val currentTime = times.head @@ -27,22 +27,47 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" timeGenerator } - "return phi value of 0.0D on startup for each address" in { + "return phi value of 0.0 on startup for each address, when no heartbeats" in { val fd = new AccrualFailureDetector(system, conn) - fd.phi(conn) must be(0.0D) - fd.phi(conn2) must be(0.0D) + fd.phi(conn) must be(0.0) + fd.phi(conn2) must be(0.0) + } + + "return phi based on guess when only one heartbeat" in { + // 1 second ticks + val timeInterval = Vector.fill(30)(1000L) + val fd = new AccrualFailureDetector(system, conn, + timeMachine = fakeTimeGenerator(timeInterval)) + + fd.heartbeat(conn) + fd.phi(conn) must be > (0.0) + // let time go + for (n ← 2 to 8) + fd.phi(conn) must be < (4.0) + for (n ← 9 to 18) + fd.phi(conn) must be < (8.0) + + fd.phi(conn) must be > (8.0) + } + + "return phi value using first interval after second heartbeat" in { + val timeInterval = List[Long](0, 100, 100, 100) + val fd = new AccrualFailureDetector(system, conn, + timeMachine = fakeTimeGenerator(timeInterval)) + + fd.heartbeat(conn) + fd.phi(conn) must be > (0.0) + fd.heartbeat(conn) + fd.phi(conn) must be > (0.0) } "mark node as available after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) - val ft = fakeTimeGenerator(timeInterval) - - val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) + val fd = new AccrualFailureDetector(system, conn, + timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) fd.isAvailable(conn) must be(true) @@ -50,18 +75,13 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) - val ft = fakeTimeGenerator(timeInterval) - - val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) + val fd = new AccrualFailureDetector(system, conn, + timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) - fd.heartbeat(conn) - fd.heartbeat(conn) - fd.isAvailable(conn) must be(true) - fd.remove(conn) fd.isAvailable(conn) must be(false) @@ -69,14 +89,12 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after explicit removal of connection and receiving heartbeat again" in { val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) - val ft = fakeTimeGenerator(timeInterval) - - val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) + val fd = new AccrualFailureDetector(system, conn, + timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 fd.heartbeat(conn) //1000 - fd.heartbeat(conn) //1100 fd.isAvailable(conn) must be(true) //2200 @@ -87,9 +105,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" // it receives heartbeat from an explicitly removed node fd.heartbeat(conn) //4400 - fd.heartbeat(conn) //5500 - fd.heartbeat(conn) //6600 fd.isAvailable(conn) must be(true) //6700 @@ -98,40 +114,29 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead if heartbeat are missed" in { val timeInterval = List[Long](0, 1000, 100, 100, 5000) val ft = fakeTimeGenerator(timeInterval) - - val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft) + val fd = new AccrualFailureDetector(system, conn, threshold = 3, + timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 - fd.heartbeat(conn) //1000 - fd.heartbeat(conn) //1100 fd.isAvailable(conn) must be(true) //1200 - fd.isAvailable(conn) must be(false) //6200 } "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) - val ft = fakeTimeGenerator(timeInterval) - - val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft) + val fd = new AccrualFailureDetector(system, conn, threshold = 3, + timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 - fd.heartbeat(conn) //1000 - fd.heartbeat(conn) //1100 - fd.isAvailable(conn) must be(true) //1200 - fd.isAvailable(conn) must be(false) //6200 - fd.heartbeat(conn) //6300 - fd.heartbeat(conn) //7300 - fd.heartbeat(conn) //7400 fd.isAvailable(conn) must be(true) //7500 @@ -139,8 +144,8 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "use maxSampleSize heartbeats" in { val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) - val ft = fakeTimeGenerator(timeInterval) - val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, timeMachine = ft) + val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, + timeMachine = fakeTimeGenerator(timeInterval)) // 100 ms interval fd.heartbeat(conn) //0 @@ -156,5 +161,6 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" val phi2 = fd.phi(conn) //5000 phi2 must be(phi1.plusOrMinus(0.001)) } + } }