/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
|
|
|
|
|
package akka.cluster
|
|
|
|
|
|
2015-01-19 10:10:30 +01:00
|
|
|
import scala.annotation.tailrec
|
2012-11-15 12:33:11 +01:00
|
|
|
import scala.collection.immutable
|
2017-07-07 13:17:41 +02:00
|
|
|
import akka.actor.{ Actor, ActorLogging, ActorPath, ActorRef, ActorSelection, Address, DeadLetterSuppression, RootActorPath }
|
2012-10-01 14:12:20 +02:00
|
|
|
import akka.cluster.ClusterEvent._
|
2013-11-07 13:52:08 +01:00
|
|
|
import akka.remote.FailureDetectorRegistry
|
2016-09-19 11:17:41 +02:00
|
|
|
import akka.remote.HeartbeatMessage
|
2017-07-07 13:17:41 +02:00
|
|
|
import akka.annotation.InternalApi
|
2012-07-05 13:55:08 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API.
 *
 * Answers every incoming [[ClusterHeartbeatSender.Heartbeat]] with a
 * [[ClusterHeartbeatSender.HeartbeatRsp]] carrying the unique address of this node.
 */
@InternalApi
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
  import ClusterHeartbeatSender._

  // Must stay lazy: using Cluster(context.system) in the constructor would
  // cause a deadlock. See the startup sequence in ClusterDaemon.
  lazy val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress)

  def receive = {
    // The payload of the heartbeat is not inspected; the reply goes to whoever sent it.
    case _: Heartbeat => sender() ! selfHeartbeatRsp
  }

}
|
|
|
|
|
|
2017-07-07 13:17:41 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Naming and addressing helpers for the [[ClusterHeartbeatReceiver]] actor.
 */
@InternalApi
private[cluster] object ClusterHeartbeatReceiver {

  /** Actor name used as the last element of the receiver path. */
  def name: String = "heartbeatReceiver"

  /** Path `/system/cluster/heartbeatReceiver` rooted at the given `address`. */
  def path(address: Address): ActorPath = {
    val clusterDaemon = RootActorPath(address) / "system" / "cluster"
    clusterDaemon / name
  }
}
|
|
|
|
|
|
2012-07-05 13:55:08 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Messages exchanged between [[ClusterHeartbeatSender]] and
 * [[ClusterHeartbeatReceiver]], plus the messages the sender schedules to itself.
 */
private[cluster] object ClusterHeartbeatSender {

  /**
   * Failure detection probe, sent at regular intervals.
   * `from` is the address of the node that sends the heartbeat.
   */
  final case class Heartbeat(from: Address)
    extends ClusterMessage
    with HeartbeatMessage
    with DeadLetterSuppression

  /**
   * Reply to a [[Heartbeat]].
   * `from` is the unique address of the node that replies.
   */
  final case class HeartbeatRsp(from: UniqueAddress)
    extends ClusterMessage
    with HeartbeatMessage
    with DeadLetterSuppression

  // The remaining messages are sent to self only.

  /** Periodic trigger for sending the next round of heartbeats. */
  case object HeartbeatTick

  /** Delayed check whether a first heartbeat response from `from` has been seen. */
  final case class ExpectedFirstHeartbeat(from: UniqueAddress)

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * This actor is responsible for sending the heartbeat messages to
 * a few other nodes, which will reply and then this actor updates the
 * failure detector.
 *
 * It starts in `initializing`, where heartbeat ticks are ignored, and switches
 * to `active` once the first `CurrentClusterState` snapshot has been applied.
 */
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
  import ClusterHeartbeatSender._

  val cluster = Cluster(context.system)
  val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
  import cluster.{ selfAddress, selfUniqueAddress, scheduler }
  import cluster.settings._
  import context.dispatcher

  // true for members that belong to the same data center as this node
  val filterInternalClusterMembers: Member => Boolean =
    member => member.dataCenter == cluster.selfDataCenter

  val selfHeartbeat = Heartbeat(selfAddress)

  val failureDetector = cluster.failureDetector

  var state: ClusterHeartbeatSenderState = {
    // until the first cluster snapshot arrives the ring contains only this node
    val selfOnlyRing =
      HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, MonitoredByNrOfMembers)
    ClusterHeartbeatSenderState(selfOnlyRing, Set.empty[UniqueAddress], failureDetector)
  }

  // start periodic heartbeat to other nodes in cluster
  val heartbeatTask = {
    val firstTickAfter = PeriodicTasksInitialDelay.max(HeartbeatInterval)
    scheduler.schedule(firstTickAfter, HeartbeatInterval, self, HeartbeatTick)
  }

  override def preStart(): Unit =
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])

  override def postStop(): Unit = {
    // stop monitoring everything this actor was heartbeating to
    state.activeReceivers.foreach { receiver =>
      failureDetector.remove(receiver.address)
    }
    heartbeatTask.cancel()
    cluster.unsubscribe(self)
  }

  /**
   * Looks up and returns the remote cluster heartbeat connection for the specific address.
   */
  def heartbeatReceiver(address: Address): ActorSelection = {
    val receiverPath = ClusterHeartbeatReceiver.path(address)
    context.actorSelection(receiverPath)
  }

  def receive = initializing

  /** Behavior used until the initial `CurrentClusterState` has been received. */
  def initializing: Actor.Receive = {
    case snapshot: CurrentClusterState =>
      init(snapshot)
      context.become(active)
    case HeartbeatTick => // ignored, no receivers are known before init
  }

  /**
   * Behavior after initialization. `MemberRemoved` must be matched before the
   * general `MemberEvent` case, every other member event is treated as an addition.
   */
  def active: Actor.Receive = {
    case HeartbeatTick                 => heartbeat()
    case HeartbeatRsp(replyFrom)       => heartbeatRsp(replyFrom)
    case MemberRemoved(member, _)      => removeMember(member)
    case memberEvent: MemberEvent      => addMember(memberEvent.member)
    case UnreachableMember(member)     => unreachableMember(member)
    case ReachableMember(member)       => reachableMember(member)
    case ExpectedFirstHeartbeat(node)  => triggerFirstHeartbeat(node)
  }

  /** Seeds the state from a cluster snapshot, keeping only members of the own data center. */
  def init(snapshot: CurrentClusterState): Unit = {
    val nodes = snapshot.members.collect {
      case member if filterInternalClusterMembers(member) => member.uniqueAddress
    }
    val unreachable = snapshot.unreachable.collect {
      case member if filterInternalClusterMembers(member) => member.uniqueAddress
    }
    state = state.init(nodes, unreachable)
  }

  /** Adds `m` to the ring unless it is this node, already known, or in another data center. */
  def addMember(m: Member): Unit = {
    val node = m.uniqueAddress
    val shouldBeAdded =
      node != selfUniqueAddress && // is not self
        !state.contains(node) && // not already added
        filterInternalClusterMembers(m) // should be watching members from this DC (internal / external)
    if (shouldBeAdded) state = state.addMember(node)
  }

  /** Removes `m` from the ring; stops this actor when `m` is this node itself. */
  def removeMember(m: Member): Unit =
    if (filterInternalClusterMembers(m)) { // we only ever deal with internal cluster members here
      val node = m.uniqueAddress
      if (node == cluster.selfUniqueAddress)
        // This cluster node will be shutdown, but stop this actor immediately
        // to avoid further updates
        context.stop(self)
      else
        state = state.removeMember(node)
    }

  def unreachableMember(m: Member): Unit = {
    state = state.unreachableMember(m.uniqueAddress)
  }

  def reachableMember(m: Member): Unit = {
    state = state.reachableMember(m.uniqueAddress)
  }

  /** Sends one heartbeat to every active receiver. */
  def heartbeat(): Unit =
    state.activeReceivers.foreach { receiver =>
      val target = receiver.address
      val isFirstHeartbeat = !failureDetector.isMonitoring(target)

      if (verboseHeartbeat) {
        if (isFirstHeartbeat) log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, target)
        else log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, target)
      }

      if (isFirstHeartbeat) {
        // schedule the expected first heartbeat for later, which will give the
        // other side a chance to reply, and also trigger some resends if needed
        scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(receiver))
      }

      heartbeatReceiver(target) ! selfHeartbeat
    }

  /** Records a heartbeat response in the state. */
  def heartbeatRsp(from: UniqueAddress): Unit = {
    if (verboseHeartbeat)
      log.debug("Cluster Node [{}] - Heartbeat response from [{}]", selfAddress, from.address)
    state = state.heartbeatRsp(from)
  }

  /**
   * Registers a heartbeat with the failure detector on behalf of `from` when it is
   * still an active receiver but the failure detector is not yet monitoring it.
   */
  def triggerFirstHeartbeat(from: UniqueAddress): Unit = {
    val stillUnmonitored = state.activeReceivers(from) && !failureDetector.isMonitoring(from.address)
    if (stillUnmonitored) {
      if (verboseHeartbeat)
        log.debug("Cluster Node [{}] - Trigger extra expected heartbeat from [{}]", selfAddress, from.address)
      failureDetector.heartbeat(from.address)
    }
  }

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 * State of [[ClusterHeartbeatSender]]. Encapsulated to facilitate unit testing.
 * The instances themselves are immutable, but the methods update the shared
 * `failureDetector` as a side effect.
 */
