Merge pull request #415 from amir343/master

AccrualFD: explicit removal of connections functionality + corresponding tests
This commit is contained in:
viktorklang 2012-04-24 06:04:37 -07:00
commit 1f30be1f87
2 changed files with 46 additions and 7 deletions

View file

@ -42,7 +42,8 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
version: Long = 0L,
failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats],
intervalHistory: Map[Address, Vector[Long]] = Map.empty[Address, Vector[Long]],
timestamps: Map[Address, Long] = Map.empty[Address, Long])
timestamps: Map[Address, Long] = Map.empty[Address, Long],
explicitRemovals: Set[Address] = Set.empty[Address])
private val state = new AtomicReference[State](State())
@ -63,6 +64,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val oldFailureStats = oldState.failureStats
val oldTimestamps = oldState.timestamps
val latestTimestamp = oldState.timestamps.get(connection)
val explicitRemovals = oldState.explicitRemovals
if (latestTimestamp.isEmpty) {
@ -71,12 +73,14 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val newFailureStats = oldFailureStats + (connection -> FailureStats())
val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
val newTimestamps = oldTimestamps + (connection -> newTimestamp)
val newExplicitRemovals = explicitRemovals - connection
val newState = oldState copy (
version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
timestamps = newTimestamps)
timestamps = newTimestamps,
explicitRemovals = newExplicitRemovals)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -125,10 +129,13 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)
val newExplicitRemovals = explicitRemovals - connection
val newState = oldState copy (version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
timestamps = newTimestamps)
timestamps = newTimestamps,
explicitRemovals = newExplicitRemovals)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -150,7 +157,9 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val oldTimestamp = oldState.timestamps.get(connection)
val phi =
if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
// if connection has been removed explicitly
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timestampDiff = newTimestamp - oldTimestamp.get
@ -179,11 +188,13 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val failureStats = oldState.failureStats - connection
val intervalHistory = oldState.intervalHistory - connection
val timestamps = oldState.timestamps - connection
val explicitRemovals = oldState.explicitRemovals + connection
val newState = oldState copy (version = oldState.version + 1,
failureStats = failureStats,
intervalHistory = intervalHistory,
timestamps = timestamps)
timestamps = timestamps,
explicitRemovals = explicitRemovals)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) remove(connection) // recur

View file

@ -36,8 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true)
}
// FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector
"mark node as dead after explicit removal of connection" ignore {
"mark node as dead after explicit removal of connection" in {
val fd = new AccrualFailureDetector(system, conn)
fd.heartbeat(conn)
@ -55,6 +54,35 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(false)
}
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
val fd = new AccrualFailureDetector(system, conn)
fd.heartbeat(conn)
Thread.sleep(1000)
fd.heartbeat(conn)
Thread.sleep(100)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
fd.remove(conn)
fd.isAvailable(conn) must be(false)
// it recieves heartbeat from an explicitly removed node
fd.heartbeat(conn)
Thread.sleep(1000)
fd.heartbeat(conn)
Thread.sleep(100)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
}
"mark node as dead if heartbeat are missed" in {
val fd = new AccrualFailureDetector(system, conn, threshold = 3)