From fd4bc090358d19414ed152443c70157a06101883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 17 May 2013 14:16:26 +0200 Subject: [PATCH] EventStream is now passed to failure detectors - Also, dynamic loading is now centralized (DRY) --- .../src/main/resources/reference.conf | 3 ++- .../src/main/scala/akka/cluster/Cluster.scala | 10 ++------- .../akka/cluster/FailureDetectorPuppet.scala | 3 ++- akka-remote/src/main/resources/reference.conf | 6 ++++-- .../akka/remote/FailureDetectorRegistry.scala | 21 +++++++++++++++++++ .../remote/PhiAccrualFailureDetector.scala | 3 ++- .../akka/remote/RemoteActorRefProvider.scala | 10 ++------- .../transport/AkkaProtocolTransport.scala | 10 ++------- 8 files changed, 37 insertions(+), 29 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b34c0c9eb1..855e904569 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -93,7 +93,8 @@ akka { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have - # a public constructor with a com.typesafe.config.Config parameter. + # a public constructor with a com.typesafe.config.Config and + # akka.actor.EventStream parameter. implementation-class = "akka.remote.PhiAccrualFailureDetector" # How often keep-alive heartbeat messages should be sent to each connection. diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 1ca230fdae..c3716ca2b6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -91,14 +91,8 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { log.info("Cluster Node [{}] - is starting up...", selfAddress) val failureDetector: FailureDetectorRegistry[Address] = { - def createFailureDetector(): FailureDetector = { - import settings.{ FailureDetectorImplementationClass ⇒ fqcn } - system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, List(classOf[Config] -> settings.FailureDetectorConfig)).recover({ - case e ⇒ throw new ConfigurationException( - s"Could not create custom cluster failure detector [$fqcn] due to: ${e.toString}", e) - }).get - } + def createFailureDetector(): FailureDetector = + FailureDetectorLoader.load(settings.FailureDetectorImplementationClass, settings.FailureDetectorConfig, system) new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) } diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index 40848f340c..4a62c49bf9 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -8,11 +8,12 @@ import java.util.concurrent.atomic.AtomicReference import akka.remote.testkit.MultiNodeConfig import akka.remote.FailureDetector import com.typesafe.config.Config +import akka.event.EventStream /** * User controllable "puppet" failure detector. */ -class FailureDetectorPuppet(config: Config) extends FailureDetector { +class FailureDetectorPuppet(config: Config, ev: EventStream) extends FailureDetector { trait Status object Up extends Status diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 6d97ac78a7..42044c912f 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -140,7 +140,8 @@ akka { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have - # a public constructor with a com.typesafe.config.Config parameter. + # a public constructor with a com.typesafe.config.Config and + # akka.actor.EventStream parameter. implementation-class = "akka.remote.PhiAccrualFailureDetector" # How often keep-alive heartbeat messages should be sent to each connection. @@ -177,7 +178,8 @@ akka { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have - # a public constructor with a com.typesafe.config.Config parameter. + # a public constructor with a com.typesafe.config.Config and + # akka.actor.EventStream parameter. implementation-class = "akka.remote.PhiAccrualFailureDetector" # How often keep-alive heartbeat messages should be sent to each connection. diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala index 31fa03bfe6..9a8ee8e2b8 100644 --- a/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala +++ b/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala @@ -4,6 +4,11 @@ package akka.remote +import akka.actor.{ ActorContext, ActorSystem, ExtendedActorSystem } +import com.typesafe.config.Config +import akka.event.EventStream +import akka.ConfigurationException + /** * Interface for a registry of Akka failure detectors. New resources are implicitly registered when heartbeat is first * called with the resource given as parameter. @@ -41,3 +46,19 @@ trait FailureDetectorRegistry[A] { */ def reset(): Unit } + +object FailureDetectorLoader { + + def load(fqcn: String, config: Config, system: ActorSystem): FailureDetector = { + system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, List( + classOf[Config] -> config, + classOf[EventStream] -> system.eventStream)).recover({ + case e ⇒ throw new ConfigurationException( + s"Could not create custom failure detector [$fqcn] due to: ${e.toString}", e) + }).get + } + + def apply(fqcn: String, config: Config)(implicit ctx: ActorContext) = load(fqcn, config, ctx.system) + +} diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index c4b6a6c8bb..336fa0b7d7 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -11,6 +11,7 @@ import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.collection.immutable import com.typesafe.config.Config +import akka.event.EventStream /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: @@ -66,7 +67,7 @@ class PhiAccrualFailureDetector( * `min-std-deviation`, `acceptable-heartbeat-pause` and * `heartbeat-interval`. */ - def this(config: Config) = + def this(config: Config, ev: EventStream) = this( threshold = config.getDouble("threshold"), maxSampleSize = config.getInt("max-sample-size"), diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index c1943acf44..01c45e06b3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -195,14 +195,8 @@ private[akka] class RemoteActorRefProvider( } protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = { - def createFailureDetector(): FailureDetector = { - import remoteSettings.{ WatchFailureDetectorImplementationClass ⇒ fqcn } - system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, List(classOf[Config] -> remoteSettings.WatchFailureDetectorConfig)).recover({ - case e ⇒ throw new ConfigurationException( - s"Could not create custom remote watcher failure detector [$fqcn] due to: ${e.toString}", e) - }).get - } + def createFailureDetector(): FailureDetector = + FailureDetectorLoader.load(remoteSettings.WatchFailureDetectorImplementationClass, remoteSettings.WatchFailureDetectorConfig, system) new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index ec418b3ac4..3b6c83981b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -138,14 +138,8 @@ private[transport] class AkkaProtocolManager( failureDetector), actorNameFor(remoteAddress)) } - private def createTransportFailureDetector(): FailureDetector = { - import settings.{ TransportFailureDetectorImplementationClass ⇒ fqcn } - context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, List(classOf[Config] -> settings.TransportFailureDetectorConfig)).recover({ - case e ⇒ throw new ConfigurationException( - s"Could not create custom remote failure detector [$fqcn] due to: ${e.toString}", e) - }).get - } + private def createTransportFailureDetector(): FailureDetector = + FailureDetectorLoader(settings.TransportFailureDetectorImplementationClass, settings.TransportFailureDetectorConfig) override def postStop() { wrappedTransport.shutdown()