Moved 'failure-detector' config from 'akka.actor.deployment.address' to 'akka.remote'. Made AccrualFailureDetector configurable from config.

This commit is contained in:
Jonas Bonér 2011-11-10 11:50:11 +01:00
parent 39d169612f
commit f4740a4798
9 changed files with 27 additions and 106 deletions

View file

@ -22,7 +22,6 @@ class DeployerSpec extends AkkaSpec {
None,
RoundRobin,
NrOfInstances(3),
BannagePeriodFailureDetector(10 seconds),
RemoteScope(List(
RemoteAddress("wallace", 2552), RemoteAddress("gromit", 2552))))))
// ClusterScope(

View file

@ -22,7 +22,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
None,
RoundRobin,
NrOfInstances(5),
NoOpFailureDetector,
LocalScope))
val helloLatch = new CountDownLatch(5)
@ -58,7 +57,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
None,
RoundRobin,
NrOfInstances(10),
NoOpFailureDetector,
LocalScope))
val connectionCount = 10
@ -103,7 +101,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
None,
RoundRobin,
NrOfInstances(5),
NoOpFailureDetector,
LocalScope))
val helloLatch = new CountDownLatch(5)
@ -138,7 +135,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
None,
Random,
NrOfInstances(7),
NoOpFailureDetector,
LocalScope))
val stopLatch = new CountDownLatch(7)
@ -172,7 +168,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
None,
Random,
NrOfInstances(10),
NoOpFailureDetector,
LocalScope))
val connectionCount = 10
@ -217,7 +212,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
None,
Random,
NrOfInstances(6),
NoOpFailureDetector,
LocalScope))
val helloLatch = new CountDownLatch(6)

View file

@ -162,11 +162,11 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
(if (systemService) None else deployer.lookupDeployment(address)) match { // see if the deployment already exists, if so use it, if not create actor
// create a local actor
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope))
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope))
new LocalActorRef(app, props, supervisor, address, systemService) // create a local actor
// create a routed actor ref
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope))
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope))
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct () new DirectRouter

View file

@ -51,7 +51,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
def isLocal(deployment: Deploy): Boolean = deployment match {
case Deploy(_, _, _, _, _, LocalScope) | Deploy(_, _, _, _, _, _: LocalScope) true
case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) true
case _ false
}
@ -144,50 +144,6 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
}
}
// --------------------------------
// akka.actor.deployment.<address>.failure-detector.<detector>
// --------------------------------
val failureDetectorOption: Option[FailureDetector] = addressConfig.getSection("failure-detector") match {
case Some(failureDetectorConfig)
failureDetectorConfig.keys.toList match {
case Nil None
case detector :: Nil
detector match {
case "no-op"
Some(NoOpFailureDetector)
case "remove-connection-on-first-failure"
Some(RemoveConnectionOnFirstFailureFailureDetector)
case "bannage-period"
throw new ConfigurationException(
"Configuration for [" + addressPath + ".failure-detector.bannage-period] must have a 'time-to-ban' option defined")
case "bannage-period.time-to-ban"
failureDetectorConfig.getSection("bannage-period") map { section
val timeToBan = Duration(section.getInt("time-to-ban", 60), app.AkkaConfig.DefaultTimeUnit)
BannagePeriodFailureDetector(timeToBan)
}
case "custom"
failureDetectorConfig.getSection("custom") map { section
val implementationClass = section.getString("class").getOrElse(throw new ConfigurationException(
"Configuration for [" + addressPath +
".failure-detector.custom] must have a 'class' element with the fully qualified name of the failure detector class"))
CustomFailureDetector(implementationClass)
}
case _ None
}
case detectors
throw new ConfigurationException(
"Configuration for [" + addressPath +
".failure-detector] can not have multiple sections - found [" + detectors.mkString(", ") + "]")
}
case None None
}
val failureDetector = failureDetectorOption getOrElse { NoOpFailureDetector } // fall back to default failure detector
// --------------------------------
// akka.actor.deployment.<address>.create-as
// --------------------------------
@ -235,7 +191,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
}
}
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(remoteAddresses)))
Some(Deploy(address, recipe, router, nrOfInstances, RemoteScope(remoteAddresses)))
case None // check for 'cluster' config section
@ -266,9 +222,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
val address = tokenizer.nextElement.asInstanceOf[String]
protocol match {
//case "host" Host(address)
case "node" Node(address)
//case "ip" IP(address)
case _ raiseHomeConfigError()
}
}
@ -279,7 +233,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
// --------------------------------
clusterConfig.getSection("replication") match {
case None
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.ClusterScope(preferredNodes, Transient)))
Some(Deploy(address, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Transient)))
case Some(replicationConfig)
val storage = replicationConfig.getString("storage", "transaction-log") match {
@ -298,7 +252,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
unknown + "]")
}
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.ClusterScope(preferredNodes, Replication(storage, strategy))))
Some(Deploy(address, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Replication(storage, strategy))))
}
}
}

View file

