Removing the naught default in code of the failure detector and changed so that the AccrualFailureDetectors constructor matches what the instantiator expects
This commit is contained in:
parent
063c260615
commit
9b73d75c1b
5 changed files with 11 additions and 16 deletions
|
|
@ -59,7 +59,7 @@ akka {
|
||||||
# network drop.
|
# network drop.
|
||||||
acceptable-heartbeat-pause = 3s
|
acceptable-heartbeat-pause = 3s
|
||||||
|
|
||||||
implementation-class = ""
|
implementation-class = "akka.cluster.AccrualFailureDetector"
|
||||||
|
|
||||||
max-sample-size = 1000
|
max-sample-size = 1000
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -403,14 +403,12 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
|
||||||
override def createExtension(system: ExtendedActorSystem): Cluster = {
|
override def createExtension(system: ExtendedActorSystem): Cluster = {
|
||||||
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
|
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
|
||||||
|
|
||||||
val failureDetector = clusterSettings.FailureDetectorImplementationClass match {
|
val failureDetector = {
|
||||||
case None ⇒ new AccrualFailureDetector(system, clusterSettings)
|
import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn }
|
||||||
case Some(fqcn) ⇒
|
|
||||||
system.dynamicAccess.createInstanceFor[FailureDetector](
|
system.dynamicAccess.createInstanceFor[FailureDetector](
|
||||||
fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match {
|
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold(
|
||||||
case Right(fd) ⇒ fd
|
e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString),
|
||||||
case Left(e) ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
|
identity)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
new Cluster(system, failureDetector)
|
new Cluster(system, failureDetector)
|
||||||
|
|
|
||||||
|
|
@ -16,10 +16,7 @@ class ClusterSettings(val config: Config, val systemName: String) {
|
||||||
|
|
||||||
final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
|
final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
|
||||||
final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
|
final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
|
||||||
final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match {
|
final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class")
|
||||||
case "" ⇒ None
|
|
||||||
case fqcn ⇒ Some(fqcn)
|
|
||||||
}
|
|
||||||
final val FailureDetectorMinStdDeviation: Duration =
|
final val FailureDetectorMinStdDeviation: Duration =
|
||||||
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
|
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
|
||||||
final val FailureDetectorAcceptableHeartbeatPause: Duration =
|
final val FailureDetectorAcceptableHeartbeatPause: Duration =
|
||||||
|
|
|
||||||
|
|
@ -55,7 +55,7 @@ trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: Mul
|
||||||
|
|
||||||
override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name))
|
override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name))
|
||||||
|
|
||||||
override def markNodeAsAvailable(address: Address): Unit = { /* no-op */ }
|
override def markNodeAsAvailable(address: Address): Unit = ()
|
||||||
|
|
||||||
override def markNodeAsUnavailable(address: Address): Unit = { /* no-op */ }
|
override def markNodeAsUnavailable(address: Address): Unit = ()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ class ClusterConfigSpec extends AkkaSpec {
|
||||||
import settings._
|
import settings._
|
||||||
FailureDetectorThreshold must be(8.0 plusOrMinus 0.0001)
|
FailureDetectorThreshold must be(8.0 plusOrMinus 0.0001)
|
||||||
FailureDetectorMaxSampleSize must be(1000)
|
FailureDetectorMaxSampleSize must be(1000)
|
||||||
FailureDetectorImplementationClass must be(None)
|
FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName)
|
||||||
FailureDetectorMinStdDeviation must be(100 millis)
|
FailureDetectorMinStdDeviation must be(100 millis)
|
||||||
FailureDetectorAcceptableHeartbeatPause must be(3 seconds)
|
FailureDetectorAcceptableHeartbeatPause must be(3 seconds)
|
||||||
NodeToJoin must be(None)
|
NodeToJoin must be(None)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue