Incorporate review comments, see #2284

This commit is contained in:
Patrik Nordwall 2012-10-09 17:54:54 +02:00
parent 3f73705abc
commit 59f8210b85
2 changed files with 33 additions and 16 deletions

View file

@ -103,9 +103,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
self ! HeartbeatTick
}
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
}
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
override def postStop(): Unit = {
heartbeatTask.cancel()
@ -131,6 +129,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
all = state.members.collect { case m if m.address != selfAddress m.address }
joinInProgress --= all
consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
update()
}
def addMember(m: Member): Unit = if (m.address != selfAddress) {
@ -259,11 +258,11 @@ private[cluster] object ClusterHeartbeatSenderConnection {
* Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
* and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
*
* Netty blocks when sending to broken connections, and this actor uses
* a configurable circuit breaker to reduce connect attempts to broken
* This actor exists only because Netty blocks when sending to broken connections,
* and this actor uses a configurable circuit breaker to reduce connect attempts to broken
* connections.
*
* @see ClusterHeartbeatSender
* @see akka.cluster.ClusterHeartbeatSender
*/
private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
extends Actor with ActorLogging {
@ -283,7 +282,8 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
case SendHeartbeat(heartbeatMsg, _, deadline)
if (!deadline.isOverdue) {
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
// the CircuitBreaker will measure elapsed time and open if too many long calls
// Netty blocks when sending to broken connections, the CircuitBreaker will
// measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
toRef ! heartbeatMsg
} catch { case e: CircuitBreakerOpenException /* skip sending heartbeat to broken connection */ }

View file

@ -16,17 +16,34 @@ import scala.concurrent.util.FiniteDuration
class ClusterSettings(val config: Config, val systemName: String) {
import config._
final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class")
final val FailureDetectorMinStdDeviation: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
final val FailureDetectorThreshold: Double = {
val x = getDouble("akka.cluster.failure-detector.threshold")
require(x > 0.0, "failure-detector.threshold must be > 0")
x
}
final val FailureDetectorMaxSampleSize: Int = {
val n = getInt("akka.cluster.failure-detector.max-sample-size")
require(n > 0, "failure-detector.max-sample-size must be > 0"); n
}
final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class")
final val FailureDetectorMinStdDeviation: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.min-std-deviation must be > 0"); d
}
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
require(d >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0"); d
}
final val HeartbeatInterval: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d
}
final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
final val MonitoredByNrOfMembers = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
final val MonitoredByNrOfMembers: Int = {
val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
}
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case AddressFromURIString(addr) addr