EventStream is now passed to failure detectors
- Also, dynamic loading is now centralized (DRY)
This commit is contained in:
parent
d0ed7385b2
commit
fd4bc09035
8 changed files with 37 additions and 29 deletions
|
|
@ -93,7 +93,8 @@ akka {
|
|||
|
||||
# FQCN of the failure detector implementation.
|
||||
# It must implement akka.remote.FailureDetector and have
|
||||
# a public constructor with a com.typesafe.config.Config parameter.
|
||||
# a public constructor with a com.typesafe.config.Config and
|
||||
# akka.actor.EventStream parameter.
|
||||
implementation-class = "akka.remote.PhiAccrualFailureDetector"
|
||||
|
||||
# How often keep-alive heartbeat messages should be sent to each connection.
|
||||
|
|
|
|||
|
|
@ -91,14 +91,8 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
|||
log.info("Cluster Node [{}] - is starting up...", selfAddress)
|
||||
|
||||
val failureDetector: FailureDetectorRegistry[Address] = {
|
||||
def createFailureDetector(): FailureDetector = {
|
||||
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
|
||||
system.dynamicAccess.createInstanceFor[FailureDetector](
|
||||
fqcn, List(classOf[Config] -> settings.FailureDetectorConfig)).recover({
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
s"Could not create custom cluster failure detector [$fqcn] due to: ${e.toString}", e)
|
||||
}).get
|
||||
}
|
||||
def createFailureDetector(): FailureDetector =
|
||||
FailureDetectorLoader.load(settings.FailureDetectorImplementationClass, settings.FailureDetectorConfig, system)
|
||||
|
||||
new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,11 +8,12 @@ import java.util.concurrent.atomic.AtomicReference
|
|||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.FailureDetector
|
||||
import com.typesafe.config.Config
|
||||
import akka.event.EventStream
|
||||
|
||||
/**
|
||||
* User controllable "puppet" failure detector.
|
||||
*/
|
||||
class FailureDetectorPuppet(config: Config) extends FailureDetector {
|
||||
class FailureDetectorPuppet(config: Config, ev: EventStream) extends FailureDetector {
|
||||
|
||||
trait Status
|
||||
object Up extends Status
|
||||
|
|
|
|||
|
|
@ -140,7 +140,8 @@ akka {
|
|||
|
||||
# FQCN of the failure detector implementation.
|
||||
# It must implement akka.remote.FailureDetector and have
|
||||
# a public constructor with a com.typesafe.config.Config parameter.
|
||||
# a public constructor with a com.typesafe.config.Config and
|
||||
# akka.actor.EventStream parameter.
|
||||
implementation-class = "akka.remote.PhiAccrualFailureDetector"
|
||||
|
||||
# How often keep-alive heartbeat messages should be sent to each connection.
|
||||
|
|
@ -177,7 +178,8 @@ akka {
|
|||
|
||||
# FQCN of the failure detector implementation.
|
||||
# It must implement akka.remote.FailureDetector and have
|
||||
# a public constructor with a com.typesafe.config.Config parameter.
|
||||
# a public constructor with a com.typesafe.config.Config and
|
||||
# akka.actor.EventStream parameter.
|
||||
implementation-class = "akka.remote.PhiAccrualFailureDetector"
|
||||
|
||||
# How often keep-alive heartbeat messages should be sent to each connection.
|
||||
|
|
|
|||
|
|
@ -4,6 +4,11 @@
|
|||
|
||||
package akka.remote
|
||||
|
||||
import akka.actor.{ ActorContext, ActorSystem, ExtendedActorSystem }
|
||||
import com.typesafe.config.Config
|
||||
import akka.event.EventStream
|
||||
import akka.ConfigurationException
|
||||
|
||||
/**
|
||||
* Interface for a registry of Akka failure detectors. New resources are implicitly registered when heartbeat is first
|
||||
* called with the resource given as parameter.
|
||||
|
|
@ -41,3 +46,19 @@ trait FailureDetectorRegistry[A] {
|
|||
*/
|
||||
def reset(): Unit
|
||||
}
|
||||
|
||||
object FailureDetectorLoader {
|
||||
|
||||
def load(fqcn: String, config: Config, system: ActorSystem): FailureDetector = {
|
||||
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[FailureDetector](
|
||||
fqcn, List(
|
||||
classOf[Config] -> config,
|
||||
classOf[EventStream] -> system.eventStream)).recover({
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
s"Could not create custom failure detector [$fqcn] due to: ${e.toString}", e)
|
||||
}).get
|
||||
}
|
||||
|
||||
def apply(fqcn: String, config: Config)(implicit ctx: ActorContext) = load(fqcn, config, ctx.system)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import scala.concurrent.duration.Duration
|
|||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.collection.immutable
|
||||
import com.typesafe.config.Config
|
||||
import akka.event.EventStream
|
||||
|
||||
/**
|
||||
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
|
||||
|
|
@ -66,7 +67,7 @@ class PhiAccrualFailureDetector(
|
|||
* `min-std-deviation`, `acceptable-heartbeat-pause` and
|
||||
* `heartbeat-interval`.
|
||||
*/
|
||||
def this(config: Config) =
|
||||
def this(config: Config, ev: EventStream) =
|
||||
this(
|
||||
threshold = config.getDouble("threshold"),
|
||||
maxSampleSize = config.getInt("max-sample-size"),
|
||||
|
|
|
|||
|
|
@ -195,14 +195,8 @@ private[akka] class RemoteActorRefProvider(
|
|||
}
|
||||
|
||||
protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = {
|
||||
def createFailureDetector(): FailureDetector = {
|
||||
import remoteSettings.{ WatchFailureDetectorImplementationClass ⇒ fqcn }
|
||||
system.dynamicAccess.createInstanceFor[FailureDetector](
|
||||
fqcn, List(classOf[Config] -> remoteSettings.WatchFailureDetectorConfig)).recover({
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
s"Could not create custom remote watcher failure detector [$fqcn] due to: ${e.toString}", e)
|
||||
}).get
|
||||
}
|
||||
def createFailureDetector(): FailureDetector =
|
||||
FailureDetectorLoader.load(remoteSettings.WatchFailureDetectorImplementationClass, remoteSettings.WatchFailureDetectorConfig, system)
|
||||
|
||||
new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -138,14 +138,8 @@ private[transport] class AkkaProtocolManager(
|
|||
failureDetector), actorNameFor(remoteAddress))
|
||||
}
|
||||
|
||||
private def createTransportFailureDetector(): FailureDetector = {
|
||||
import settings.{ TransportFailureDetectorImplementationClass ⇒ fqcn }
|
||||
context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[FailureDetector](
|
||||
fqcn, List(classOf[Config] -> settings.TransportFailureDetectorConfig)).recover({
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
s"Could not create custom remote failure detector [$fqcn] due to: ${e.toString}", e)
|
||||
}).get
|
||||
}
|
||||
private def createTransportFailureDetector(): FailureDetector =
|
||||
FailureDetectorLoader(settings.TransportFailureDetectorImplementationClass, settings.TransportFailureDetectorConfig)
|
||||
|
||||
override def postStop() {
|
||||
wrappedTransport.shutdown()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue