Merge pull request #1029 from akka/wip-2907-heartbeats-patriknw

Detect failure when no heartbeats sent, see #2907

Commit 06807e73b8
25 changed files with 466 additions and 146 deletions
@@ -55,10 +55,6 @@ akka {
# A value of 0 s can be used to always publish the stats, when it happens.
publish-stats-interval = 10s

# A joining node stops sending heartbeats to the node to join if it hasn't
# become member of the cluster within this deadline.
join-timeout = 60s

# The id of the dispatcher to use for cluster actors. If not specified
# default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.

@@ -109,7 +105,29 @@ akka {
# network drop.
acceptable-heartbeat-pause = 3s

# Number of samples to use for calculation of mean and standard deviation of
# inter-arrival times.
max-sample-size = 1000

# When a node stops sending heartbeats to another node it will end that
# with this number of EndHeartbeat messages, which will remove the
# monitoring from the failure detector.
nr-of-end-heartbeats = 8

# When no expected heartbeat message has been received an explicit
# heartbeat request is sent to the node that should emit heartbeats.
heartbeat-request {
# Grace period until an explicit heartbeat request is sent
grace-period = 10 s

# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 3 s

# Cleanup of obsolete heartbeat requests
time-to-live = 60 s
}
}

metrics {
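For orientation, the new failure-detector settings above can be overridden per application. A minimal sketch, assuming a plain Akka cluster node; the values just repeat the defaults shown above and the system name is made up for the example:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical override of the new heartbeat-request settings.
val config = ConfigFactory.parseString("""
  akka.cluster.failure-detector {
    nr-of-end-heartbeats = 8
    heartbeat-request {
      grace-period = 10 s
      expected-response-after = 3 s
      time-to-live = 60 s
    }
  }
  """).withFallback(ConfigFactory.load())

val system = ActorSystem("ClusterSystem", config)
```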
@@ -109,18 +109,19 @@ class AccrualFailureDetector(

private val state = new AtomicReference[State](State())

/**
* Returns true if the connection is considered to be up and healthy
* and returns false otherwise.
*/
def isAvailable(connection: Address): Boolean = phi(connection) < threshold
override def isAvailable(connection: Address): Boolean = phi(connection) < threshold

override def isMonitoring(connection: Address): Boolean = state.get.timestamps.get(connection).nonEmpty

/**
* Records a heartbeat for a connection.
*/
@tailrec
final def heartbeat(connection: Address) {
log.debug("Heartbeat from connection [{}] ", connection)
if (isMonitoring(connection))
log.debug("Heartbeat from connection [{}] ", connection)
else
log.info("First heartbeat from connection [{}] ", connection)

val timestamp = clock()
val oldState = state.get

@@ -197,7 +198,9 @@ class AccrualFailureDetector(
*/
@tailrec
final def remove(connection: Address): Unit = {
log.debug("Remove connection [{}] ", connection)
if (isMonitoring(connection))
log.info("Remove heartbeat connection [{}] ", connection)

val oldState = state.get

if (oldState.history.contains(connection)) {
@@ -176,8 +176,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
/**
* Subscribe to cluster domain events.
* The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclass. A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will also be sent to the subscriber.
* or subclass.
*
* A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will be sent to the subscriber as the first event. When
* `to` Class is a [[akka.cluster.ClusterEvent.InstantMemberEvent]]
* (or subclass) the snapshot event will instead be a
* [[akka.cluster.ClusterEvent.InstantClusterState]].
*/
def subscribe(subscriber: ActorRef, to: Class[_]): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, to)
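To make the updated subscription contract concrete, a minimal sketch of a subscriber, modeled on the multi-node test further down; the actor class and system name are illustrative, not part of this change:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

// Subscribing with an InstantMemberEvent subclass means the first event is an
// InstantClusterState snapshot rather than CurrentClusterState.
class MemberListener extends Actor {
  def receive = {
    case state: InstantClusterState ⇒ println(s"snapshot with ${state.members.size} members")
    case InstantMemberUp(m)         ⇒ println(s"member up: ${m.address}")
  }
}

val system = ActorSystem("ClusterSystem")
Cluster(system).subscribe(system.actorOf(Props[MemberListener]), classOf[InstantMemberUp])
```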
@@ -181,7 +181,6 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
import ClusterLeaderAction._
import InternalClusterAction._
import ClusterHeartbeatSender.JoinInProgress

val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, failureDetector }

@@ -281,7 +280,13 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address): Unit = {
if (!latestGossip.members.exists(_.address == address)) {
if (address.protocol != selfAddress.protocol)
log.info("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol, address.protocol)
else if (address.system != selfAddress.system)
log.info("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.system, address.system)
else if (!latestGossip.members.exists(_.address == address)) {
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip.empty
// wipe the failure detector since we are starting fresh and shouldn't care about the past

@@ -290,7 +295,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
publisher ! PublishStart

publish(latestGossip)
heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)

context.become(initialized)
if (address == selfAddress)

@@ -331,7 +335,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) {
failureDetector heartbeat node
gossipTo(node)
}
@@ -6,13 +6,12 @@ package akka.cluster
import language.postfixOps

import scala.collection.immutable
import scala.annotation.tailrec
import scala.concurrent.duration._
import java.net.URLEncoder
import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import akka.cluster.ClusterEvent._
import akka.routing.ConsistentHash
import akka.routing.MurmurHash

/**
* INTERNAL API

@@ -55,13 +54,26 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
*/
private[cluster] object ClusterHeartbeatSender {
/**
* Tell [[akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
* another node and heartbeats should be sent unconditionally until it becomes
* member or deadline is overdue. This is done to be able to detect immediate death
* of the joining node.
* Request heartbeats from another node. Sent from the node that is
* expecting heartbeats from a specific sender, but has not received any.
*/
case class HeartbeatRequest(from: Address) extends ClusterMessage

/**
* Delayed sending of a HeartbeatRequest. The actual request is
* only sent if no expected heartbeat message has been received.
* Local only, no need to serialize.
*/
case class JoinInProgress(address: Address, deadline: Deadline)
case class SendHeartbeatRequest(to: Address)

/**
* Trigger a fake heartbeat message to trigger start of failure detection
* of a node that this node is expecting heartbeats from. HeartbeatRequest
* has been sent to the node so it should have started sending heartbeat
* messages.
* Local only, no need to serialize.
*/
case class ExpectedFirstHeartbeat(from: Address)
}

/*
@@ -88,16 +100,16 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg

val selfHeartbeat = Heartbeat(selfAddress)
val selfEndHeartbeat = EndHeartbeat(selfAddress)
val selfHeartbeatRequest = HeartbeatRequest(selfAddress)

var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor),
selfAddress.toString, MonitoredByNrOfMembers)
var state = ClusterHeartbeatSenderState.empty(selfAddress, MonitoredByNrOfMembers)

// start periodic heartbeat to other nodes in cluster
val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval,
HeartbeatInterval, self, HeartbeatTick)

override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[InstantMemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
}

@@ -112,18 +124,27 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")

/**
* Looks up and returns the remote cluster heartbeat sender for the specific address.
*/
def heartbeatSenderFor(address: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(address))

def receive = {
case HeartbeatTick ⇒ heartbeat()
case s: CurrentClusterState ⇒ reset(s)
case UnreachableMember(m) ⇒ removeMember(m)
case MemberDowned(m) ⇒ removeMember(m)
case MemberRemoved(m) ⇒ removeMember(m)
case e: MemberEvent ⇒ addMember(e.member)
case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d)
case HeartbeatTick ⇒ heartbeat()
case InstantMemberUp(m) ⇒ addMember(m)
case UnreachableMember(m) ⇒ removeMember(m)
case InstantMemberDowned(m) ⇒ removeMember(m)
case InstantMemberRemoved(m) ⇒ removeMember(m)
case s: InstantClusterState ⇒ reset(s)
case _: CurrentClusterState ⇒ // enough with InstantClusterState
case _: InstantMemberEvent ⇒ // not interested in other types of InstantMemberEvent
case HeartbeatRequest(from) ⇒ addHeartbeatRequest(from)
case SendHeartbeatRequest(to) ⇒ sendHeartbeatRequest(to)
case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
}

def reset(snapshot: CurrentClusterState): Unit =
state = state.reset(snapshot.members.collect { case m if m.address != selfAddress ⇒ m.address })
def reset(snapshot: InstantClusterState): Unit =
state = state.reset(snapshot.members.map(_.address))

def addMember(m: Member): Unit = if (m.address != selfAddress)
state = state addMember m.address
@@ -131,11 +152,26 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def removeMember(m: Member): Unit = if (m.address != selfAddress)
state = state removeMember m.address

def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
state = state.addJoinInProgress(address, deadline)
def addHeartbeatRequest(address: Address): Unit = if (address != selfAddress)
state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive)

def sendHeartbeatRequest(address: Address): Unit =
if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) {
heartbeatSenderFor(address) ! selfHeartbeatRequest
// schedule the expected heartbeat for later, which will give the
// sender a chance to start heartbeating, and also trigger some resends of
// the heartbeat request
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(address))
}

def triggerFirstHeartbeat(address: Address): Unit =
if (!cluster.failureDetector.isMonitoring(address)) {
log.info("Trigger extra expected heartbeat from [{}]", address)
cluster.failureDetector.heartbeat(address)
}

def heartbeat(): Unit = {
state = state.removeOverdueJoinInProgress()
state = state.removeOverdueHeartbeatRequest()

def connection(to: Address): ActorRef = {
// URL encoded target address as child actor name

@@ -161,6 +197,13 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
} else
state = state.increaseEndingCount(to)
}

// request heartbeats from expected sender node if no heartbeat messages have been received
state.ring.mySenders foreach { address ⇒
if (!cluster.failureDetector.isMonitoring(address))
scheduler.scheduleOnce(HeartbeatRequestDelay, self, SendHeartbeatRequest(address))
}

}

}
@@ -172,9 +215,8 @@ private[cluster] object ClusterHeartbeatSenderState {
/**
* Initial, empty state
*/
def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String,
monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers)
def empty(selfAddress: Address, monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(HeartbeatNodeRing(selfAddress, Set(selfAddress), monitoredByNrOfMembers))

/**
* Create a new state based on previous state, and

@@ -182,33 +224,13 @@ private[cluster] object ClusterHeartbeatSenderState {
*/
private def apply(
old: ClusterHeartbeatSenderState,
consistentHash: ConsistentHash[Address],
all: Set[Address]): ClusterHeartbeatSenderState = {
ring: HeartbeatNodeRing): ClusterHeartbeatSenderState = {

/**
* Select a few peers that heartbeats will be sent to, i.e. that will
* monitor this node. Try to send heartbeats to same nodes as much
* as possible, but re-balance with consistent hashing algorithm when
* new members are added or removed.
*/
def selectPeers: Set[Address] = {
val allSize = all.size
val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers)
// try more if consistentHash results in same node as already selected
val attemptLimit = nrOfPeers * 2
@tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
if (acc.size == nrOfPeers || n == attemptLimit) acc
else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1)
}
if (nrOfPeers >= allSize) all
else select(Set.empty[Address], 0)
}

val curr = selectPeers
val curr = ring.myReceivers
// start ending process for nodes not selected any more
// abort ending process for nodes that have been selected again
val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end)
old.copy(ring = ring, current = curr, ending = end, heartbeatRequest = old.heartbeatRequest -- curr)
}

}
@@ -223,13 +245,10 @@ private[cluster] object ClusterHeartbeatSenderState {
* i.e. the methods return new instances.
*/
private[cluster] case class ClusterHeartbeatSenderState private (
consistentHash: ConsistentHash[Address],
selfAddressStr: String,
monitoredByNrOfMembers: Int,
all: Set[Address] = Set.empty,
ring: HeartbeatNodeRing,
current: Set[Address] = Set.empty,
ending: Map[Address, Int] = Map.empty,
joinInProgress: Map[Address, Deadline] = Map.empty) {
heartbeatRequest: Map[Address, Deadline] = Map.empty) {

// FIXME can be disabled as optimization
assertInvariants

@@ -237,50 +256,53 @@ private[cluster] case class ClusterHeartbeatSenderState private (
private def assertInvariants: Unit = {
val currentAndEnding = current.intersect(ending.keySet)
require(currentAndEnding.isEmpty,
"Same nodes in current and ending not allowed, got [%s]" format currentAndEnding)
val joinInProgressAndAll = joinInProgress.keySet.intersect(all)
require(joinInProgressAndAll.isEmpty,
"Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll)
val currentNotInAll = current -- all
require(currentNotInAll.isEmpty,
"Nodes in current but not in all not allowed, got [%s]" format currentNotInAll)
require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]"
format all)
s"Same nodes in current and ending not allowed, got [${currentAndEnding}]")

val currentAndHeartbeatRequest = current.intersect(heartbeatRequest.keySet)
require(currentAndHeartbeatRequest.isEmpty,
s"Same nodes in current and heartbeatRequest not allowed, got [${currentAndHeartbeatRequest}]")

val currentNotInAll = current -- ring.nodes
require(current.isEmpty || currentNotInAll.isEmpty,
s"Nodes in current but not in ring nodes not allowed, got [${currentNotInAll}]")

require(!current.contains(ring.selfAddress),
s"Self in current not allowed, got [${ring.selfAddress}]")
require(!heartbeatRequest.contains(ring.selfAddress),
s"Self in heartbeatRequest not allowed, got [${ring.selfAddress}]")
}

val active: Set[Address] = current ++ joinInProgress.keySet
val active: Set[Address] = current ++ heartbeatRequest.keySet

def reset(nodes: Set[Address]): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ },
consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
all = nodes)
def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = {
ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeHeartbeatRequest _ }, ring.copy(nodes = nodes + ring.selfAddress))
}

def addMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a)
ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :+ a)

def removeMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a)
ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :- a)

private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
if (joinInProgress contains address)
copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0))
private def removeHeartbeatRequest(address: Address): ClusterHeartbeatSenderState = {
if (heartbeatRequest contains address)
copy(heartbeatRequest = heartbeatRequest - address, ending = ending + (address -> 0))
else this
}

def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
if (all contains address) this
else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address)
def addHeartbeatRequest(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
if (current.contains(address)) this
else copy(heartbeatRequest = heartbeatRequest + (address -> deadline), ending = ending - address)
}

/**
* Cleanup overdue joinInProgress, in case a joining node never
* became member, for some reason.
* Cleanup overdue heartbeatRequest
*/
def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = {
val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
def removeOverdueHeartbeatRequest(): ClusterHeartbeatSenderState = {
val overdue = heartbeatRequest collect { case (address, deadline) if deadline.isOverdue ⇒ address }
if (overdue.isEmpty) this
else
copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue)
copy(ending = ending ++ overdue.map(_ -> 0), heartbeatRequest = heartbeatRequest -- overdue)
}

def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
@@ -330,7 +352,7 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
CircuitBreaker(context.system.scheduler,
cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)).
onOpen(log.info("CircuitBreaker Open for [{}]", toRef)).
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
}

@@ -344,9 +366,81 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
toRef ! heartbeatMsg
} catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
}
if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
if (deadline.isOverdue) log.info("Sending heartbeat to [{}] took longer than expected", toRef)
case SendEndHeartbeat(endHeartbeatMsg, _) ⇒
log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
toRef ! endHeartbeatMsg
}
}

/**
* INTERNAL API
*
* Data structure for picking heartbeat receivers and keep track of what nodes
* that are expected to send heartbeat messages to a node. The node ring is
* shuffled by deterministic hashing to avoid picking physically co-located
* neighbors.
*
* It is immutable, i.e. the methods return new instances.
*/
private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[Address], monitoredByNrOfMembers: Int) {

require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]")

private val nodeRing: immutable.SortedSet[Address] = {
implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
val ha = hashFor(a)
val hb = hashFor(b)
ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0)
}

immutable.SortedSet() ++ nodes
}

private def hashFor(node: Address): Int = node match {
// cluster node identifier is the host and port of the address; protocol and system is assumed to be the same
case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"${host}:${port}")
case _ ⇒ 0
}

/**
* Receivers for `selfAddress`. Cached for subsequent access.
*/
lazy val myReceivers: immutable.Set[Address] = receivers(selfAddress)
/**
* Senders for `selfAddress`. Cached for subsequent access.
*/
lazy val mySenders: immutable.Set[Address] = senders(selfAddress)

private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1)

/**
* The receivers to use from a specified sender.
*/
def receivers(sender: Address): immutable.Set[Address] =
if (useAllAsReceivers)
nodeRing - sender
else {
val slice = nodeRing.from(sender).tail.take(monitoredByNrOfMembers)
if (slice.size < monitoredByNrOfMembers)
(slice ++ nodeRing.take(monitoredByNrOfMembers - slice.size))
else slice
}

/**
* The expected senders for a specific receiver.
*/
def senders(receiver: Address): Set[Address] =
nodes filter { sender ⇒ receivers(sender) contains receiver }

/**
* Add a node to the ring.
*/
def :+(node: Address): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node)

/**
* Remove a node from the ring.
*/
def :-(node: Address): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this

}
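A small usage sketch of the ring: each node sends heartbeats to its `myReceivers` and can expect heartbeats from its `mySenders`. The addresses are made up, and since the class is `private[cluster]` this is illustrative only:

```scala
import akka.actor.Address

// Hypothetical addresses in the same actor system / protocol.
val a = Address("tcp.akka", "sys", "a", 2552)
val b = Address("tcp.akka", "sys", "b", 2552)
val c = Address("tcp.akka", "sys", "c", 2552)
val d = Address("tcp.akka", "sys", "d", 2552)

// Node `a` with three other nodes, each node monitored by 2 others.
val ring = HeartbeatNodeRing(selfAddress = a, nodes = Set(a, b, c, d), monitoredByNrOfMembers = 2)

val sendHeartbeatsTo     = ring.myReceivers // 2 nodes picked from the hashed ring, never `a` itself
val expectHeartbeatsFrom = ring.mySenders   // the nodes whose receivers include `a`
```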
@@ -76,7 +76,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
MetricsInterval, self, MetricsTick)

override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[InstantMemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
log.info("Metrics collection has started successfully on node [{}]", selfAddress)
}

@@ -84,11 +84,15 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
def receive = {
case GossipTick ⇒ gossip()
case MetricsTick ⇒ collect()
case state: CurrentClusterState ⇒ receiveState(state)
case MemberUp(m) ⇒ addMember(m)
case e: MemberEvent ⇒ removeMember(e.member)
case UnreachableMember(m) ⇒ removeMember(m)
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
case state: InstantClusterState ⇒ receiveState(state)
case state: CurrentClusterState ⇒ // enough with InstantClusterState
case InstantMemberUp(m) ⇒ addMember(m)
case InstantMemberDowned(m) ⇒ removeMember(m)
case InstantMemberRemoved(m) ⇒ removeMember(m)
case UnreachableMember(m) ⇒ removeMember(m)
case _: InstantMemberEvent ⇒ // not interested in other types of InstantMemberEvent

}

override def postStop: Unit = {

@@ -115,7 +119,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]].
*/
def receiveState(state: CurrentClusterState): Unit =
def receiveState(state: InstantClusterState): Unit =
nodes = state.members collect { case m if m.status == Up ⇒ m.address }

/**
@@ -39,8 +39,22 @@ class ClusterSettings(val config: Config, val systemName: String) {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d
}
final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
final val HeartbeatRequestDelay: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.grace-period"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0"); d
}
final val HeartbeatExpectedResponseAfter: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.expected-response-after"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after > 0"); d
}
final val HeartbeatRequestTimeToLive: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.time-to-live"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0"); d
}
final val NumberOfEndHeartbeats: Int = {
val n = getInt("akka.cluster.failure-detector.nr-of-end-heartbeats")
require(n > 0, "failure-detector.nr-of-end-heartbeats must be > 0"); n
}
final val MonitoredByNrOfMembers: Int = {
val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n

@@ -61,7 +75,6 @@ class ClusterSettings(val config: Config, val systemName: String) {
require(n > 0, "min-nr-of-members must be > 0"); n
}
final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled")
final val JoinTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match {
case "" ⇒ Dispatchers.DefaultDispatcherId
case id ⇒ id
@@ -16,6 +16,12 @@ trait FailureDetector {
*/
def isAvailable(connection: Address): Boolean

/**
* Returns true if the failure detector has received any heartbeats and started monitoring
* of the resource.
*/
def isMonitoring(connection: Address): Boolean

/**
* Records a heartbeat for a connection.
*/
@@ -34,6 +34,7 @@ object Member {
* `Address` ordering type class, sorts addresses by host and port.
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
// cluster node identifier is the host and port of the address; protocol and system is assumed to be the same
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
else false
@@ -0,0 +1,87 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster

import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.Props
import akka.cluster.ClusterEvent.InstantClusterState
import akka.cluster.ClusterEvent.InstantMemberJoined
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._

object InitialHeartbeatMultiJvmSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")

commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster.failure-detector.heartbeat-request.grace-period = 3 s
akka.cluster.failure-detector.threshold = 4""")).
withFallback(MultiNodeClusterSpec.clusterConfig))

testTransport(on = true)
}

class InitialHeartbeatMultiJvmNode1 extends InitialHeartbeatSpec
class InitialHeartbeatMultiJvmNode2 extends InitialHeartbeatSpec
class InitialHeartbeatMultiJvmNode3 extends InitialHeartbeatSpec

abstract class InitialHeartbeatSpec
extends MultiNodeSpec(InitialHeartbeatMultiJvmSpec)
with MultiNodeClusterSpec {

import InitialHeartbeatMultiJvmSpec._

muteMarkingAsUnreachable()

"A member" must {

"detect failure even though no heartbeats have been received" taggedAs LongRunningTest in {
val secondAddress = address(second)
awaitClusterUp(first)

runOn(first) {
val joinLatch = TestLatch()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case state: InstantClusterState ⇒
if (state.members.exists(_.address == secondAddress))
joinLatch.countDown()
case InstantMemberJoined(m) ⇒
if (m.address == secondAddress)
joinLatch.countDown()
}
})), classOf[InstantMemberJoined])

within(10 seconds) {
joinLatch.await
}
}
runOn(second) {
cluster.join(first)
}
enterBarrier("second-joined")

runOn(controller) {
// it is likely that first has not started sending heartbeats to second yet
// Direction must be Receive because the gossip from first to second must pass through
testConductor.blackhole(first, second, Direction.Receive).await
}

runOn(second) {
within(15 seconds) {
awaitCond(!cluster.failureDetector.isAvailable(first))
}
}

enterBarrier("after-1")
}
}
}
@@ -110,12 +110,12 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
}
}

"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in within(20 seconds) {
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in within(30 seconds) {
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0)
enterBarrier("after-2")
}

"be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in within(20 seconds) {
"be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in within(30 seconds) {
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1)
enterBarrier("after-3")
}
@@ -69,8 +69,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
".*Cluster Node.* - is starting up.*",
".*Shutting down cluster Node.*",
".*Cluster node successfully shut down.*",
".*Using a dedicated scheduler for cluster.*",
".*Phi value.* for connection.*") foreach { s ⇒
".*Using a dedicated scheduler for cluster.*") foreach { s ⇒
sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
}
@@ -87,10 +87,10 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig)
}

runOn(side1: _*) {
awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds)
awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 25 seconds)
}
runOn(side2: _*) {
awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds)
awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 25 seconds)
}

enterBarrier("after-2")
@@ -351,7 +351,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
nodes foreach { node ⇒
val previous = phiByNode(node)
val φ = fd.phi(node)
if (φ > 0) {
if (φ > 0 || fd.isMonitoring(node)) {
val aboveOne = if (!φ.isInfinite && φ > 1.0) 1 else 0
phiByNode += node -> PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1,
math.max(previous.max, φ))

@@ -861,7 +861,7 @@ abstract class StressSpec
name = "master-" + myself.name)
m ! Begin
import system.dispatcher
system.scheduler.scheduleOnce(highThroughputDuration) {
system.scheduler.scheduleOnce(duration) {
m.tell(End, testActor)
}
val workResult = awaitWorkResult

@@ -931,7 +931,7 @@ abstract class StressSpec

"A cluster under stress" must {

"join seed nodes" taggedAs LongRunningTest in {
"join seed nodes" taggedAs LongRunningTest in within(20 seconds) {

val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially)
val size = seedNodes.size + otherNodesJoiningSeedNodes.size
@@ -28,12 +28,14 @@ class ClusterConfigSpec extends AkkaSpec {
PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second)
HeartbeatInterval must be(1 second)
NumberOfEndHeartbeats must be(4)
NumberOfEndHeartbeats must be(8)
MonitoredByNrOfMembers must be(5)
HeartbeatRequestDelay must be(10 seconds)
HeartbeatExpectedResponseAfter must be(3 seconds)
HeartbeatRequestTimeToLive must be(1 minute)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
PublishStatsInterval must be(10 second)
JoinTimeout must be(60 seconds)
AutoJoin must be(true)
AutoDown must be(false)
MinNrOfMembers must be(1)
@@ -9,6 +9,7 @@ import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.routing.ConsistentHash
import scala.concurrent.duration._
import scala.collection.immutable

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {

@@ -20,8 +21,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
val dd = Address("tcp.akka", "sys", "dd", 2552)
val ee = Address("tcp.akka", "sys", "ee", 2552)

val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10),
selfAddress.toString, 3)
val emptyState = ClusterHeartbeatSenderState.empty(selfAddress, 3)

"A ClusterHeartbeatSenderState" must {

@@ -29,47 +29,46 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
emptyState.active.isEmpty must be(true)
}

"include joinInProgress in active set" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds)
s.joinInProgress.keySet must be(Set(aa))
"include heartbeatRequest in active set" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds)
s.heartbeatRequest.keySet must be(Set(aa))
s.active must be(Set(aa))
}

"remove joinInProgress from active set after removeOverdueJoinInProgress" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress()
s.joinInProgress must be(Map.empty)
"remove heartbeatRequest from active set after removeOverdueHeartbeatRequest" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now - 30.seconds).removeOverdueHeartbeatRequest()
s.heartbeatRequest must be(Map.empty)
s.active must be(Set.empty)
s.ending must be(Map(aa -> 0))
}

"remove joinInProgress after reset" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb))
s.joinInProgress must be(Map.empty)
"remove heartbeatRequest after reset" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb))
s.heartbeatRequest must be(Map.empty)
}

"remove joinInProgress after addMember" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa)
s.joinInProgress must be(Map.empty)
"remove heartbeatRequest after addMember" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).addMember(aa)
s.heartbeatRequest must be(Map.empty)
}

"remove joinInProgress after removeMember" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa)
s.joinInProgress must be(Map.empty)
"remove heartbeatRequest after removeMember" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa)
s.heartbeatRequest must be(Map.empty)
s.ending must be(Map(aa -> 0))
}

"remove from ending after addJoinInProgress" in {
"remove from ending after addHeartbeatRequest" in {
val s = emptyState.reset(Set(aa, bb)).removeMember(aa)
s.ending must be(Map(aa -> 0))
val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds)
s2.joinInProgress.keySet must be(Set(aa))
val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds)
s2.heartbeatRequest.keySet must be(Set(aa))
s2.ending must be(Map.empty)
}

"include nodes from reset in active set" in {
val nodes = Set(aa, bb, cc)
val s = emptyState.reset(nodes)
s.all must be(nodes)
s.current must be(nodes)
s.ending must be(Map.empty)
s.active must be(nodes)

@@ -78,7 +77,6 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
"limit current nodes to monitoredByNrOfMembers when adding members" in {
val nodes = Set(aa, bb, cc, dd)
val s = nodes.foldLeft(emptyState) { _ addMember _ }
s.all must be(nodes)
s.current.size must be(3)
s.addMember(ee).current.size must be(3)
}
@@ -50,6 +50,8 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte
false
}

override def isMonitoring(connection: Address): Boolean = connections.contains(connection)

def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection)

def remove(connection: Address): Unit = {
@@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.cluster

import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.routing.ConsistentHash
import scala.concurrent.duration._
import scala.collection.immutable

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class HeartbeatNodeRingSpec extends WordSpec with MustMatchers {

val aa = Address("tcp.akka", "sys", "aa", 2552)
val bb = Address("tcp.akka", "sys", "bb", 2552)
val cc = Address("tcp.akka", "sys", "cc", 2552)
val dd = Address("tcp.akka", "sys", "dd", 2552)
val ee = Address("tcp.akka", "sys", "ee", 2552)

val nodes = Set(aa, bb, cc, dd, ee)

"A HashedNodeRing" must {

"pick specified number of nodes as receivers" in {
val ring = HeartbeatNodeRing(cc, nodes, 3)
ring.myReceivers must be(ring.receivers(cc))

nodes foreach { n ⇒
val receivers = ring.receivers(n)
receivers.size must be(3)
receivers must not contain (n)
}
}

"pick all except own as receivers when less than total number of nodes" in {
val expected = Set(aa, bb, dd, ee)
HeartbeatNodeRing(cc, nodes, 4).myReceivers must be(expected)
HeartbeatNodeRing(cc, nodes, 5).myReceivers must be(expected)
HeartbeatNodeRing(cc, nodes, 6).myReceivers must be(expected)
}

"have matching senders and receivers" in {
val ring = HeartbeatNodeRing(cc, nodes, 3)
ring.mySenders must be(ring.senders(cc))

for (sender ← nodes; receiver ← ring.receivers(sender)) {
ring.senders(receiver) must contain(sender)
}
}

"pick none when alone" in {
val ring = HeartbeatNodeRing(cc, Set(cc), 3)
ring.myReceivers must be(Set())
ring.mySenders must be(Set())
}

}
}
@@ -21,15 +21,16 @@ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetec
private val resourceToFailureDetector = new AtomicReference[Map[A, FailureDetector]](Map())
private final val failureDetectorCreationLock: Lock = new ReentrantLock

/**
* Returns true if the resource is considered to be up and healthy and returns false otherwise. For unregistered
* resources it returns true.
*/
final override def isAvailable(resource: A): Boolean = resourceToFailureDetector.get.get(resource) match {
case Some(r) ⇒ r.isAvailable
case _ ⇒ true
}

final override def isMonitoring(resource: A): Boolean = resourceToFailureDetector.get.get(resource) match {
case Some(r) ⇒ r.isMonitoring
case _ ⇒ false
}

final override def heartbeat(resource: A): Unit = {

resourceToFailureDetector.get.get(resource) match {
@@ -16,6 +16,12 @@ trait FailureDetector {
*/
def isAvailable: Boolean

/**
* Returns true if the failure detector has received any heartbeats and started monitoring
* of the resource.
*/
def isMonitoring: Boolean

/**
* Notifies the FailureDetector that a heartbeat arrived from the monitored resource. This causes the FailureDetector
* to update its state.
@@ -15,9 +15,16 @@ trait FailureDetectorRegistry[A] {

/**
* Returns true if the resource is considered to be up and healthy and returns false otherwise.
* For unregistered resources it returns true.
*/
def isAvailable(resource: A): Boolean

/**
* Returns true if the failure detector has received any heartbeats and started monitoring
* of the resource.
*/
def isMonitoring(resource: A): Boolean

/**
* Records a heartbeat for a resource. If the resource is not yet registered (i.e. this is the first heartbeat) then
* it is automatically registered.
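A minimal sketch of the registry contract described above. The trivial detector and resource name are made up, a real registry would create a phi-accrual detector per resource, and the import assumes these types live in the akka.remote package:

```scala
import akka.remote.{ DefaultFailureDetectorRegistry, FailureDetector }

// Trivial per-resource detector, only to illustrate isAvailable/isMonitoring/heartbeat.
def newDetector(): FailureDetector = new FailureDetector {
  @volatile private var seen = false
  def isAvailable: Boolean = true
  def isMonitoring: Boolean = seen
  def heartbeat(): Unit = seen = true
}

val registry = new DefaultFailureDetectorRegistry[String](() ⇒ newDetector())

registry.isAvailable("resource1")  // true: unregistered resources are reported as available
registry.isMonitoring("resource1") // false: no heartbeat recorded yet
registry.heartbeat("resource1")    // first heartbeat registers the resource automatically
registry.isMonitoring("resource1") // true
```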
@@ -78,6 +78,8 @@ class PhiAccrualFailureDetector(

override def isAvailable: Boolean = phi < threshold

override def isMonitoring: Boolean = state.get.timestamp.nonEmpty

@tailrec
final override def heartbeat(): Unit = {

@@ -94,14 +94,16 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") {
fd.phi must be > (0.0)
}

"mark node as available after a series of successful heartbeats" in {
"mark node as monitored after a series of successful heartbeats" in {
val timeInterval = List[Long](0, 1000, 100, 100)
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
fd.isMonitoring must be(false)

fd.heartbeat()
fd.heartbeat()
fd.heartbeat()

fd.isMonitoring must be(true)
fd.isAvailable must be(true)
}

@@ -112,6 +112,7 @@ class FailureDetectorRegistrySpec extends AkkaSpec("akka.loglevel = INFO") {
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val fd = createFailureDetectorRegistry(clock = fakeTimeGenerator(timeInterval))
fd.isMonitoring("resource1") must be(false)

fd.heartbeat("resource1") //0

@@ -119,9 +120,11 @@ class FailureDetectorRegistrySpec extends AkkaSpec("akka.loglevel = INFO") {
fd.heartbeat("resource1") //1100

fd.isAvailable("resource1") must be(true) //2200
fd.isMonitoring("resource1") must be(true)

fd.remove("resource1")

fd.isMonitoring("resource1") must be(false)
fd.isAvailable("resource1") must be(true) //3300

// it receives heartbeat from an explicitly removed node

@@ -130,6 +133,7 @@ class FailureDetectorRegistrySpec extends AkkaSpec("akka.loglevel = INFO") {
fd.heartbeat("resource1") //6600

fd.isAvailable("resource1") must be(true) //6700
fd.isMonitoring("resource1") must be(true)
}

}

@@ -19,6 +19,8 @@ object AkkaProtocolSpec {
class TestFailureDetector extends FailureDetector {
@volatile var isAvailable: Boolean = true

def isMonitoring: Boolean = called

@volatile var called: Boolean = false

def heartbeat(): Unit = called = true