Misc fixes after FailureDetectorPuppet and abstraction review

- Moved FailureDetectorPuppet to its own file in src/test.
- Removed 'phi' method from FailureDetector public API.
- Throwing exception instead of falling back to default if we can't load the custom FD.
- Removed add-connection method in FailureDetectorPuppet.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-06-11 10:06:53 +02:00
parent 0030fa1b52
commit ec7177be74
4 changed files with 70 additions and 82 deletions

View file

@ -309,14 +309,13 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): Cluster = {
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
def createDefaultFD() = new AccrualFailureDetector(system, clusterSettings)
val failureDetector = clusterSettings.FailureDetectorImplementationClass match {
case None createDefaultFD()
case Some(fqcn) system.dynamicAccess.createInstanceFor[FailureDetector](fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match {
case None new AccrualFailureDetector(system, clusterSettings)
case Some(fqcn)
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match {
case Right(fd) fd
case Left(e)
system.log.error(e, "Could not create custom failure detector - falling back to default")
createDefaultFD()
case Left(e) throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
}
}

View file

@ -4,8 +4,7 @@
package akka.cluster
import akka.actor.{ Address, ActorSystem }
import akka.event.{ Logging, LogSource }
import akka.actor.Address
/**
* Interface for Akka failure detectors.
@ -13,8 +12,7 @@ import akka.event.{ Logging, LogSource }
trait FailureDetector {
/**
* Returns true if the connection is considered to be up and healthy
* and returns false otherwise.
* Returns true if the connection is considered to be up and healthy and returns false otherwise.
*/
def isAvailable(connection: Address): Boolean
@ -23,77 +21,8 @@ trait FailureDetector {
*/
def heartbeat(connection: Address): Unit
/**
* Calculates how likely it is that the connection has failed.
* <p/>
* If a connection does not have any records in failure detector then it is
* considered healthy.
*/
def phi(connection: Address): Double
/**
* Removes the heartbeat management for a connection.
*/
def remove(connection: Address): Unit
}
/**
* User controllable "puppet" failure detector.
*/
class FailureDetectorPuppet(system: ActorSystem, connectionsToStartWith: Address*) extends FailureDetector {
import java.util.concurrent.ConcurrentHashMap
trait Status
object Up extends Status
object Down extends Status
implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
val log = Logging(system, this)
private val connections = {
val cs = new ConcurrentHashMap[Address, Status]
connectionsToStartWith foreach { cs put (_, Up) }
cs
}
def +(connection: Address): this.type = {
log.debug("Adding cluster node [{}]", connection)
connections.put(connection, Up)
this
}
def markAsDown(connection: Address): this.type = {
connections.put(connection, Down)
this
}
def markAsUp(connection: Address): this.type = {
connections.put(connection, Up)
this
}
def isAvailable(connection: Address): Boolean = connections.get(connection) match {
case null
this + connection
true
case Up
log.debug("isAvailable: Cluster node IS NOT available [{}]", connection)
true
case Down
log.debug("isAvailable: Cluster node IS available [{}]", connection)
false
}
def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection)
def phi(connection: Address): Double = 0.1D
def remove(connection: Address): Unit = {
log.debug("Removing cluster node [{}]", connection)
connections.remove(connection)
}
}

View file

@ -67,7 +67,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
@volatile
var _unavailable: Set[Address] = Set.empty
override val failureDetector = new AccrualFailureDetector(system, clusterSettings) {
override val failureDetector = new FailureDetectorPuppet(system) {
override def isAvailable(connection: Address): Boolean = {
if (_unavailable.contains(connection)) false
else super.isAvailable(connection)

View file

@ -0,0 +1,60 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.{ Address, ActorSystem }
import akka.event.{ Logging, LogSource }
/**
* User controllable "puppet" failure detector.
*/
class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector {
import java.util.concurrent.ConcurrentHashMap
def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name))
trait Status
object Up extends Status
object Down extends Status
implicit private val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
private val log = Logging(system, this)
private val connections = new ConcurrentHashMap[Address, Status]
def markAsDown(connection: Address): this.type = {
connections.put(connection, Down)
this
}
def markAsUp(connection: Address): this.type = {
connections.put(connection, Up)
this
}
def isAvailable(connection: Address): Boolean = connections.get(connection) match {
case null
log.debug("Adding cluster node [{}]", connection)
connections.put(connection, Up)
true
case Up
log.debug("isAvailable: Cluster node IS NOT available [{}]", connection)
true
case Down
log.debug("isAvailable: Cluster node IS available [{}]", connection)
false
}
def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection)
def remove(connection: Address): Unit = {
log.debug("Removing cluster node [{}]", connection)
connections.remove(connection)
}
}