#2021: Abstract over time for AccrualFailureDetector

This commit is contained in:
Amir Moulavi 2012-04-25 14:46:27 +02:00
parent 202efb1c6a
commit dcb0a177f1
2 changed files with 62 additions and 55 deletions

View file

@ -11,7 +11,6 @@ import scala.collection.immutable.Map
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
import System.{ currentTimeMillis newTimestamp }
/**
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
@ -23,7 +22,7 @@ import System.{ currentTimeMillis ⇒ newTimestamp }
* <p/>
* Default threshold is 8, but can be configured in the Akka config.
*/
class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000) {
class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000, val timeMachine: () Long = System.currentTimeMillis) {
private final val PhiFactor = 1.0 / math.log(10.0)
@ -72,7 +71,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
// add starter records for this new connection
val newFailureStats = oldFailureStats + (connection -> FailureStats())
val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
val newTimestamps = oldTimestamps + (connection -> newTimestamp)
val newTimestamps = oldTimestamps + (connection -> timeMachine())
val newExplicitRemovals = explicitRemovals - connection
val newState = oldState copy (
@ -87,7 +86,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
} else {
// this is a known connection
val timestamp = newTimestamp
val timestamp = timeMachine()
val interval = timestamp - latestTimestamp.get
val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp
@ -161,7 +160,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timestampDiff = newTimestamp - oldTimestamp.get
val timestampDiff = timeMachine() - oldTimestamp.get
val mean = oldState.failureStats.get(connection) match {
case Some(FailureStats(mean, _, _)) mean

View file

@ -15,35 +15,47 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
val conn = Address("akka", "", "localhost", 2552)
val conn2 = Address("akka", "", "localhost", 2553)
def fakeTimeGenerator(timeIntervals: List[Long]): () Long = {
var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) acc ::: List[Long](acc.last + c))
def timeGenerator(): Long = {
val currentTime = times.head
times = times.tail
currentTime
}
timeGenerator
}
"return phi value of 0.0D on startup for each address" in {
val fd = new AccrualFailureDetector(system, conn)
fd.phi(conn) must be(0.0D)
fd.phi(conn2) must be(0.0D)
}
"mark node as available after a series of successful heartbeats" taggedAs LongRunningTest in {
val fd = new AccrualFailureDetector(system, conn)
"mark node as available after a series of successful heartbeats" in {
var timeInterval = List[Long](0, 1000, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
fd.heartbeat(conn)
Thread.sleep(1000)
fd.heartbeat(conn)
Thread.sleep(100)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
}
"mark node as dead after explicit removal of connection" taggedAs LongRunningTest in {
val fd = new AccrualFailureDetector(system, conn)
"mark node as dead after explicit removal of connection" in {
var timeInterval = List[Long](0, 1000, 100, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
fd.heartbeat(conn)
Thread.sleep(1000)
fd.heartbeat(conn)
Thread.sleep(100)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
@ -53,78 +65,74 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(false)
}
"mark node as available after explicit removal of connection and receiving heartbeat again" taggedAs LongRunningTest in {
val fd = new AccrualFailureDetector(system, conn)
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
var timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val ft = fakeTimeGenerator(timeInterval)
fd.heartbeat(conn)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
Thread.sleep(1000)
fd.heartbeat(conn)
fd.heartbeat(conn) //0
Thread.sleep(100)
fd.heartbeat(conn)
fd.heartbeat(conn) //1000
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //2200
fd.remove(conn)
fd.isAvailable(conn) must be(false)
fd.isAvailable(conn) must be(false) //3300
// it recieves heartbeat from an explicitly removed node
fd.heartbeat(conn)
// it receives heartbeat from an explicitly removed node
fd.heartbeat(conn) //4400
Thread.sleep(1000)
fd.heartbeat(conn)
fd.heartbeat(conn) //5500
Thread.sleep(100)
fd.heartbeat(conn)
fd.heartbeat(conn) //6600
fd.isAvailable(conn) must be(true)
fd.isAvailable(conn) must be(true) //6700
}
"mark node as dead if heartbeat are missed" taggedAs LongRunningTest in {
val fd = new AccrualFailureDetector(system, conn, threshold = 3)
"mark node as dead if heartbeat are missed" in {
var timeInterval = List[Long](0, 1000, 100, 100, 5000)
val ft = fakeTimeGenerator(timeInterval)
fd.heartbeat(conn)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft)
Thread.sleep(1000)
fd.heartbeat(conn)
fd.heartbeat(conn) //0
Thread.sleep(100)
fd.heartbeat(conn)
fd.heartbeat(conn) //1000
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn) //1100
Thread.sleep(5000)
fd.isAvailable(conn) must be(true) //1200
fd.isAvailable(conn) must be(false)
fd.isAvailable(conn) must be(false) //6200
}
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" taggedAs LongRunningTest in {
val fd = new AccrualFailureDetector(system, conn, threshold = 3)
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
var timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
fd.heartbeat(conn)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft)
Thread.sleep(1000)
fd.heartbeat(conn)
fd.heartbeat(conn) //0
Thread.sleep(100)
fd.heartbeat(conn)
fd.heartbeat(conn) //1000
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn) //1100
Thread.sleep(5000)
fd.isAvailable(conn) must be(false)
fd.isAvailable(conn) must be(true) //1200
fd.heartbeat(conn)
fd.isAvailable(conn) must be(false) //6200
Thread.sleep(1000)
fd.heartbeat(conn)
fd.heartbeat(conn) //6300
Thread.sleep(100)
fd.heartbeat(conn)
fd.heartbeat(conn) //7300
fd.isAvailable(conn) must be(true)
fd.heartbeat(conn) //7400
fd.isAvailable(conn) must be(true) //7500
}
}
}