diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 4e80223dbf..232a890910 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -103,9 +103,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg self ! HeartbeatTick } - override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) - } + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def postStop(): Unit = { heartbeatTask.cancel() @@ -131,6 +129,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg all = state.members.collect { case m if m.address != selfAddress ⇒ m.address } joinInProgress --= all consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor) + update() } def addMember(m: Member): Unit = if (m.address != selfAddress) { @@ -259,11 +258,11 @@ private[cluster] object ClusterHeartbeatSenderConnection { * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address. * - * Netty blocks when sending to broken connections, and this actor uses - * a configurable circuit breaker to reduce connect attempts to broken + * This actor exists only because Netty blocks when sending to broken connections, + * and this actor uses a configurable circuit breaker to reduce connect attempts to broken * connections. * - * @see ClusterHeartbeatSender + * @see akka.cluster.ClusterHeartbeatSender */ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef) extends Actor with ActorLogging { @@ -283,7 +282,8 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef) case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ if (!deadline.isOverdue) { log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) - // the CircuitBreaker will measure elapsed time and open if too many long calls + // Netty blocks when sending to broken connections, the CircuitBreaker will + // measure elapsed time and open if too many long calls try breaker.withSyncCircuitBreaker { toRef ! heartbeatMsg } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 38a3f4554c..fa35bc25a8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -16,17 +16,34 @@ import scala.concurrent.util.FiniteDuration class ClusterSettings(val config: Config, val systemName: String) { import config._ - final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold") - final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") - final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class") - final val FailureDetectorMinStdDeviation: FiniteDuration = - Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) - final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = - Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + final val FailureDetectorThreshold: Double = { + val x = getDouble("akka.cluster.failure-detector.threshold") + require(x > 0.0, "failure-detector.threshold must be > 0") + x + } + final val FailureDetectorMaxSampleSize: Int = { + val n = getInt("akka.cluster.failure-detector.max-sample-size") + require(n > 0, "failure-detector.max-sample-size must be > 0"); n + } + final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class") + final val FailureDetectorMinStdDeviation: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) + require(d > Duration.Zero, "failure-detector.min-std-deviation must be > 0"); d + } + final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) + require(d >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0"); d + } + final val HeartbeatInterval: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d + } final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt - final val MonitoredByNrOfMembers = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members") + final val MonitoredByNrOfMembers: Int = { + val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members") + require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n + } final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr