diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index e7b5844c73..3caece392c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -22,7 +22,12 @@ import java.util.concurrent.atomic.AtomicReference *

* Default threshold is 8, but can be configured in the Akka config. */ -class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000, val timeMachine: () ⇒ Long = System.currentTimeMillis) { +class AccrualFailureDetector( + system: ActorSystem, + address: Address, + val threshold: Int = 8, + val maxSampleSize: Int = 1000, + val timeMachine: () ⇒ Long = System.currentTimeMillis) { private final val PhiFactor = 1.0 / math.log(10.0) @@ -40,7 +45,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol private case class State( version: Long = 0L, failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats], - intervalHistory: Map[Address, Vector[Long]] = Map.empty[Address, Vector[Long]], + intervalHistory: Map[Address, IndexedSeq[Long]] = Map.empty[Address, IndexedSeq[Long]], timestamps: Map[Address, Long] = Map.empty[Address, Long], explicitRemovals: Set[Address] = Set.empty[Address]) @@ -60,26 +65,17 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection) val oldState = state.get - val oldFailureStats = oldState.failureStats - val oldTimestamps = oldState.timestamps val latestTimestamp = oldState.timestamps.get(connection) - val explicitRemovals = oldState.explicitRemovals if (latestTimestamp.isEmpty) { - // this is heartbeat from a new connection // add starter records for this new connection - val newFailureStats = oldFailureStats + (connection -> FailureStats()) - val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long]) - val newTimestamps = oldTimestamps + (connection -> timeMachine()) - val newExplicitRemovals = explicitRemovals - connection - val newState = oldState copy ( version = oldState.version + 1, - failureStats = newFailureStats, - intervalHistory = newIntervalHistory, - timestamps = newTimestamps, - explicitRemovals = newExplicitRemovals) + failureStats = oldState.failureStats + (connection -> FailureStats()), + intervalHistory = oldState.intervalHistory + (connection -> IndexedSeq.empty[Long]), + timestamps = oldState.timestamps + (connection -> timeMachine()), + explicitRemovals = oldState.explicitRemovals - connection) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -89,26 +85,21 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol val timestamp = timeMachine() val interval = timestamp - latestTimestamp.get - val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp - - var newIntervalsForConnection = (oldState.intervalHistory.get(connection) match { + val newIntervalsForConnection = (oldState.intervalHistory.get(connection) match { + case Some(history) if history.size >= maxSampleSize ⇒ + // reached max history, drop first interval + history drop 1 case Some(history) ⇒ history - case _ ⇒ Vector.empty[Long] + case _ ⇒ IndexedSeq.empty[Long] }) :+ interval - if (newIntervalsForConnection.size > maxSampleSize) { - // reached max history, drop first interval - newIntervalsForConnection = newIntervalsForConnection drop 0 - } - val newFailureStats = if (newIntervalsForConnection.size > 1) { val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble - val oldConnectionFailureStats = oldState.failureStats.get(connection) match { - case Some(stats) ⇒ stats - case _ ⇒ throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history") + val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse { + throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history") } val deviationSum = @@ -120,21 +111,17 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol val newDeviation: Double = math.sqrt(newVariance) val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance) - oldFailureStats + (connection -> newFailureStats) + oldState.failureStats + (connection -> newFailureStats) } else { - oldFailureStats + oldState.failureStats } - val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection) - - val newExplicitRemovals = explicitRemovals - connection - val newState = oldState copy (version = oldState.version + 1, failureStats = newFailureStats, - intervalHistory = newIntervalHistory, - timestamps = newTimestamps, - explicitRemovals = newExplicitRemovals) + intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection), + timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp, + explicitRemovals = oldState.explicitRemovals - connection) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -184,16 +171,11 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol val oldState = state.get if (oldState.failureStats.contains(connection)) { - val failureStats = oldState.failureStats - connection - val intervalHistory = oldState.intervalHistory - connection - val timestamps = oldState.timestamps - connection - val explicitRemovals = oldState.explicitRemovals + connection - val newState = oldState copy (version = oldState.version + 1, - failureStats = failureStats, - intervalHistory = intervalHistory, - timestamps = timestamps, - explicitRemovals = explicitRemovals) + failureStats = oldState.failureStats - connection, + intervalHistory = oldState.intervalHistory - connection, + timestamps = oldState.timestamps - connection, + explicitRemovals = oldState.explicitRemovals + connection) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) remove(connection) // recur diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index 512d29caad..82c1b9881d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -105,7 +105,7 @@ object VectorClock { object Timestamp { private val counter = new AtomicLong(newTimestamp) - def zero(): Timestamp = Timestamp(0L) + val zero: Timestamp = Timestamp(0L) def apply(): Timestamp = { var newTime: Long = 0L diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 85abfcf1d7..173ce799f8 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -7,6 +7,7 @@ package akka.cluster import akka.actor.Address import akka.testkit.{ LongRunningTest, AkkaSpec } +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class AccrualFailureDetectorSpec extends AkkaSpec(""" actor.provider = "akka.remote.RemoteActorRefProvider" akka.loglevel = "INFO" @@ -33,7 +34,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as available after a series of successful heartbeats" in { - var timeInterval = List[Long](0, 1000, 100, 100) + val timeInterval = List[Long](0, 1000, 100, 100) val ft = fakeTimeGenerator(timeInterval) val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) @@ -48,7 +49,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as dead after explicit removal of connection" in { - var timeInterval = List[Long](0, 1000, 100, 100, 100) + val timeInterval = List[Long](0, 1000, 100, 100, 100) val ft = fakeTimeGenerator(timeInterval) val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) @@ -67,7 +68,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as available after explicit removal of connection and receiving heartbeat again" in { - var timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) + val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) val ft = fakeTimeGenerator(timeInterval) val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) @@ -95,7 +96,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as dead if heartbeat are missed" in { - var timeInterval = List[Long](0, 1000, 100, 100, 5000) + val timeInterval = List[Long](0, 1000, 100, 100, 5000) val ft = fakeTimeGenerator(timeInterval) val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft) @@ -112,7 +113,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { - var timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) + val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) val ft = fakeTimeGenerator(timeInterval) val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft) @@ -135,5 +136,25 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) //7500 } + + "use maxSampleSize heartbeats" in { + val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) + val ft = fakeTimeGenerator(timeInterval) + val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, timeMachine = ft) + + // 100 ms interval + fd.heartbeat(conn) //0 + fd.heartbeat(conn) //100 + fd.heartbeat(conn) //200 + fd.heartbeat(conn) //300 + val phi1 = fd.phi(conn) //400 + // 1000 ms interval, should become same phi when 100 ms intervals have been dropped + fd.heartbeat(conn) //1000 + fd.heartbeat(conn) //2000 + fd.heartbeat(conn) //3000 + fd.heartbeat(conn) //4000 + val phi2 = fd.phi(conn) //5000 + phi2 must be(phi1.plusOrMinus(0.001)) + } } }