diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index ab83ab803b..e7b5844c73 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -11,7 +11,6 @@ import scala.collection.immutable.Map import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicReference -import System.{ currentTimeMillis ⇒ newTimestamp } /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: @@ -23,7 +22,7 @@ import System.{ currentTimeMillis ⇒ newTimestamp } *
* Default threshold is 8, but can be configured in the Akka config. */ -class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000) { +class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000, val timeMachine: () ⇒ Long = System.currentTimeMillis) { private final val PhiFactor = 1.0 / math.log(10.0) @@ -72,7 +71,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol // add starter records for this new connection val newFailureStats = oldFailureStats + (connection -> FailureStats()) val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long]) - val newTimestamps = oldTimestamps + (connection -> newTimestamp) + val newTimestamps = oldTimestamps + (connection -> timeMachine()) val newExplicitRemovals = explicitRemovals - connection val newState = oldState copy ( @@ -87,7 +86,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol } else { // this is a known connection - val timestamp = newTimestamp + val timestamp = timeMachine() val interval = timestamp - latestTimestamp.get val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp @@ -161,7 +160,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol if (oldState.explicitRemovals.contains(connection)) Double.MaxValue else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections else { - val timestampDiff = newTimestamp - oldTimestamp.get + val timestampDiff = timeMachine() - oldTimestamp.get val mean = oldState.failureStats.get(connection) match { case Some(FailureStats(mean, _, _)) ⇒ mean diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 7945691c59..cd62968e56 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -15,35 +15,47 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" val conn = Address("akka", "", "localhost", 2552) val conn2 = Address("akka", "", "localhost", 2553) + def fakeTimeGenerator(timeIntervals: List[Long]): () ⇒ Long = { + var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c)) + def timeGenerator(): Long = { + val currentTime = times.head + times = times.tail + currentTime + } + timeGenerator + } + "return phi value of 0.0D on startup for each address" in { val fd = new AccrualFailureDetector(system, conn) fd.phi(conn) must be(0.0D) fd.phi(conn2) must be(0.0D) } - "mark node as available after a series of successful heartbeats" taggedAs LongRunningTest in { - val fd = new AccrualFailureDetector(system, conn) + "mark node as available after a series of successful heartbeats" in { + var timeInterval = List[Long](0, 1000, 100, 100) + val ft = fakeTimeGenerator(timeInterval) + + val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) fd.heartbeat(conn) - Thread.sleep(1000) fd.heartbeat(conn) - Thread.sleep(100) fd.heartbeat(conn) fd.isAvailable(conn) must be(true) } - "mark node as dead after explicit removal of connection" taggedAs LongRunningTest in { - val fd = new AccrualFailureDetector(system, conn) + "mark node as dead after explicit removal of connection" in { + var timeInterval = List[Long](0, 1000, 100, 100, 100) + val ft = fakeTimeGenerator(timeInterval) + + val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) fd.heartbeat(conn) - Thread.sleep(1000) fd.heartbeat(conn) - Thread.sleep(100) fd.heartbeat(conn) fd.isAvailable(conn) must be(true) @@ -53,78 +65,74 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(false) } - "mark node as available after explicit removal of connection and receiving heartbeat again" taggedAs LongRunningTest in { - val fd = new AccrualFailureDetector(system, conn) + "mark node as available after explicit removal of connection and receiving heartbeat again" in { + var timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) + val ft = fakeTimeGenerator(timeInterval) - fd.heartbeat(conn) + val fd = new AccrualFailureDetector(system, conn, timeMachine = ft) - Thread.sleep(1000) - fd.heartbeat(conn) + fd.heartbeat(conn) //0 - Thread.sleep(100) - fd.heartbeat(conn) + fd.heartbeat(conn) //1000 - fd.isAvailable(conn) must be(true) + fd.heartbeat(conn) //1100 + + fd.isAvailable(conn) must be(true) //2200 fd.remove(conn) - fd.isAvailable(conn) must be(false) + fd.isAvailable(conn) must be(false) //3300 - // it recieves heartbeat from an explicitly removed node - fd.heartbeat(conn) + // it receives heartbeat from an explicitly removed node + fd.heartbeat(conn) //4400 - Thread.sleep(1000) - fd.heartbeat(conn) + fd.heartbeat(conn) //5500 - Thread.sleep(100) - fd.heartbeat(conn) + fd.heartbeat(conn) //6600 - fd.isAvailable(conn) must be(true) + fd.isAvailable(conn) must be(true) //6700 } - "mark node as dead if heartbeat are missed" taggedAs LongRunningTest in { - val fd = new AccrualFailureDetector(system, conn, threshold = 3) + "mark node as dead if heartbeat are missed" in { + var timeInterval = List[Long](0, 1000, 100, 100, 5000) + val ft = fakeTimeGenerator(timeInterval) - fd.heartbeat(conn) + val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft) - Thread.sleep(1000) - fd.heartbeat(conn) + fd.heartbeat(conn) //0 - Thread.sleep(100) - fd.heartbeat(conn) + fd.heartbeat(conn) //1000 - fd.isAvailable(conn) must be(true) + fd.heartbeat(conn) //1100 - Thread.sleep(5000) + fd.isAvailable(conn) must be(true) //1200 - fd.isAvailable(conn) must be(false) + fd.isAvailable(conn) must be(false) //6200 } - "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" taggedAs LongRunningTest in { - val fd = new AccrualFailureDetector(system, conn, threshold = 3) + "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { + var timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) + val ft = fakeTimeGenerator(timeInterval) - fd.heartbeat(conn) + val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft) - Thread.sleep(1000) - fd.heartbeat(conn) + fd.heartbeat(conn) //0 - Thread.sleep(100) - fd.heartbeat(conn) + fd.heartbeat(conn) //1000 - fd.isAvailable(conn) must be(true) + fd.heartbeat(conn) //1100 - Thread.sleep(5000) - fd.isAvailable(conn) must be(false) + fd.isAvailable(conn) must be(true) //1200 - fd.heartbeat(conn) + fd.isAvailable(conn) must be(false) //6200 - Thread.sleep(1000) - fd.heartbeat(conn) + fd.heartbeat(conn) //6300 - Thread.sleep(100) - fd.heartbeat(conn) + fd.heartbeat(conn) //7300 - fd.isAvailable(conn) must be(true) + fd.heartbeat(conn) //7400 + + fd.isAvailable(conn) must be(true) //7500 } } }