diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index c7aaf12fcf..cdca8c9503 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -4,7 +4,8 @@ package akka.cluster -import akka.actor.{ ActorSystem, Address } +import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } +import akka.remote.RemoteActorRefProvider import akka.event.Logging import scala.collection.immutable.Map @@ -23,11 +24,20 @@ import java.util.concurrent.atomic.AtomicReference * Default threshold is 8, but can be configured in the Akka config. */ class AccrualFailureDetector( - system: ActorSystem, - address: Address, + val system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000, - val timeMachine: () ⇒ Long = System.currentTimeMillis) { + val timeMachine: () ⇒ Long = System.currentTimeMillis) extends FailureDetector { + + def this( + system: ActorSystem, + settings: ClusterSettings, + timeMachine: () ⇒ Long = System.currentTimeMillis) = + this( + system, + settings.FailureDetectorThreshold, + settings.FailureDetectorMaxSampleSize, + timeMachine) private final val PhiFactor = 1.0 / math.log(10.0) @@ -65,8 +75,8 @@ class AccrualFailureDetector( * Records a heartbeat for a connection. */ @tailrec - final def heartbeat(connection: Address) { - log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection) + final def heartbeat(connection: Address): Unit = { + log.debug("Heartbeat from connection [{}] ", connection) val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -155,7 +165,7 @@ class AccrualFailureDetector( else PhiFactor * timestampDiff / mean } - log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) + log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection) phi } @@ -163,8 +173,8 @@ class AccrualFailureDetector( * Removes the heartbeat management for a connection. */ @tailrec - final def remove(connection: Address) { - log.debug("Node [{}] - Remove connection [{}] ", address, connection) + final def remove(connection: Address): Unit = { + log.debug("Remove connection [{}] ", connection) val oldState = state.get if (oldState.failureStats.contains(connection)) { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 4ea43d50e4..e788450148 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -306,7 +306,22 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def lookup = Cluster - override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) + override def createExtension(system: ExtendedActorSystem): Cluster = { + val clusterSettings = new ClusterSettings(system.settings.config, system.name) + + def createDefaultFD() = new AccrualFailureDetector(system, clusterSettings) + val failureDetector = clusterSettings.FailureDetectorImplementationClass match { + case None ⇒ createDefaultFD() + case Some(fqcn) ⇒ system.dynamicAccess.createInstanceFor[FailureDetector](fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match { + case Right(fd) ⇒ fd + case Left(e) ⇒ + system.log.error(e, "Could not create custom failure detector - falling back to default") + createDefaultFD() + } + } + + new Cluster(system, failureDetector) + } } /** @@ -349,7 +364,7 @@ trait ClusterNodeMBean { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ +class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode ⇒ /** * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. @@ -369,8 +384,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ import clusterSettings._ val selfAddress = remote.transport.address - val failureDetector = new AccrualFailureDetector( - system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize) private val vclockNode = VectorClock.Node(selfAddress.toString) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 0e7dac06ab..b58775e222 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -15,6 +15,10 @@ class ClusterSettings(val config: Config, val systemName: String) { import config._ val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") + val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { + case "" ⇒ None + case fqcn ⇒ Some(fqcn) + } val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala new file mode 100644 index 0000000000..897d0413b5 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.{ Address, ActorSystem } +import akka.event.{ Logging, LogSource } + +/** + * Interface for Akka failure detectors. + */ +trait FailureDetector { + + /** + * Returns true if the connection is considered to be up and healthy + * and returns false otherwise. + */ + def isAvailable(connection: Address): Boolean + + /** + * Records a heartbeat for a connection. + */ + def heartbeat(connection: Address): Unit + + /** + * Calculates how likely it is that the connection has failed. + *

+ * If a connection does not have any records in failure detector then it is + * considered healthy. + */ + def phi(connection: Address): Double + + /** + * Removes the heartbeat management for a connection. + */ + def remove(connection: Address): Unit +} + +/** + * User controllable "puppet" failure detector. + */ +class FailureDetectorPuppet(system: ActorSystem, connectionsToStartWith: Address*) extends FailureDetector { + import java.util.concurrent.ConcurrentHashMap + + trait Status + object Up extends Status + object Down extends Status + + implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { + def genString(o: AnyRef): String = o.getClass.getName + override def getClazz(o: AnyRef): Class[_] = o.getClass + } + + val log = Logging(system, this) + + private val connections = { + val cs = new ConcurrentHashMap[Address, Status] + connectionsToStartWith foreach { cs put (_, Up) } + cs + } + + def +(connection: Address): this.type = { + log.debug("Adding cluster node [{}]", connection) + connections.put(connection, Up) + this + } + + def markAsDown(connection: Address): this.type = { + connections.put(connection, Down) + this + } + + def markAsUp(connection: Address): this.type = { + connections.put(connection, Up) + this + } + + def isAvailable(connection: Address): Boolean = connections.get(connection) match { + case null ⇒ + this + connection + true + case Up ⇒ + log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) + true + case Down ⇒ + log.debug("isAvailable: Cluster node IS available [{}]", connection) + false + } + + def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) + + def phi(connection: Address): Double = 0.1D + + def remove(connection: Address): Unit = { + log.debug("Removing cluster node [{}]", connection) + connections.remove(connection) + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 1cf62daf1c..bd4d5d2c52 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -28,7 +28,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "return phi value of 0.0 on startup for each address, when no heartbeats" in { - val fd = new AccrualFailureDetector(system, conn) + val fd = new AccrualFailureDetector(system) fd.phi(conn) must be(0.0) fd.phi(conn2) must be(0.0) } @@ -36,7 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi based on guess when only one heartbeat" in { // 1 second ticks val timeInterval = Vector.fill(30)(1000L) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -52,7 +52,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi value using first interval after second heartbeat" in { val timeInterval = List[Long](0, 100, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -63,7 +63,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -75,7 +75,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -89,7 +89,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after explicit removal of connection and receiving heartbeat again" in { val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead if heartbeat are missed" in { val timeInterval = List[Long](0, 1000, 100, 100, 5000) val ft = fakeTimeGenerator(timeInterval) - val fd = new AccrualFailureDetector(system, conn, threshold = 3, + val fd = new AccrualFailureDetector(system, threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -127,7 +127,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, conn, threshold = 3, + val fd = new AccrualFailureDetector(system, threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -144,7 +144,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "use maxSampleSize heartbeats" in { val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) - val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, + val fd = new AccrualFailureDetector(system, maxSampleSize = 3, timeMachine = fakeTimeGenerator(timeInterval)) // 100 ms interval diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 6b2ff1962c..9bce41a831 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -18,6 +18,7 @@ class ClusterConfigSpec extends AkkaSpec { import settings._ FailureDetectorThreshold must be(8) FailureDetectorMaxSampleSize must be(1000) + FailureDetectorImplementationClass must be(None) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index d3d1d6d0a2..5b4bca3379 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -33,7 +33,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { val deterministicRandom = new AtomicInteger - val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem]) { + val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], new FailureDetectorPuppet(system)) { override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { if (addresses.isEmpty) None @@ -67,9 +67,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { @volatile var _unavailable: Set[Address] = Set.empty - override val failureDetector = new AccrualFailureDetector( - system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) { - + override val failureDetector = new AccrualFailureDetector(system, clusterSettings) { override def isAvailable(connection: Address): Boolean = { if (_unavailable.contains(connection)) false else super.isAvailable(connection)