From ec7177be740fc070c2a5fe483dbc76a49b35d6fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 11 Jun 2012 10:06:53 +0200 Subject: [PATCH] Misc fixes after FailureDetectorPuppet and abstraction review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Moved FailureDetectorPuppet to its own file in src/test. - Removed 'phi' method from FailureDetector public API. - Throwing exception instead of falling back to default if we can't load the custom FD. - Removed add-connection method in FailureDetectorPuppet. Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Cluster.scala | 15 ++-- .../scala/akka/cluster/FailureDetector.scala | 75 +------------------ .../test/scala/akka/cluster/ClusterSpec.scala | 2 +- .../akka/cluster/FailureDetectorPuppet.scala | 60 +++++++++++++++ 4 files changed, 70 insertions(+), 82 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index e788450148..891c8972b0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -309,15 +309,14 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): Cluster = { val clusterSettings = new ClusterSettings(system.settings.config, system.name) - def createDefaultFD() = new AccrualFailureDetector(system, clusterSettings) val failureDetector = clusterSettings.FailureDetectorImplementationClass match { - case None ⇒ createDefaultFD() - case Some(fqcn) ⇒ system.dynamicAccess.createInstanceFor[FailureDetector](fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match { - case Right(fd) ⇒ fd - case Left(e) ⇒ - system.log.error(e, "Could not create custom failure detector - falling back to default") - createDefaultFD() - } + case None ⇒ new AccrualFailureDetector(system, clusterSettings) + case Some(fqcn) ⇒ + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match { + case Right(fd) ⇒ fd + case Left(e) ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) + } } new Cluster(system, failureDetector) diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala index 897d0413b5..60af0a1c41 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -4,8 +4,7 @@ package akka.cluster -import akka.actor.{ Address, ActorSystem } -import akka.event.{ Logging, LogSource } +import akka.actor.Address /** * Interface for Akka failure detectors. @@ -13,8 +12,7 @@ import akka.event.{ Logging, LogSource } trait FailureDetector { /** - * Returns true if the connection is considered to be up and healthy - * and returns false otherwise. + * Returns true if the connection is considered to be up and healthy and returns false otherwise. */ def isAvailable(connection: Address): Boolean @@ -23,77 +21,8 @@ trait FailureDetector { */ def heartbeat(connection: Address): Unit - /** - * Calculates how likely it is that the connection has failed. - *

- * If a connection does not have any records in failure detector then it is - * considered healthy. - */ - def phi(connection: Address): Double - /** * Removes the heartbeat management for a connection. */ def remove(connection: Address): Unit } - -/** - * User controllable "puppet" failure detector. - */ -class FailureDetectorPuppet(system: ActorSystem, connectionsToStartWith: Address*) extends FailureDetector { - import java.util.concurrent.ConcurrentHashMap - - trait Status - object Up extends Status - object Down extends Status - - implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { - def genString(o: AnyRef): String = o.getClass.getName - override def getClazz(o: AnyRef): Class[_] = o.getClass - } - - val log = Logging(system, this) - - private val connections = { - val cs = new ConcurrentHashMap[Address, Status] - connectionsToStartWith foreach { cs put (_, Up) } - cs - } - - def +(connection: Address): this.type = { - log.debug("Adding cluster node [{}]", connection) - connections.put(connection, Up) - this - } - - def markAsDown(connection: Address): this.type = { - connections.put(connection, Down) - this - } - - def markAsUp(connection: Address): this.type = { - connections.put(connection, Up) - this - } - - def isAvailable(connection: Address): Boolean = connections.get(connection) match { - case null ⇒ - this + connection - true - case Up ⇒ - log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) - true - case Down ⇒ - log.debug("isAvailable: Cluster node IS available [{}]", connection) - false - } - - def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) - - def phi(connection: Address): Double = 0.1D - - def remove(connection: Address): Unit = { - log.debug("Removing cluster node [{}]", connection) - connections.remove(connection) - } -} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 5b4bca3379..f60e6fa7dc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -67,7 +67,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { @volatile var _unavailable: Set[Address] = Set.empty - override val failureDetector = new AccrualFailureDetector(system, clusterSettings) { + override val failureDetector = new FailureDetectorPuppet(system) { override def isAvailable(connection: Address): Boolean = { if (_unavailable.contains(connection)) false else super.isAvailable(connection) diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala new file mode 100644 index 0000000000..3245a15f97 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.{ Address, ActorSystem } +import akka.event.{ Logging, LogSource } + +/** + * User controllable "puppet" failure detector. + */ +class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector { + import java.util.concurrent.ConcurrentHashMap + + def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name)) + + trait Status + object Up extends Status + object Down extends Status + + implicit private val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { + def genString(o: AnyRef): String = o.getClass.getName + override def getClazz(o: AnyRef): Class[_] = o.getClass + } + + private val log = Logging(system, this) + + private val connections = new ConcurrentHashMap[Address, Status] + + def markAsDown(connection: Address): this.type = { + connections.put(connection, Down) + this + } + + def markAsUp(connection: Address): this.type = { + connections.put(connection, Up) + this + } + + def isAvailable(connection: Address): Boolean = connections.get(connection) match { + case null ⇒ + log.debug("Adding cluster node [{}]", connection) + connections.put(connection, Up) + true + case Up ⇒ + log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) + true + case Down ⇒ + log.debug("isAvailable: Cluster node IS available [{}]", connection) + false + } + + def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) + + def remove(connection: Address): Unit = { + log.debug("Removing cluster node [{}]", connection) + connections.remove(connection) + } +}