diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index b35f08ec9b..d6693dbeca 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -22,7 +22,6 @@ class DeployerSpec extends AkkaSpec { None, RoundRobin, NrOfInstances(3), - BannagePeriodFailureDetector(10 seconds), RemoteScope(List( RemoteAddress("wallace", 2552), RemoteAddress("gromit", 2552)))))) // ClusterScope( diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 7b8a6aa751..adf251c766 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -22,7 +22,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { None, RoundRobin, NrOfInstances(5), - NoOpFailureDetector, LocalScope)) val helloLatch = new CountDownLatch(5) @@ -58,7 +57,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { None, RoundRobin, NrOfInstances(10), - NoOpFailureDetector, LocalScope)) val connectionCount = 10 @@ -103,7 +101,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { None, RoundRobin, NrOfInstances(5), - NoOpFailureDetector, LocalScope)) val helloLatch = new CountDownLatch(5) @@ -138,7 +135,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { None, Random, NrOfInstances(7), - NoOpFailureDetector, LocalScope)) val stopLatch = new CountDownLatch(7) @@ -172,7 +168,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { None, Random, NrOfInstances(10), - NoOpFailureDetector, LocalScope)) val connectionCount = 10 @@ -217,7 +212,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { None, Random, NrOfInstances(6), - NoOpFailureDetector, LocalScope)) val helloLatch = new CountDownLatch(6) diff --git 
a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 920ed18be9..0069f8f419 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -162,11 +162,11 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { (if (systemService) None else deployer.lookupDeployment(address)) match { // see if the deployment already exists, if so use it, if not create actor // create a local actor - case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ + case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒ new LocalActorRef(app, props, supervisor, address, systemService) // create a local actor // create a routed actor ref - case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ + case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒ val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { case RouterType.Direct ⇒ () ⇒ new DirectRouter diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index f550c34bfa..653f64346e 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -51,7 +51,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { def deploy(deployment: Deploy): Unit = instance.deploy(deployment) def isLocal(deployment: Deploy): Boolean = deployment match { - case Deploy(_, _, _, _, _, LocalScope) | Deploy(_, _, _, _, _, _: LocalScope) ⇒ true + case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) ⇒ true case _ ⇒ false } @@ -144,50 +144,6 @@ class Deployer(val app: AkkaApplication) extends 
ActorDeployer { } } - // -------------------------------- - // akka.actor.deployment.
.failure-detector. - // -------------------------------- - val failureDetectorOption: Option[FailureDetector] = addressConfig.getSection("failure-detector") match { - case Some(failureDetectorConfig) ⇒ - failureDetectorConfig.keys.toList match { - case Nil ⇒ None - case detector :: Nil ⇒ - detector match { - case "no-op" ⇒ - Some(NoOpFailureDetector) - - case "remove-connection-on-first-failure" ⇒ - Some(RemoveConnectionOnFirstFailureFailureDetector) - - case "bannage-period" ⇒ - throw new ConfigurationException( - "Configuration for [" + addressPath + ".failure-detector.bannage-period] must have a 'time-to-ban' option defined") - - case "bannage-period.time-to-ban" ⇒ - failureDetectorConfig.getSection("bannage-period") map { section ⇒ - val timeToBan = Duration(section.getInt("time-to-ban", 60), app.AkkaConfig.DefaultTimeUnit) - BannagePeriodFailureDetector(timeToBan) - } - - case "custom" ⇒ - failureDetectorConfig.getSection("custom") map { section ⇒ - val implementationClass = section.getString("class").getOrElse(throw new ConfigurationException( - "Configuration for [" + addressPath + - ".failure-detector.custom] must have a 'class' element with the fully qualified name of the failure detector class")) - CustomFailureDetector(implementationClass) - } - - case _ ⇒ None - } - case detectors ⇒ - throw new ConfigurationException( - "Configuration for [" + addressPath + - ".failure-detector] can not have multiple sections - found [" + detectors.mkString(", ") + "]") - } - case None ⇒ None - } - val failureDetector = failureDetectorOption getOrElse { NoOpFailureDetector } // fall back to default failure detector - // -------------------------------- // akka.actor.deployment.
.create-as // -------------------------------- @@ -235,7 +191,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } } - Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(remoteAddresses))) + Some(Deploy(address, recipe, router, nrOfInstances, RemoteScope(remoteAddresses))) case None ⇒ // check for 'cluster' config section @@ -266,9 +222,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { val address = tokenizer.nextElement.asInstanceOf[String] protocol match { - //case "host" ⇒ Host(address) case "node" ⇒ Node(address) - //case "ip" ⇒ IP(address) case _ ⇒ raiseHomeConfigError() } } @@ -279,7 +233,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { // -------------------------------- clusterConfig.getSection("replication") match { case None ⇒ - Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.ClusterScope(preferredNodes, Transient))) + Some(Deploy(address, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Transient))) case Some(replicationConfig) ⇒ val storage = replicationConfig.getString("storage", "transaction-log") match { @@ -298,7 +252,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + unknown + "]") } - Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.ClusterScope(preferredNodes, Replication(storage, strategy)))) + Some(Deploy(address, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Replication(storage, strategy)))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 8931f1d13e..1782cda940 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -19,7 
+19,6 @@ object DeploymentConfig { recipe: Option[ActorRecipe], routing: Routing = Direct, nrOfInstances: NrOfInstances = ZeroNrOfInstances, - failureDetector: FailureDetector = NoOpFailureDetector, scope: Scope = LocalScope) // -------------------------------- @@ -51,21 +50,6 @@ object DeploymentConfig { case object LeastRAM extends Routing case object LeastMessages extends Routing - // -------------------------------- - // --- FailureDetector - // -------------------------------- - sealed trait FailureDetector - case class BannagePeriodFailureDetector(timeToBan: Duration) extends FailureDetector - case class CustomFailureDetector(className: String) extends FailureDetector - - // For Java API - case class NoOpFailureDetector() extends FailureDetector - case class RemoveConnectionOnFirstFailureFailureDetector() extends FailureDetector - - // For Scala API - case object NoOpFailureDetector extends FailureDetector - case object RemoveConnectionOnFirstFailureFailureDetector extends FailureDetector - // -------------------------------- // --- Scope // -------------------------------- @@ -176,16 +160,6 @@ object DeploymentConfig { // case IP(address) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. 
Use the node name like: preferred-nodes = [\"node:node1\"]") } - def failureDetectorTypeFor(failureDetector: FailureDetector): FailureDetectorType = failureDetector match { - case NoOpFailureDetector ⇒ FailureDetectorType.NoOp - case NoOpFailureDetector() ⇒ FailureDetectorType.NoOp - case BannagePeriodFailureDetector(timeToBan) ⇒ FailureDetectorType.BannagePeriod(timeToBan) - case RemoveConnectionOnFirstFailureFailureDetector ⇒ FailureDetectorType.RemoveConnectionOnFirstFailure - case RemoveConnectionOnFirstFailureFailureDetector() ⇒ FailureDetectorType.RemoveConnectionOnFirstFailure - case CustomFailureDetector(implClass) ⇒ FailureDetectorType.Custom(implClass) - case unknown ⇒ throw new UnsupportedOperationException("Unknown FailureDetector [" + unknown + "]") - } - def routerTypeFor(routing: Routing): RouterType = routing match { case _: Direct | Direct ⇒ RouterType.Direct case _: RoundRobin | RoundRobin ⇒ RouterType.RoundRobin @@ -255,7 +229,7 @@ class DeploymentConfig(val app: AkkaApplication) { def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == app.nodename) def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match { - case Deploy(_, _, _, _, _, ClusterScope(_, replicationScheme)) ⇒ Some(replicationScheme) + case Deploy(_, _, _, _, ClusterScope(_, replicationScheme)) ⇒ Some(replicationScheme) case _ ⇒ None } diff --git a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala index 9d435c06e1..1632eb9055 100644 --- a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala @@ -12,6 +12,8 @@ import scala.annotation.tailrec import System.{ currentTimeMillis ⇒ newTimestamp } +import akka.AkkaApplication + /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. 
as defined in their paper: * [http://ddg.jaist.ac.jp/pub/HDY+04.pdf] @@ -22,11 +24,15 @@ import System.{ currentTimeMillis ⇒ newTimestamp } *

* Default threshold is 8, but can be configured in the Akka config. */ -class AccrualFailureDetector( - val threshold: Int = 8, - val maxSampleSize: Int = 1000) { +class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 1000) { - final val PhiFactor = 1.0 / math.log(10.0) + def this(app: AkkaApplication) { + this( + app.config.getInt("akka.remote.failure-detector.threshold", 8), + app.config.getInt("akka.remote.failure-detector.max-sample-size", 1000)) + } + + private final val PhiFactor = 1.0 / math.log(10.0) private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D) diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 50d1ebb87f..e531a1e655 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -37,7 +37,7 @@ class Remote(val app: AkkaApplication) { val shouldCompressData = config.getBool("akka.remote.use-compression", false) val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt - val failureDetector = new AccrualFailureDetector(FailureDetectorThreshold, FailureDetectorMaxSampleSize) + val failureDetector = new AccrualFailureDetector(app) // val gossiper = new Gossiper(this) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index dc6b18c6d9..3ed89f00a0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -58,7 +58,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider case null ⇒ val actor: ActorRef = try { deployer.lookupDeploymentFor(address) match { - case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, 
DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ + case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ // FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one // val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 0a5fcb9fa8..ebc7437b17 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -75,20 +75,6 @@ akka { # default is '1' # if the "direct" router is used then this element is ignored (always '1') - failure-detector { # failure detection scheme to use - bannage-period { # available: no-op {} - time-to-ban = 10 # remove-connection-on-first-failure {} - } # bannage-period { ... } - - # Options: - # remove-connection-on-first-failure {} # threshold { ... } - # custom { # custom { ... } - fully qualified class name of the router class - # class = "com.biz.app.MyCustomFailureDetector" - # } - # threshold { - # } - } - #create-as { # class = "com.biz.app.MyActor" # FIXME document 'create-as' #} @@ -236,6 +222,14 @@ akka { remote-daemon-ack-timeout = 30 # Timeout for ACK of cluster operations, lik checking actor out etc. + failure-detector { # accrual failure detection config + threshold = 8 # defines the failure detector threshold + # A low threshold is prone to generate many wrong suspicions but ensures a + # quick detection in the event of a real crash. Conversely, a high threshold + # generates fewer mistakes but needs more time to detect actual crashes + max-sample-size = 1000 + } + server { port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA) message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads