Use maxSampleSize correctly. See #2065

* Added failing test
* Fixed wrong drop
* Boy scout cleanup
This commit is contained in:
Patrik Nordwall 2012-05-14 09:35:15 +02:00
parent bcc208819a
commit 99d036d99a
3 changed files with 54 additions and 51 deletions

View file

@ -22,7 +22,12 @@ import java.util.concurrent.atomic.AtomicReference
* <p/>
* Default threshold is 8, but can be configured in the Akka config.
*/
class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000, val timeMachine: () Long = System.currentTimeMillis) {
class AccrualFailureDetector(
system: ActorSystem,
address: Address,
val threshold: Int = 8,
val maxSampleSize: Int = 1000,
val timeMachine: () Long = System.currentTimeMillis) {
private final val PhiFactor = 1.0 / math.log(10.0)
@ -40,7 +45,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
private case class State(
version: Long = 0L,
failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats],
intervalHistory: Map[Address, Vector[Long]] = Map.empty[Address, Vector[Long]],
intervalHistory: Map[Address, IndexedSeq[Long]] = Map.empty[Address, IndexedSeq[Long]],
timestamps: Map[Address, Long] = Map.empty[Address, Long],
explicitRemovals: Set[Address] = Set.empty[Address])
@ -60,26 +65,17 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection)
val oldState = state.get
val oldFailureStats = oldState.failureStats
val oldTimestamps = oldState.timestamps
val latestTimestamp = oldState.timestamps.get(connection)
val explicitRemovals = oldState.explicitRemovals
if (latestTimestamp.isEmpty) {
// this is heartbeat from a new connection
// add starter records for this new connection
val newFailureStats = oldFailureStats + (connection -> FailureStats())
val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
val newTimestamps = oldTimestamps + (connection -> timeMachine())
val newExplicitRemovals = explicitRemovals - connection
val newState = oldState copy (
version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
timestamps = newTimestamps,
explicitRemovals = newExplicitRemovals)
failureStats = oldState.failureStats + (connection -> FailureStats()),
intervalHistory = oldState.intervalHistory + (connection -> IndexedSeq.empty[Long]),
timestamps = oldState.timestamps + (connection -> timeMachine()),
explicitRemovals = oldState.explicitRemovals - connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -89,26 +85,21 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val timestamp = timeMachine()
val interval = timestamp - latestTimestamp.get
val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp
var newIntervalsForConnection = (oldState.intervalHistory.get(connection) match {
case Some(history) history
case _ Vector.empty[Long]
}) :+ interval
if (newIntervalsForConnection.size > maxSampleSize) {
val newIntervalsForConnection = (oldState.intervalHistory.get(connection) match {
case Some(history) if history.size >= maxSampleSize
// reached max history, drop first interval
newIntervalsForConnection = newIntervalsForConnection drop 0
}
history drop 1
case Some(history) history
case _ IndexedSeq.empty[Long]
}) :+ interval
val newFailureStats =
if (newIntervalsForConnection.size > 1) {
val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble
val oldConnectionFailureStats = oldState.failureStats.get(connection) match {
case Some(stats) stats
case _ throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")
val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse {
throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")
}
val deviationSum =
@ -120,21 +111,17 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val newDeviation: Double = math.sqrt(newVariance)
val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
oldFailureStats + (connection -> newFailureStats)
oldState.failureStats + (connection -> newFailureStats)
} else {
oldFailureStats
oldState.failureStats
}
val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)
val newExplicitRemovals = explicitRemovals - connection
val newState = oldState copy (version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
timestamps = newTimestamps,
explicitRemovals = newExplicitRemovals)
intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection),
timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp,
explicitRemovals = oldState.explicitRemovals - connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -184,16 +171,11 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val oldState = state.get
if (oldState.failureStats.contains(connection)) {
val failureStats = oldState.failureStats - connection
val intervalHistory = oldState.intervalHistory - connection
val timestamps = oldState.timestamps - connection
val explicitRemovals = oldState.explicitRemovals + connection
val newState = oldState copy (version = oldState.version + 1,
failureStats = failureStats,
intervalHistory = intervalHistory,
timestamps = timestamps,
explicitRemovals = explicitRemovals)
failureStats = oldState.failureStats - connection,
intervalHistory = oldState.intervalHistory - connection,
timestamps = oldState.timestamps - connection,
explicitRemovals = oldState.explicitRemovals + connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) remove(connection) // recur

View file

@ -105,7 +105,7 @@ object VectorClock {
object Timestamp {
private val counter = new AtomicLong(newTimestamp)
def zero(): Timestamp = Timestamp(0L)
val zero: Timestamp = Timestamp(0L)
def apply(): Timestamp = {
var newTime: Long = 0L

View file

@ -7,6 +7,7 @@ package akka.cluster
import akka.actor.Address
import akka.testkit.{ LongRunningTest, AkkaSpec }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class AccrualFailureDetectorSpec extends AkkaSpec("""
actor.provider = "akka.remote.RemoteActorRefProvider"
akka.loglevel = "INFO"
@ -33,7 +34,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as available after a series of successful heartbeats" in {
var timeInterval = List[Long](0, 1000, 100, 100)
val timeInterval = List[Long](0, 1000, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
@ -48,7 +49,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as dead after explicit removal of connection" in {
var timeInterval = List[Long](0, 1000, 100, 100, 100)
val timeInterval = List[Long](0, 1000, 100, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
@ -67,7 +68,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
var timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
@ -95,7 +96,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as dead if heartbeat are missed" in {
var timeInterval = List[Long](0, 1000, 100, 100, 5000)
val timeInterval = List[Long](0, 1000, 100, 100, 5000)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft)
@ -112,7 +113,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
var timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100)
val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft)
@ -135,5 +136,25 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true) //7500
}
"use maxSampleSize heartbeats" in {
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, timeMachine = ft)
// 100 ms interval
fd.heartbeat(conn) //0
fd.heartbeat(conn) //100
fd.heartbeat(conn) //200
fd.heartbeat(conn) //300
val phi1 = fd.phi(conn) //400
// 1000 ms interval, should become same phi when 100 ms intervals have been dropped
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //2000
fd.heartbeat(conn) //3000
fd.heartbeat(conn) //4000
val phi2 = fd.phi(conn) //5000
phi2 must be(phi1.plusOrMinus(0.001))
}
}
}