Merge branch 'master' of github.com:akka/akka

This commit is contained in:
Viktor Klang 2012-06-08 14:26:26 +02:00
commit b464b1a02d
5 changed files with 74 additions and 73 deletions

View file

@ -36,7 +36,11 @@ class AccrualFailureDetector(
/**
* Holds the failure statistics for a specific node Address.
*/
private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)
private case class FailureStats(mean: Double = 0.0, variance: Double = 0.0, deviation: Double = 0.0)
// guess statistics for first heartbeat,
// important so that connections with only one heartbeat becomes unavailble
private val failureStatsFirstHeartbeat = FailureStats(mean = 1000.0)
/**
* Implement using optimistic lockless concurrency, all state is represented
@ -72,7 +76,7 @@ class AccrualFailureDetector(
// add starter records for this new connection
val newState = oldState copy (
version = oldState.version + 1,
failureStats = oldState.failureStats + (connection -> FailureStats()),
failureStats = oldState.failureStats + (connection -> failureStatsFirstHeartbeat),
intervalHistory = oldState.intervalHistory + (connection -> IndexedSeq.empty[Long]),
timestamps = oldState.timestamps + (connection -> timeMachine()),
explicitRemovals = oldState.explicitRemovals - connection)
@ -93,30 +97,24 @@ class AccrualFailureDetector(
case _ IndexedSeq.empty[Long]
}) :+ interval
val newFailureStats =
if (newIntervalsForConnection.size > 1) {
val newFailureStats = {
val newMean: Double = newIntervalsForConnection.sum.toDouble / newIntervalsForConnection.size
val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble
val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse {
throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")
}
val deviationSum =
newIntervalsForConnection
.map(_.toDouble)
.foldLeft(0.0D)((x, y) x + (y - newMean))
val newVariance: Double = deviationSum / newIntervalsForConnection.size.toDouble
val newDeviation: Double = math.sqrt(newVariance)
val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
oldState.failureStats + (connection -> newFailureStats)
} else {
oldState.failureStats
val oldConnectionFailureStats = oldState.failureStats.get(connection).getOrElse {
throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")
}
val deviationSum = (0.0d /: newIntervalsForConnection) { (mean, interval)
mean + interval.toDouble - newMean
}
val newVariance: Double = deviationSum / newIntervalsForConnection.size
val newDeviation: Double = math.sqrt(newVariance)
val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
oldState.failureStats + (connection -> newFailureStats)
}
val newState = oldState copy (version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection),
@ -132,8 +130,7 @@ class AccrualFailureDetector(
* Calculates how likely it is that the connection has failed.
* <p/>
* If a connection does not have any records in failure detector then it is
* considered dead. This is true either if the heartbeat have not started
* yet or the connection have been explicitly removed.
* considered healthy.
* <p/>
* Implementations of 'Cumulative Distribution Function' for Exponential Distribution.
* For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597].
@ -145,7 +142,7 @@ class AccrualFailureDetector(
val phi =
// if connection has been removed explicitly
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timestampDiff = timeMachine() - oldTimestamp.get
@ -154,12 +151,11 @@ class AccrualFailureDetector(
case _ throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history")
}
if (mean == 0.0D) 0.0D
if (mean == 0.0) 0.0
else PhiFactor * timestampDiff / mean
}
// only log if PHI value is starting to get interesting
if (phi > 0.0D) log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection)
log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection)
phi
}
@ -168,6 +164,7 @@ class AccrualFailureDetector(
*/
@tailrec
final def remove(connection: Address) {
log.debug("Node [{}] - Remove connection [{}] ", address, connection)
val oldState = state.get
if (oldState.failureStats.contains(connection)) {

View file

@ -36,8 +36,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
"A node that is LEAVING a non-singleton cluster" must {
// FIXME make it work and remove ignore
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest ignore {
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)

View file

@ -42,8 +42,7 @@ abstract class NodeLeavingAndExitingSpec
"A node that is LEAVING a non-singleton cluster" must {
// FIXME make it work and remove ignore
"be moved to EXITING by the leader" taggedAs LongRunningTest ignore {
"be moved to EXITING by the leader" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)

View file

@ -13,7 +13,7 @@ object NodeShutdownMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = true).
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
auto-down = on

View file

@ -17,7 +17,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
val conn = Address("akka", "", "localhost", 2552)
val conn2 = Address("akka", "", "localhost", 2553)
def fakeTimeGenerator(timeIntervals: List[Long]): () Long = {
def fakeTimeGenerator(timeIntervals: Seq[Long]): () Long = {
var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) acc ::: List[Long](acc.last + c))
def timeGenerator(): Long = {
val currentTime = times.head
@ -27,22 +27,47 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
timeGenerator
}
"return phi value of 0.0D on startup for each address" in {
"return phi value of 0.0 on startup for each address, when no heartbeats" in {
val fd = new AccrualFailureDetector(system, conn)
fd.phi(conn) must be(0.0D)
fd.phi(conn2) must be(0.0D)
fd.phi(conn) must be(0.0)
fd.phi(conn2) must be(0.0)
}
"return phi based on guess when only one heartbeat" in {
// 1 second ticks
val timeInterval = Vector.fill(30)(1000L)
val fd = new AccrualFailureDetector(system, conn,
timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.phi(conn) must be > (0.0)
// let time go
for (n 2 to 8)
fd.phi(conn) must be < (4.0)
for (n 9 to 18)
fd.phi(conn) must be < (8.0)
fd.phi(conn) must be > (8.0)
}
"return phi value using first interval after second heartbeat" in {
val timeInterval = List[Long](0, 100, 100, 100)
val fd = new AccrualFailureDetector(system, conn,
timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.phi(conn) must be > (0.0)
fd.heartbeat(conn)
fd.phi(conn) must be > (0.0)
}
"mark node as available after a series of successful heartbeats" in {
val timeInterval = List[Long](0, 1000, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
val fd = new AccrualFailureDetector(system, conn,
timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
@ -50,18 +75,13 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as dead after explicit removal of connection" in {
val timeInterval = List[Long](0, 1000, 100, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
val fd = new AccrualFailureDetector(system, conn,
timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.heartbeat(conn)
fd.isAvailable(conn) must be(true)
fd.remove(conn)
fd.isAvailable(conn) must be(false)
@ -69,14 +89,12 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, timeMachine = ft)
val fd = new AccrualFailureDetector(system, conn,
timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //2200
@ -87,9 +105,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
// it receives heartbeat from an explicitly removed node
fd.heartbeat(conn) //4400
fd.heartbeat(conn) //5500
fd.heartbeat(conn) //6600
fd.isAvailable(conn) must be(true) //6700
@ -98,40 +114,29 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as dead if heartbeat are missed" in {
val timeInterval = List[Long](0, 1000, 100, 100, 5000)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft)
val fd = new AccrualFailureDetector(system, conn, threshold = 3,
timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //1200
fd.isAvailable(conn) must be(false) //6200
}
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, timeMachine = ft)
val fd = new AccrualFailureDetector(system, conn, threshold = 3,
timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0
fd.heartbeat(conn) //1000
fd.heartbeat(conn) //1100
fd.isAvailable(conn) must be(true) //1200
fd.isAvailable(conn) must be(false) //6200
fd.heartbeat(conn) //6300
fd.heartbeat(conn) //7300
fd.heartbeat(conn) //7400
fd.isAvailable(conn) must be(true) //7500
@ -139,8 +144,8 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"use maxSampleSize heartbeats" in {
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, timeMachine = ft)
val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3,
timeMachine = fakeTimeGenerator(timeInterval))
// 100 ms interval
fd.heartbeat(conn) //0
@ -156,5 +161,6 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
val phi2 = fd.phi(conn) //5000
phi2 must be(phi1.plusOrMinus(0.001))
}
}
}