@InternalApi
private[cluster] final case class ClusterHeartbeatSenderState(
  ring: HeartbeatNodeRing,
  oldReceiversNowUnreachable: Set[UniqueAddress],
  failureDetector: FailureDetectorRegistry[Address]) {

  /** Receivers picked by the ring plus former receivers that are kept because they were unavailable. */
  val activeReceivers: Set[UniqueAddress] = ring.myReceivers.union(oldReceiversNowUnreachable)

  def selfAddress = ring.selfAddress

  /** Replaces the members of the ring; `selfAddress` is always part of the new ring. */
  def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState = {
    val initializedRing = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable)
    copy(ring = initializedRing)
  }

  def contains(node: UniqueAddress): Boolean = ring.nodes.contains(node)

  def addMember(node: UniqueAddress): ClusterHeartbeatSenderState = {
    val grownRing = ring :+ node
    membershipChange(grownRing)
  }

  /** Removes `node` from the ring, from the failure detector and from the old receivers. */
  def removeMember(node: UniqueAddress): ClusterHeartbeatSenderState = {
    val afterChange = membershipChange(ring :- node)

    failureDetector.remove(node.address)
    if (afterChange.oldReceiversNowUnreachable.contains(node))
      afterChange.copy(oldReceiversNowUnreachable = afterChange.oldReceiversNowUnreachable - node)
    else
      afterChange
  }

  def unreachableMember(node: UniqueAddress): ClusterHeartbeatSenderState = {
    val withUnreachable = ring.copy(unreachable = ring.unreachable + node)
    membershipChange(withUnreachable)
  }

  def reachableMember(node: UniqueAddress): ClusterHeartbeatSenderState = {
    val withReachable = ring.copy(unreachable = ring.unreachable - node)
    membershipChange(withReachable)
  }

  /**
   * Switches to `newRing`. Receivers that are dropped by the new ring are removed
   * from the failure detector when it reports them available; otherwise they are
   * remembered in `oldReceiversNowUnreachable`.
   */
  private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = {
    val droppedReceivers = ring.myReceivers.diff(newRing.myReceivers)
    val keptUnreachable = droppedReceivers.foldLeft(oldReceiversNowUnreachable) { (kept, receiver) =>
      if (failureDetector.isAvailable(receiver.address)) {
        failureDetector.remove(receiver.address)
        kept
      } else
        kept + receiver
    }
    copy(ring = newRing, oldReceiversNowUnreachable = keptUnreachable)
  }

  /** Registers a heartbeat from `from`; responses from nodes that are not active receivers are ignored. */
  def heartbeatRsp(from: UniqueAddress): ClusterHeartbeatSenderState =
    if (!activeReceivers(from)) this
    else {
      failureDetector.heartbeat(from.address)
      if (!oldReceiversNowUnreachable(from)) this
      else {
        // back from unreachable, ok to stop heartbeating to it
        if (!ring.myReceivers(from))
          failureDetector.remove(from.address)
        copy(oldReceiversNowUnreachable = oldReceiversNowUnreachable - from)
      }
    }

}
|
|
|
|
|
|
2013-01-15 09:35:07 +01:00
|
|
|
/**
 * INTERNAL API
 *
 * Data structure for picking heartbeat receivers. The nodes are arranged in a
 * ring sorted by hash code (ties broken by address), which shuffles them
 * deterministically to avoid picking physically co-located neighbors.
 *
 * It is immutable, i.e. the methods return new instances.
 */
private[cluster] final case class HeartbeatNodeRing(
  selfAddress: UniqueAddress,
  nodes: Set[UniqueAddress],
  unreachable: Set[UniqueAddress],
  monitoredByNrOfMembers: Int) {

  require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]")

  private val nodeRing: immutable.SortedSet[UniqueAddress] = {
    val byHashThenAddress: Ordering[UniqueAddress] = Ordering.fromLessThan[UniqueAddress] { (left, right) =>
      val leftHash = left.##
      val rightHash = right.##
      if (leftHash == rightHash) Member.addressOrdering.compare(left.address, right.address) < 0
      else leftHash < rightHash
    }

    immutable.SortedSet.empty[UniqueAddress](byHashThenAddress) union nodes
  }

  // when the ring is small enough every other node is a receiver
  private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1)

  /**
   * Receivers for `selfAddress`. Cached for subsequent access.
   */
  lazy val myReceivers: Set[UniqueAddress] = receivers(selfAddress)

  /**
   * The receivers to use from a specified sender.
   */
  def receivers(sender: UniqueAddress): Set[UniqueAddress] =
    if (useAllAsReceivers)
      nodeRing - sender
    else {

      // Walks the candidates until `quota` reachable nodes have been picked.
      // Unreachable nodes met on the way are picked as well, without counting
      // against the quota, as long as fewer than `monitoredByNrOfMembers` nodes
      // have been picked so far. The reason for not limiting it to strictly
      // monitoredByNrOfMembers is that the leader must be able to continue its
      // duties (e.g. removal of downed nodes) when many nodes are shutdown at the
      // same time and nobody in the remaining cluster is monitoring some of the
      // shutdown nodes. This was reported in issue #16624.
      // Returns the unused part of the quota together with the picked nodes.
      @tailrec def pick(quota: Int, candidates: Iterator[UniqueAddress], picked: Set[UniqueAddress]): (Int, Set[UniqueAddress]) =
        if (candidates.isEmpty || quota == 0) (quota, picked)
        else {
          val candidate = candidates.next()
          if (!unreachable(candidate))
            pick(quota - 1, candidates, picked + candidate) // include the reachable
          else if (picked.size < monitoredByNrOfMembers)
            pick(quota, candidates, picked + candidate) // include the unreachable, but don't count it
          else
            pick(quota, candidates, picked) // skip the unreachable, since we have already picked `monitoredByNrOfMembers`
        }

      // first the nodes that follow the sender in the ring ...
      val afterSender = nodeRing.from(sender).tail.iterator
      val (missing, pickedAfterSender) = pick(monitoredByNrOfMembers, afterSender, Set.empty)

      if (missing == 0)
        pickedAfterSender
      else {
        // ... then wrap around and continue from the start of the ring
        val beforeSender = nodeRing.to(sender).iterator.filterNot(_ == sender)
        val (_, pickedWithWrapAround) = pick(missing, beforeSender, pickedAfterSender)
        pickedWithWrapAround
      }
    }

  /**
   * Add a node to the ring.
   */
  def :+(node: UniqueAddress): HeartbeatNodeRing =
    if (nodes(node)) this
    else copy(nodes = nodes + node)

  /**
   * Remove a node from the ring.
   */
  def :-(node: UniqueAddress): HeartbeatNodeRing = {
    val isKnown = nodes(node) || unreachable(node)
    if (!isKnown) this
    else copy(nodes = nodes - node, unreachable = unreachable - node)
  }

}
|