@ -19,7 +19,6 @@ object DeploymentConfig {
recipe: Option[ActorRecipe],
routing: Routing = Direct,
nrOfInstances: NrOfInstances = ZeroNrOfInstances,
failureDetector: FailureDetector = NoOpFailureDetector,
scope: Scope = LocalScope)
// --------------------------------
@ -51,21 +50,6 @@ object DeploymentConfig {
case object LeastRAM extends Routing
case object LeastMessages extends Routing
// --------------------------------
// --- FailureDetector
// --------------------------------
sealed trait FailureDetector
case class BannagePeriodFailureDetector(timeToBan: Duration) extends FailureDetector
case class CustomFailureDetector(className: String) extends FailureDetector
// For Java API
case class NoOpFailureDetector() extends FailureDetector
case class RemoveConnectionOnFirstFailureFailureDetector() extends FailureDetector
// For Scala API
case object NoOpFailureDetector extends FailureDetector
case object RemoveConnectionOnFirstFailureFailureDetector extends FailureDetector
// --------------------------------
// --- Scope
// --------------------------------
@ -176,16 +160,6 @@ object DeploymentConfig {
// case IP(address) throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
}
def failureDetectorTypeFor(failureDetector: FailureDetector): FailureDetectorType = failureDetector match {
case NoOpFailureDetector FailureDetectorType.NoOp
case NoOpFailureDetector() FailureDetectorType.NoOp
case BannagePeriodFailureDetector(timeToBan) FailureDetectorType.BannagePeriod(timeToBan)
case RemoveConnectionOnFirstFailureFailureDetector FailureDetectorType.RemoveConnectionOnFirstFailure
case RemoveConnectionOnFirstFailureFailureDetector() FailureDetectorType.RemoveConnectionOnFirstFailure
case CustomFailureDetector(implClass) FailureDetectorType.Custom(implClass)
case unknown throw new UnsupportedOperationException("Unknown FailureDetector [" + unknown + "]")
}
def routerTypeFor(routing: Routing): RouterType = routing match {
case _: Direct | Direct RouterType.Direct
case _: RoundRobin | RoundRobin RouterType.RoundRobin
@ -255,7 +229,7 @@ class DeploymentConfig(val app: AkkaApplication) {
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home nodeNameFor(home) == app.nodename)
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
case Deploy(_, _, _, _, _, ClusterScope(_, replicationScheme)) Some(replicationScheme)
case Deploy(_, _, _, _, ClusterScope(_, replicationScheme)) Some(replicationScheme)
case _ None
}

View file

@ -12,6 +12,8 @@ import scala.annotation.tailrec
import System.{ currentTimeMillis newTimestamp }
import akka.AkkaApplication
/**
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
* [http://ddg.jaist.ac.jp/pub/HDY+04.pdf]
@ -22,11 +24,15 @@ import System.{ currentTimeMillis ⇒ newTimestamp }
* <p/>
* Default threshold is 8, but can be configured in the Akka config.
*/
class AccrualFailureDetector(
val threshold: Int = 8,
val maxSampleSize: Int = 1000) {
class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 1000) {
final val PhiFactor = 1.0 / math.log(10.0)
def this(app: AkkaApplication) {
this(
app.config.getInt("akka.remote.failure-detector.theshold", 8),
app.config.getInt("akka.remote.failure-detector.max-sample-size", 1000))
}
private final val PhiFactor = 1.0 / math.log(10.0)
private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)

View file

@ -37,7 +37,7 @@ class Remote(val app: AkkaApplication) {
val shouldCompressData = config.getBool("akka.remote.use-compression", false)
val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt
val failureDetector = new AccrualFailureDetector(FailureDetectorThreshold, FailureDetectorMaxSampleSize)
val failureDetector = new AccrualFailureDetector(app)
// val gossiper = new Gossiper(this)

View file

@ -58,7 +58,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
case null
val actor: ActorRef = try {
deployer.lookupDeploymentFor(address) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses)))
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses)))
// FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one
// val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match {

View file

@ -75,20 +75,6 @@ akka {
# default is '1'
# if the "direct" router is used then this element is ignored (always '1')
failure-detector { # failure detection scheme to use
bannage-period { # available: no-op {}
time-to-ban = 10 # remove-connection-on-first-failure {}
} # bannage-period { ... }
# Options:
# remove-connection-on-first-failure {} # threshold { ... }
# custom { # custom { ... } - fully qualified class name of the router class
# class = "com.biz.app.MyCustomFailureDetector"
# }
# threshold {
# }
}
#create-as {
# class = "com.biz.app.MyActor" # FIXME document 'create-as'
#}
@ -236,6 +222,14 @@ akka {
remote-daemon-ack-timeout = 30 # Timeout for ACK of cluster operations, lik checking actor out etc.
failure-detector { # accrual failure detection config
threshold = 8 # defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures a
# quick detection in the event of a real crash. Conversely, a high threshold
# generates fewer mistakes but needs more time to detect actual crashes
max-sample-size = 1000
}
server {
port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA)
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads