diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index f8d18a516b..4532c0ff8a 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -55,10 +55,6 @@ akka {
     # A value of 0 s can be used to always publish the stats, when it happens.
     publish-stats-interval = 10s
 
-    # A joining node stops sending heartbeats to the node to join if it hasn't
-    # become member of the cluster within this deadline.
-    join-timeout = 60s
-
     # The id of the dispatcher to use for cluster actors. If not specified
     # default dispatcher is used.
     # If specified you need to define the settings of the actual dispatcher.
@@ -109,7 +105,29 @@ akka {
       # network drop.
       acceptable-heartbeat-pause = 3s
 
+      # Number of samples to use for calculation of mean and standard deviation of
+      # inter-arrival times.
       max-sample-size = 1000
+
+      # When a node stops sending heartbeats to another node it ends the
+      # heartbeating with this number of EndHeartbeat messages, which will
+      # remove the monitoring from the failure detector.
+      nr-of-end-heartbeats = 8
+
+      # When no expected heartbeat message has been received an explicit
+      # heartbeat request is sent to the node that should emit heartbeats.
+      heartbeat-request {
+        # Grace period until an explicit heartbeat request is sent
+        grace-period = 10 s
+
+        # After the heartbeat request has been sent the first failure detection
+        # will start after this period, even though no heartbeat message has
+        # been received.
+        expected-response-after = 3 s
+
+        # Cleanup of obsolete heartbeat requests
+        time-to-live = 60 s
+      }
     }
 
     metrics {
diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
index 6e812ab6cc..92c876b22d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
@@ -109,18 +109,19 @@ class AccrualFailureDetector(
 
   private val state = new AtomicReference[State](State())
 
-  /**
-   * Returns true if the connection is considered to be up and healthy
-   * and returns false otherwise.
-   */
-  def isAvailable(connection: Address): Boolean = phi(connection) < threshold
+  override def isAvailable(connection: Address): Boolean = phi(connection) < threshold
+
+  override def isMonitoring(connection: Address): Boolean = state.get.timestamps.get(connection).nonEmpty
 
   /**
    * Records a heartbeat for a connection.
*/ @tailrec final def heartbeat(connection: Address) { - log.debug("Heartbeat from connection [{}] ", connection) + if (isMonitoring(connection)) + log.debug("Heartbeat from connection [{}] ", connection) + else + log.info("First heartbeat from connection [{}] ", connection) val timestamp = clock() val oldState = state.get @@ -197,7 +198,9 @@ class AccrualFailureDetector( */ @tailrec final def remove(connection: Address): Unit = { - log.debug("Remove connection [{}] ", connection) + if (isMonitoring(connection)) + log.info("Remove heartbeat connection [{}] ", connection) + val oldState = state.get if (oldState.history.contains(connection)) { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 6da8af20e5..a3ee862eca 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -176,8 +176,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { /** * Subscribe to cluster domain events. * The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] - * or subclass. A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]] - * will also be sent to the subscriber. + * or subclass. + * + * A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]] + * will be sent to the subscriber as the first event. When + * `to` Class is a [[akka.cluster.ClusterEvent.InstantMemberEvent]] + * (or subclass) the snapshot event will instead be a + * [[akka.cluster.ClusterEvent.InstantClusterState]]. */ def subscribe(subscriber: ActorRef, to: Class[_]): Unit = clusterCore ! InternalClusterAction.Subscribe(subscriber, to) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index f43c53926e..5271bd6cd5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -181,7 +181,6 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging { import ClusterLeaderAction._ import InternalClusterAction._ - import ClusterHeartbeatSender.JoinInProgress val cluster = Cluster(context.system) import cluster.{ selfAddress, scheduler, failureDetector } @@ -281,7 +280,13 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * A 'Join(thisNodeAddress)' command is sent to the node to join. */ def join(address: Address): Unit = { - if (!latestGossip.members.exists(_.address == address)) { + if (address.protocol != selfAddress.protocol) + log.info("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", + selfAddress.protocol, address.protocol) + else if (address.system != selfAddress.system) + log.info("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]", + selfAddress.system, address.system) + else if (!latestGossip.members.exists(_.address == address)) { // wipe our state since a node that joins a cluster must be empty latestGossip = Gossip.empty // wipe the failure detector since we are starting fresh and shouldn't care about the past @@ -290,7 +295,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto publisher ! PublishStart publish(latestGossip) - heartbeatSender ! 
JoinInProgress(address, Deadline.now + JoinTimeout)
 
       context.become(initialized)
       if (address == selfAddress)
@@ -331,7 +335,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
       log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
       // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
       if (node != selfAddress) {
-        failureDetector heartbeat node
         gossipTo(node)
       }
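Since the heartbeat components now subscribe with the InstantMemberEvent classifier, a minimal sketch of such a subscriber may help (not part of this patch; the actor name is made up). As documented in Cluster.subscribe above, the first message is an InstantClusterState snapshot, followed by individual instant member events:

package akka.cluster

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ClusterEvent._

// Hypothetical subscriber, for illustration only: subscribing with the
// InstantMemberEvent classifier delivers an InstantClusterState snapshot
// first, followed by individual InstantMemberEvent messages.
class InstantMemberListener extends Actor with ActorLogging {

  override def preStart(): Unit =
    Cluster(context.system).subscribe(self, classOf[InstantMemberEvent])

  override def postStop(): Unit =
    Cluster(context.system).unsubscribe(self)

  def receive = {
    case state: InstantClusterState ⇒ log.info("Initial snapshot with [{}] members", state.members.size)
    case InstantMemberUp(m)         ⇒ log.info("Member is up: [{}]", m.address)
    case _: InstantMemberEvent      ⇒ // not interested in other member event types
  }
}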
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
index a5d230019f..81ac04e64c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -6,13 +6,12 @@ package akka.cluster
 
 import language.postfixOps
 import scala.collection.immutable
-import scala.annotation.tailrec
 import scala.concurrent.duration._
 import java.net.URLEncoder
 import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
 import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
 import akka.cluster.ClusterEvent._
-import akka.routing.ConsistentHash
+import akka.routing.MurmurHash
 
 /**
  * INTERNAL API
@@ -55,13 +54,26 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
  */
private[cluster] object ClusterHeartbeatSender {
   /**
-   * Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
-   * another node and heartbeats should be sent unconditionally until it becomes
-   * member or deadline is overdue. This is done to be able to detect immediate death
-   * of the joining node.
+   * Request heartbeats from another node. Sent from the node that is
+   * expecting heartbeats from a specific sender, but has not received any.
+   */
+  case class HeartbeatRequest(from: Address) extends ClusterMessage
+
+  /**
+   * Delayed sending of a HeartbeatRequest. The actual request is
+   * only sent if no expected heartbeat message has been received.
    * Local only, no need to serialize.
    */
-  case class JoinInProgress(address: Address, deadline: Deadline)
+  case class SendHeartbeatRequest(to: Address)
+
+  /**
+   * Trigger a fake heartbeat message to start failure detection
+   * of a node that this node is expecting heartbeats from. HeartbeatRequest
+   * has been sent to the node so it should have started sending heartbeat
+   * messages.
+   * Local only, no need to serialize.
+   */
+  case class ExpectedFirstHeartbeat(from: Address)
}

/*
@@ -88,16 +100,16 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
   val selfHeartbeat = Heartbeat(selfAddress)
   val selfEndHeartbeat = EndHeartbeat(selfAddress)
+  val selfHeartbeatRequest = HeartbeatRequest(selfAddress)
 
-  var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor),
-    selfAddress.toString, MonitoredByNrOfMembers)
+  var state = ClusterHeartbeatSenderState.empty(selfAddress, MonitoredByNrOfMembers)
 
   // start periodic heartbeat to other nodes in cluster
   val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval,
     HeartbeatInterval, self, HeartbeatTick)
 
   override def preStart(): Unit = {
-    cluster.subscribe(self, classOf[MemberEvent])
+    cluster.subscribe(self, classOf[InstantMemberEvent])
     cluster.subscribe(self, classOf[UnreachableMember])
   }
 
@@ -112,18 +124,27 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
   def clusterHeartbeatConnectionFor(address: Address): ActorRef =
     context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
 
+  /**
+   * Looks up and returns the remote cluster heartbeat sender for the specific address.
+   */
+  def heartbeatSenderFor(address: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(address))
+
   def receive = {
-    case HeartbeatTick                ⇒ heartbeat()
-    case s: CurrentClusterState       ⇒ reset(s)
-    case UnreachableMember(m)         ⇒ removeMember(m)
-    case MemberDowned(m)              ⇒ removeMember(m)
-    case MemberRemoved(m)             ⇒ removeMember(m)
-    case e: MemberEvent               ⇒ addMember(e.member)
-    case JoinInProgress(a, d)         ⇒ addJoinInProgress(a, d)
+    case HeartbeatTick                ⇒ heartbeat()
+    case InstantMemberUp(m)           ⇒ addMember(m)
+    case UnreachableMember(m)         ⇒ removeMember(m)
+    case InstantMemberDowned(m)       ⇒ removeMember(m)
+    case InstantMemberRemoved(m)      ⇒ removeMember(m)
+    case s: InstantClusterState       ⇒ reset(s)
+    case _: CurrentClusterState      ⇒ // InstantClusterState is enough
+    case _: InstantMemberEvent        ⇒ // not interested in other types of InstantMemberEvent
+    case HeartbeatRequest(from)       ⇒ addHeartbeatRequest(from)
+    case SendHeartbeatRequest(to)     ⇒ sendHeartbeatRequest(to)
+    case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
   }
 
-  def reset(snapshot: CurrentClusterState): Unit =
-    state = state.reset(snapshot.members.collect { case m if m.address != selfAddress ⇒ m.address })
+  def reset(snapshot: InstantClusterState): Unit =
+    state = state.reset(snapshot.members.map(_.address))
 
   def addMember(m: Member): Unit = if (m.address != selfAddress)
     state = state addMember m.address
@@ -131,11 +152,26 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
   def removeMember(m: Member): Unit = if (m.address != selfAddress)
     state = state removeMember m.address
 
-  def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
-    state = state.addJoinInProgress(address, deadline)
+  def addHeartbeatRequest(address: Address): Unit = if (address != selfAddress)
+    state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive)
+
+  def sendHeartbeatRequest(address: Address): Unit =
+    if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) {
+      heartbeatSenderFor(address) ! selfHeartbeatRequest
+      // schedule the expected heartbeat for later, which will give the
+      // sender a chance to start heartbeating, and also trigger some resends of
+      // the heartbeat request
+      scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(address))
+    }
+
+  def triggerFirstHeartbeat(address: Address): Unit =
+    if (!cluster.failureDetector.isMonitoring(address)) {
+      log.info("Trigger extra expected heartbeat from [{}]", address)
+      cluster.failureDetector.heartbeat(address)
+    }
 
   def heartbeat(): Unit = {
-    state = state.removeOverdueJoinInProgress()
+    state = state.removeOverdueHeartbeatRequest()
 
     def connection(to: Address): ActorRef = {
       // URL encoded target address as child actor name
@@ -161,6 +197,13 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
       } else state = state.increaseEndingCount(to)
     }
+
+    // request heartbeats from expected sender node if no heartbeat messages have been received
+    state.ring.mySenders foreach { address ⇒
+      if (!cluster.failureDetector.isMonitoring(address))
+        scheduler.scheduleOnce(HeartbeatRequestDelay, self, SendHeartbeatRequest(address))
+    }
+
   }
 }

@@ -172,9 +215,8 @@ private[cluster] object ClusterHeartbeatSenderState {
  /**
   * Initial, empty state
   */
-  def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String,
-    monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
-    ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers)
+  def empty(selfAddress: Address, monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
+    ClusterHeartbeatSenderState(HeartbeatNodeRing(selfAddress, Set(selfAddress), monitoredByNrOfMembers))

  /**
   * Create a new state based on previous state, and
@@ -182,33 +224,13 @@ private[cluster] object ClusterHeartbeatSenderState {
   private def apply(
     old: ClusterHeartbeatSenderState,
-    consistentHash: ConsistentHash[Address],
-    all: Set[Address]): ClusterHeartbeatSenderState = {
+    ring: HeartbeatNodeRing): ClusterHeartbeatSenderState = {
 
-    /**
-     * Select a few peers that heartbeats will be sent to, i.e. that will
-     * monitor this node. Try to send heartbeats to same nodes as much
-     * as possible, but re-balance with consistent hashing algorithm when
-     * new members are added or removed.
-     */
-    def selectPeers: Set[Address] = {
-      val allSize = all.size
-      val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers)
-      // try more if consistentHash results in same node as already selected
-      val attemptLimit = nrOfPeers * 2
-      @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
-        if (acc.size == nrOfPeers || n == attemptLimit) acc
-        else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1)
-      }
-      if (nrOfPeers >= allSize) all
-      else select(Set.empty[Address], 0)
-    }
-
-    val curr = selectPeers
+    val curr = ring.myReceivers
     // start ending process for nodes not selected any more
     // abort ending process for nodes that have been selected again
     val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
-    old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end)
+    old.copy(ring = ring, current = curr, ending = end, heartbeatRequest = old.heartbeatRequest -- curr)
   }
 }

@@ -223,13 +245,10 @@ private[cluster] object ClusterHeartbeatSenderState {
  * i.e. the methods return new instances.
*/ private[cluster] case class ClusterHeartbeatSenderState private ( - consistentHash: ConsistentHash[Address], - selfAddressStr: String, - monitoredByNrOfMembers: Int, - all: Set[Address] = Set.empty, + ring: HeartbeatNodeRing, current: Set[Address] = Set.empty, ending: Map[Address, Int] = Map.empty, - joinInProgress: Map[Address, Deadline] = Map.empty) { + heartbeatRequest: Map[Address, Deadline] = Map.empty) { // FIXME can be disabled as optimization assertInvariants @@ -237,50 +256,53 @@ private[cluster] case class ClusterHeartbeatSenderState private ( private def assertInvariants: Unit = { val currentAndEnding = current.intersect(ending.keySet) require(currentAndEnding.isEmpty, - "Same nodes in current and ending not allowed, got [%s]" format currentAndEnding) - val joinInProgressAndAll = joinInProgress.keySet.intersect(all) - require(joinInProgressAndAll.isEmpty, - "Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll) - val currentNotInAll = current -- all - require(currentNotInAll.isEmpty, - "Nodes in current but not in all not allowed, got [%s]" format currentNotInAll) - require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]" - format all) + s"Same nodes in current and ending not allowed, got [${currentAndEnding}]") + + val currentAndHeartbeatRequest = current.intersect(heartbeatRequest.keySet) + require(currentAndHeartbeatRequest.isEmpty, + s"Same nodes in current and heartbeatRequest not allowed, got [${currentAndHeartbeatRequest}]") + + val currentNotInAll = current -- ring.nodes + require(current.isEmpty || currentNotInAll.isEmpty, + s"Nodes in current but not in ring nodes not allowed, got [${currentNotInAll}]") + + require(!current.contains(ring.selfAddress), + s"Self in current not allowed, got [${ring.selfAddress}]") + require(!heartbeatRequest.contains(ring.selfAddress), + s"Self in heartbeatRequest not allowed, got [${ring.selfAddress}]") } - val active: Set[Address] = current ++ joinInProgress.keySet + val active: Set[Address] = current ++ heartbeatRequest.keySet - def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ }, - consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), - all = nodes) + def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = { + ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeHeartbeatRequest _ }, ring.copy(nodes = nodes + ring.selfAddress)) + } def addMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a) + ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :+ a) def removeMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a) + ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :- a) - private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { - if (joinInProgress contains address) - copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0)) + private def removeHeartbeatRequest(address: Address): ClusterHeartbeatSenderState = { + if (heartbeatRequest contains address) + copy(heartbeatRequest = heartbeatRequest - address, ending = ending + (address -> 0)) else this } - def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = { - if (all contains 
address) this
-    else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address)
+  def addHeartbeatRequest(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
+    if (current.contains(address)) this
+    else copy(heartbeatRequest = heartbeatRequest + (address -> deadline), ending = ending - address)
   }
 
   /**
-   * Cleanup overdue joinInProgress, in case a joining node never
-   * became member, for some reason.
+   * Clean up overdue heartbeatRequest entries.
    */
-  def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = {
-    val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
+  def removeOverdueHeartbeatRequest(): ClusterHeartbeatSenderState = {
+    val overdue = heartbeatRequest collect { case (address, deadline) if deadline.isOverdue ⇒ address }
     if (overdue.isEmpty) this
     else
-      copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue)
+      copy(ending = ending ++ overdue.map(_ -> 0), heartbeatRequest = heartbeatRequest -- overdue)
   }
 
   def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
@@ -330,7 +352,7 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
     CircuitBreaker(context.system.scheduler,
       cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
       onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
-      onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)).
+      onOpen(log.info("CircuitBreaker Open for [{}]", toRef)).
       onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
 }

@@ -344,9 +366,81 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
           toRef ! heartbeatMsg
         } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
       }
-      if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
+      if (deadline.isOverdue) log.info("Sending heartbeat to [{}] took longer than expected", toRef)
     case SendEndHeartbeat(endHeartbeatMsg, _) ⇒
       log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
       toRef ! endHeartbeatMsg
   }
 }
+
+/**
+ * INTERNAL API
+ *
+ * Data structure for picking heartbeat receivers and for keeping track of
+ * which nodes are expected to send heartbeat messages to a node. The node
+ * ring is shuffled by deterministic hashing to avoid picking physically
+ * co-located neighbors.
+ *
+ * It is immutable, i.e. the methods return new instances.
+ */
+private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[Address], monitoredByNrOfMembers: Int) {
+
+  require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]")
+
+  private val nodeRing: immutable.SortedSet[Address] = {
+    implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
+      val ha = hashFor(a)
+      val hb = hashFor(b)
+      ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0)
+    }
+
+    immutable.SortedSet() ++ nodes
+  }
+
+  private def hashFor(node: Address): Int = node match {
+    // cluster node identifier is the host and port of the address; protocol and system are assumed to be the same
+    case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"${host}:${port}")
+    case _ ⇒ 0
+  }
+
+  /**
+   * Receivers for `selfAddress`. Cached for subsequent access.
+   */
+  lazy val myReceivers: immutable.Set[Address] = receivers(selfAddress)
+  /**
+   * Senders for `selfAddress`. Cached for subsequent access.
+   */
+  lazy val mySenders: immutable.Set[Address] = senders(selfAddress)
+
+  private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1)
+
+  /**
+   * The receivers to use from a specified sender.
+   */
+  def receivers(sender: Address): immutable.Set[Address] =
+    if (useAllAsReceivers)
+      nodeRing - sender
+    else {
+      val slice = nodeRing.from(sender).tail.take(monitoredByNrOfMembers)
+      if (slice.size < monitoredByNrOfMembers)
+        (slice ++ nodeRing.take(monitoredByNrOfMembers - slice.size))
+      else slice
+    }
+
+  /**
+   * The expected senders for a specific receiver.
+   */
+  def senders(receiver: Address): Set[Address] =
+    nodes filter { sender ⇒ receivers(sender) contains receiver }
+
+  /**
+   * Add a node to the ring.
+   */
+  def :+(node: Address): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node)
+
+  /**
+   * Remove a node from the ring.
+   */
+  def :-(node: Address): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this
+
+}
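A minimal sketch of how the ring behaves may be useful here (not part of the patch; the addresses are the same made-up ones used in the specs further down, and the code assumes the akka.cluster package since HeartbeatNodeRing is private[cluster]):

package akka.cluster

import akka.actor.Address

// Illustration only: five hypothetical nodes, each heartbeating to three
// ring neighbors picked by deterministic hashing.
object HeartbeatNodeRingExample extends App {
  val aa = Address("tcp.akka", "sys", "aa", 2552)
  val bb = Address("tcp.akka", "sys", "bb", 2552)
  val cc = Address("tcp.akka", "sys", "cc", 2552)
  val dd = Address("tcp.akka", "sys", "dd", 2552)
  val ee = Address("tcp.akka", "sys", "ee", 2552)

  val ring = HeartbeatNodeRing(cc, Set(aa, bb, cc, dd, ee), monitoredByNrOfMembers = 3)

  // the nodes cc sends heartbeats to: three ring neighbors, never cc itself
  println(s"receivers: ${ring.myReceivers}")
  // the nodes cc expects heartbeats from
  println(s"senders: ${ring.mySenders}")

  // receivers and senders are two views of the same relation
  for (s ← ring.nodes; r ← ring.receivers(s))
    assert(ring.senders(r) contains s)
}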
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
index 8f08180b1c..bd093c9f74 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
@@ -76,7 +76,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
     MetricsInterval, self, MetricsTick)
 
   override def preStart(): Unit = {
-    cluster.subscribe(self, classOf[MemberEvent])
+    cluster.subscribe(self, classOf[InstantMemberEvent])
     cluster.subscribe(self, classOf[UnreachableMember])
     log.info("Metrics collection has started successfully on node [{}]", selfAddress)
   }
@@ -84,11 +84,15 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
   def receive = {
     case GossipTick                  ⇒ gossip()
     case MetricsTick                 ⇒ collect()
-    case state: CurrentClusterState  ⇒ receiveState(state)
-    case MemberUp(m)                 ⇒ addMember(m)
-    case e: MemberEvent              ⇒ removeMember(e.member)
-    case UnreachableMember(m)        ⇒ removeMember(m)
     case msg: MetricsGossipEnvelope  ⇒ receiveGossip(msg)
+    case state: InstantClusterState  ⇒ receiveState(state)
+    case _: CurrentClusterState      ⇒ // InstantClusterState is enough
+    case InstantMemberUp(m)          ⇒ addMember(m)
+    case InstantMemberDowned(m)      ⇒ removeMember(m)
+    case InstantMemberRemoved(m)     ⇒ removeMember(m)
+    case UnreachableMember(m)        ⇒ removeMember(m)
+    case _: InstantMemberEvent       ⇒ // not interested in other types of InstantMemberEvent
+
   }
 
   override def postStop: Unit = {
@@ -115,7 +119,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
   /**
    * Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]].
    */
-  def receiveState(state: CurrentClusterState): Unit =
+  def receiveState(state: InstantClusterState): Unit =
     nodes = state.members collect { case m if m.status == Up ⇒ m.address }
 
   /**
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index 0477bbd002..7ccf7e48a8 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -39,8 +39,22 @@ class ClusterSettings(val config: Config, val systemName: String) {
     val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
     require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d
   }
-  final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
-  final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
+  final val HeartbeatRequestDelay: FiniteDuration = {
+    val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.grace-period"), MILLISECONDS)
+    require(d > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0"); d
+  }
+  final val HeartbeatExpectedResponseAfter: FiniteDuration = {
+    val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.expected-response-after"), MILLISECONDS)
+    require(d > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after must be > 0"); d
+  }
+  final val HeartbeatRequestTimeToLive: FiniteDuration = {
+    val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-request.time-to-live"), MILLISECONDS)
+    require(d > Duration.Zero, "failure-detector.heartbeat-request.time-to-live must be > 0"); d
+  }
+  final val NumberOfEndHeartbeats: Int = {
+    val n = getInt("akka.cluster.failure-detector.nr-of-end-heartbeats")
+    require(n > 0, "failure-detector.nr-of-end-heartbeats must be > 0"); n
+  }
   final val MonitoredByNrOfMembers: Int = {
     val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
     require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
@@ -61,7 +75,6 @@ class ClusterSettings(val config: Config, val systemName: String) {
     require(n > 0, "min-nr-of-members must be > 0"); n
   }
   final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled")
-  final val JoinTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
   final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match {
     case "" ⇒ Dispatchers.DefaultDispatcherId
     case id ⇒ id
diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala
index 9ef77c6834..dbb17ac80a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala
@@ -16,6 +16,12 @@ trait FailureDetector {
    */
   def isAvailable(connection: Address): Boolean
 
+  /**
+   * Returns true if the failure detector has received any heartbeats and started monitoring
+   * of the resource.
+   */
+  def isMonitoring(connection: Address): Boolean
+
   /**
    * Records a heartbeat for a connection.
    */
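The new settings can be read back through ClusterSettings. A minimal sketch (not part of the patch; the values are just the reference.conf defaults shown above, and it assumes akka-cluster is on the classpath so ConfigFactory.load() supplies the remaining defaults):

import akka.cluster.ClusterSettings
import com.typesafe.config.ConfigFactory

// Illustration only: parse the heartbeat-request keys and let
// ClusterSettings validate them with the requires shown above.
object HeartbeatRequestSettingsExample extends App {
  val config = ConfigFactory.parseString("""
    akka.cluster.failure-detector {
      nr-of-end-heartbeats = 8
      heartbeat-request {
        grace-period = 10 s
        expected-response-after = 3 s
        time-to-live = 60 s
      }
    }""").withFallback(ConfigFactory.load())

  // the system name is made up for illustration
  val settings = new ClusterSettings(config, "ClusterSystem")

  println(settings.HeartbeatRequestDelay)          // 10000 milliseconds
  println(settings.HeartbeatExpectedResponseAfter) // 3000 milliseconds
  println(settings.HeartbeatRequestTimeToLive)     // 60000 milliseconds
  println(settings.NumberOfEndHeartbeats)          // 8
}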
diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala
index 1b4cf54dfc..141018834e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Member.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala
@@ -34,6 +34,7 @@ object Member {
   * `Address` ordering type class, sorts addresses by host and port.
   */
  implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
+    // cluster node identifier is the host and port of the address; protocol and system are assumed to be the same
     if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
     else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
     else false
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala
new file mode 100644
index 0000000000..4e6bae48d5
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc.
+ */
+package akka.cluster
+
+import language.postfixOps
+import scala.concurrent.duration._
+import com.typesafe.config.ConfigFactory
+import akka.actor.Actor
+import akka.actor.Props
+import akka.cluster.ClusterEvent.InstantClusterState
+import akka.cluster.ClusterEvent.InstantMemberJoined
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import akka.testkit._
+
+object InitialHeartbeatMultiJvmSpec extends MultiNodeConfig {
+  val controller = role("controller")
+  val first = role("first")
+  val second = role("second")
+
+  commonConfig(debugConfig(on = false).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster.failure-detector.heartbeat-request.grace-period = 3 s
+      akka.cluster.failure-detector.threshold = 4""")).
+ withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) +} + +class InitialHeartbeatMultiJvmNode1 extends InitialHeartbeatSpec +class InitialHeartbeatMultiJvmNode2 extends InitialHeartbeatSpec +class InitialHeartbeatMultiJvmNode3 extends InitialHeartbeatSpec + +abstract class InitialHeartbeatSpec + extends MultiNodeSpec(InitialHeartbeatMultiJvmSpec) + with MultiNodeClusterSpec { + + import InitialHeartbeatMultiJvmSpec._ + + muteMarkingAsUnreachable() + + "A member" must { + + "detect failure even though no heartbeats have been received" taggedAs LongRunningTest in { + val secondAddress = address(second) + awaitClusterUp(first) + + runOn(first) { + val joinLatch = TestLatch() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case state: InstantClusterState ⇒ + if (state.members.exists(_.address == secondAddress)) + joinLatch.countDown() + case InstantMemberJoined(m) ⇒ + if (m.address == secondAddress) + joinLatch.countDown() + } + })), classOf[InstantMemberJoined]) + + within(10 seconds) { + joinLatch.await + } + } + runOn(second) { + cluster.join(first) + } + enterBarrier("second-joined") + + runOn(controller) { + // it is likely that first has not started sending heartbeats to second yet + // Direction must be Receive because the gossip from first to second must pass through + testConductor.blackhole(first, second, Direction.Receive).await + } + + runOn(second) { + within(15 seconds) { + awaitCond(!cluster.failureDetector.isAvailable(first)) + } + } + + enterBarrier("after-1") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 655f86ab3b..39adca3ac5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -110,12 +110,12 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig } } - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in within(20 seconds) { + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in within(30 seconds) { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) enterBarrier("after-2") } - "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in within(20 seconds) { + "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in within(30 seconds) { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) enterBarrier("after-3") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 08f6d96937..fe8deb887d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -69,8 +69,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS ".*Cluster Node.* - is starting up.*", ".*Shutting down cluster Node.*", ".*Cluster node successfully shut down.*", - ".*Using a dedicated scheduler for cluster.*", - ".*Phi value.* for connection.*") foreach { s ⇒ + ".*Using a dedicated scheduler for cluster.*") foreach { s ⇒ sys.eventStream.publish(Mute(EventFilter.info(pattern = s))) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index a8ca35684d..d1c48565d5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -87,10 +87,10 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) } runOn(side1: _*) { - awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds) + awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 25 seconds) } runOn(side2: _*) { - awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds) + awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 25 seconds) } enterBarrier("after-2") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index bceb723031..f765e764a3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -351,7 +351,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { nodes foreach { node ⇒ val previous = phiByNode(node) val φ = fd.phi(node) - if (φ > 0) { + if (φ > 0 || fd.isMonitoring(node)) { val aboveOne = if (!φ.isInfinite && φ > 1.0) 1 else 0 phiByNode += node -> PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1, math.max(previous.max, φ)) @@ -861,7 +861,7 @@ abstract class StressSpec name = "master-" + myself.name) m ! Begin import system.dispatcher - system.scheduler.scheduleOnce(highThroughputDuration) { + system.scheduler.scheduleOnce(duration) { m.tell(End, testActor) } val workResult = awaitWorkResult @@ -931,7 +931,7 @@ abstract class StressSpec "A cluster under stress" must { - "join seed nodes" taggedAs LongRunningTest in { + "join seed nodes" taggedAs LongRunningTest in within(20 seconds) { val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) val size = seedNodes.size + otherNodesJoiningSeedNodes.size diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index ec2b113a85..60c83399be 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -28,12 +28,14 @@ class ClusterConfigSpec extends AkkaSpec { PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) - NumberOfEndHeartbeats must be(4) + NumberOfEndHeartbeats must be(8) MonitoredByNrOfMembers must be(5) + HeartbeatRequestDelay must be(10 seconds) + HeartbeatExpectedResponseAfter must be(3 seconds) + HeartbeatRequestTimeToLive must be(1 minute) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) PublishStatsInterval must be(10 second) - JoinTimeout must be(60 seconds) AutoJoin must be(true) AutoDown must be(false) MinNrOfMembers must be(1) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index 82ad7c2cf2..c3176208c5 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -9,6 +9,7 @@ import 
org.scalatest.matchers.MustMatchers import akka.actor.Address import akka.routing.ConsistentHash import scala.concurrent.duration._ +import scala.collection.immutable @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { @@ -20,8 +21,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { val dd = Address("tcp.akka", "sys", "dd", 2552) val ee = Address("tcp.akka", "sys", "ee", 2552) - val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10), - selfAddress.toString, 3) + val emptyState = ClusterHeartbeatSenderState.empty(selfAddress, 3) "A ClusterHeartbeatSenderState" must { @@ -29,47 +29,46 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { emptyState.active.isEmpty must be(true) } - "include joinInProgress in active set" in { - val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds) - s.joinInProgress.keySet must be(Set(aa)) + "include heartbeatRequest in active set" in { + val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds) + s.heartbeatRequest.keySet must be(Set(aa)) s.active must be(Set(aa)) } - "remove joinInProgress from active set after removeOverdueJoinInProgress" in { - val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress() - s.joinInProgress must be(Map.empty) + "remove heartbeatRequest from active set after removeOverdueHeartbeatRequest" in { + val s = emptyState.addHeartbeatRequest(aa, Deadline.now - 30.seconds).removeOverdueHeartbeatRequest() + s.heartbeatRequest must be(Map.empty) s.active must be(Set.empty) s.ending must be(Map(aa -> 0)) } - "remove joinInProgress after reset" in { - val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)) - s.joinInProgress must be(Map.empty) + "remove heartbeatRequest after reset" in { + val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)) + s.heartbeatRequest must be(Map.empty) } - "remove joinInProgress after addMember" in { - val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa) - s.joinInProgress must be(Map.empty) + "remove heartbeatRequest after addMember" in { + val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).addMember(aa) + s.heartbeatRequest must be(Map.empty) } - "remove joinInProgress after removeMember" in { - val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa) - s.joinInProgress must be(Map.empty) + "remove heartbeatRequest after removeMember" in { + val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa) + s.heartbeatRequest must be(Map.empty) s.ending must be(Map(aa -> 0)) } - "remove from ending after addJoinInProgress" in { + "remove from ending after addHeartbeatRequest" in { val s = emptyState.reset(Set(aa, bb)).removeMember(aa) s.ending must be(Map(aa -> 0)) - val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds) - s2.joinInProgress.keySet must be(Set(aa)) + val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds) + s2.heartbeatRequest.keySet must be(Set(aa)) s2.ending must be(Map.empty) } "include nodes from reset in active set" in { val nodes = Set(aa, bb, cc) val s = emptyState.reset(nodes) - s.all must be(nodes) s.current must be(nodes) s.ending must be(Map.empty) s.active must be(nodes) @@ -78,7 +77,6 @@ class ClusterHeartbeatSenderStateSpec extends 
WordSpec with MustMatchers {
 
   "limit current nodes to monitoredByNrOfMembers when adding members" in {
     val nodes = Set(aa, bb, cc, dd)
     val s = nodes.foldLeft(emptyState) { _ addMember _ }
-    s.all must be(nodes)
     s.current.size must be(3)
     s.addMember(ee).current.size must be(3)
   }
diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala
index 58fbd220fc..e212da2f74 100644
--- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala
@@ -50,6 +50,8 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte
     false
   }
 
+  override def isMonitoring(connection: Address): Boolean = connections.contains(connection)
+
   def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection)
 
   def remove(connection: Address): Unit = {
diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala
new file mode 100644
index 0000000000..8f47cc6613
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import akka.actor.Address
+import akka.routing.ConsistentHash
+import scala.concurrent.duration._
+import scala.collection.immutable
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class HeartbeatNodeRingSpec extends WordSpec with MustMatchers {
+
+  val aa = Address("tcp.akka", "sys", "aa", 2552)
+  val bb = Address("tcp.akka", "sys", "bb", 2552)
+  val cc = Address("tcp.akka", "sys", "cc", 2552)
+  val dd = Address("tcp.akka", "sys", "dd", 2552)
+  val ee = Address("tcp.akka", "sys", "ee", 2552)
+
+  val nodes = Set(aa, bb, cc, dd, ee)
+
+  "A HeartbeatNodeRing" must {
+
+    "pick specified number of nodes as receivers" in {
+      val ring = HeartbeatNodeRing(cc, nodes, 3)
+      ring.myReceivers must be(ring.receivers(cc))
+
+      nodes foreach { n ⇒
+        val receivers = ring.receivers(n)
+        receivers.size must be(3)
+        receivers must not contain (n)
+      }
+    }
+
+    "pick all except self as receivers when monitoredByNrOfMembers is not less than the number of other nodes" in {
+      val expected = Set(aa, bb, dd, ee)
+      HeartbeatNodeRing(cc, nodes, 4).myReceivers must be(expected)
+      HeartbeatNodeRing(cc, nodes, 5).myReceivers must be(expected)
+      HeartbeatNodeRing(cc, nodes, 6).myReceivers must be(expected)
+    }
+
+    "have matching senders and receivers" in {
+      val ring = HeartbeatNodeRing(cc, nodes, 3)
+      ring.mySenders must be(ring.senders(cc))
+
+      for (sender ← nodes; receiver ← ring.receivers(sender)) {
+        ring.senders(receiver) must contain(sender)
+      }
+    }
+
+    "pick none when alone" in {
+      val ring = HeartbeatNodeRing(cc, Set(cc), 3)
+      ring.myReceivers must be(Set())
+      ring.mySenders must be(Set())
+    }
+
+  }
+}
diff --git a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala
index c98daa7ed2..7053e77603 100644
--- a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala
+++ b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala
@@ -21,15 +21,16 @@ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetec
   private val resourceToFailureDetector = new AtomicReference[Map[A, FailureDetector]](Map())
 
   private final val failureDetectorCreationLock: Lock = new ReentrantLock
 
-  /**
-   * Returns true if the resource is considered to be up and healthy and returns false otherwise. For unregistered
-   * resources it returns true.
-   */
   final override def isAvailable(resource: A): Boolean = resourceToFailureDetector.get.get(resource) match {
     case Some(r) ⇒ r.isAvailable
     case _       ⇒ true
   }
 
+  final override def isMonitoring(resource: A): Boolean = resourceToFailureDetector.get.get(resource) match {
+    case Some(r) ⇒ r.isMonitoring
+    case _       ⇒ false
+  }
+
   final override def heartbeat(resource: A): Unit = {
 
     resourceToFailureDetector.get.get(resource) match {
diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala
index 5c7b7ac9c0..fc2cd61acc 100644
--- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala
+++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala
@@ -16,6 +16,12 @@ trait FailureDetector {
    */
   def isAvailable: Boolean
 
+  /**
+   * Returns true if the failure detector has received any heartbeats and started monitoring
+   * of the resource.
+   */
+  def isMonitoring: Boolean
+
   /**
    * Notifies the FailureDetector that a heartbeat arrived from the monitored resource. This causes the FailureDetector
    * to update its state.
diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala
index 2339a6409a..31fa03bfe6 100644
--- a/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala
+++ b/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala
@@ -15,9 +15,16 @@ trait FailureDetectorRegistry[A] {
 
   /**
    * Returns true if the resource is considered to be up and healthy and returns false otherwise.
+   * For unregistered resources it returns true.
    */
   def isAvailable(resource: A): Boolean
 
+  /**
+   * Returns true if the failure detector has received any heartbeats and started monitoring
+   * of the resource.
+   */
+  def isMonitoring(resource: A): Boolean
+
   /**
    * Records a heartbeat for a resource. If the resource is not yet registered (i.e. this is the first heartbeat) then
    * it is automatically registered.
diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index 194e3b57be..a4117ebeaa 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -78,6 +78,8 @@ class PhiAccrualFailureDetector( override def isAvailable: Boolean = phi < threshold + override def isMonitoring: Boolean = state.get.timestamp.nonEmpty + @tailrec final override def heartbeat(): Unit = { diff --git a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala index 5783897148..9b50be79e7 100644 --- a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala @@ -94,14 +94,16 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { fd.phi must be > (0.0) } - "mark node as available after a series of successful heartbeats" in { + "mark node as monitored after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) + fd.isMonitoring must be(false) fd.heartbeat() fd.heartbeat() fd.heartbeat() + fd.isMonitoring must be(true) fd.isAvailable must be(true) } diff --git a/akka-remote/src/test/scala/akka/remote/FailureDetectorRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/FailureDetectorRegistrySpec.scala index 871d37accf..7c9e9f6bab 100644 --- a/akka-remote/src/test/scala/akka/remote/FailureDetectorRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/FailureDetectorRegistrySpec.scala @@ -112,6 +112,7 @@ class FailureDetectorRegistrySpec extends AkkaSpec("akka.loglevel = INFO") { "mark node as available after explicit removal of connection and receiving heartbeat again" in { val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) val fd = createFailureDetectorRegistry(clock = fakeTimeGenerator(timeInterval)) + fd.isMonitoring("resource1") must be(false) fd.heartbeat("resource1") //0 @@ -119,9 +120,11 @@ class FailureDetectorRegistrySpec extends AkkaSpec("akka.loglevel = INFO") { fd.heartbeat("resource1") //1100 fd.isAvailable("resource1") must be(true) //2200 + fd.isMonitoring("resource1") must be(true) fd.remove("resource1") + fd.isMonitoring("resource1") must be(false) fd.isAvailable("resource1") must be(true) //3300 // it receives heartbeat from an explicitly removed node @@ -130,6 +133,7 @@ class FailureDetectorRegistrySpec extends AkkaSpec("akka.loglevel = INFO") { fd.heartbeat("resource1") //6600 fd.isAvailable("resource1") must be(true) //6700 + fd.isMonitoring("resource1") must be(true) } } diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 592a491693..5b61c1e875 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -19,6 +19,8 @@ object AkkaProtocolSpec { class TestFailureDetector extends FailureDetector { @volatile var isAvailable: Boolean = true + def isMonitoring: Boolean = called + @volatile var called: Boolean = false def heartbeat(): Unit = called = true
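To summarize the new isMonitoring contract, here is a minimal sketch (not part of the patch) of a FailureDetector implementation in the spirit of the TestFailureDetector above: monitoring starts with the first heartbeat and stays on from then on.

import akka.remote.FailureDetector

// Illustration only: a trivial detector where isMonitoring flips to true
// on the first recorded heartbeat, matching the contract documented in
// the FailureDetector trait above.
class MinimalFailureDetector extends FailureDetector {
  @volatile private var heartbeatSeen = false

  // a real detector would judge availability from heartbeat timing
  override def isAvailable: Boolean = true

  // true once any heartbeat has been received
  override def isMonitoring: Boolean = heartbeatSeen

  override def heartbeat(): Unit = heartbeatSeen = true
}

object MinimalFailureDetectorExample extends App {
  val fd = new MinimalFailureDetector
  assert(!fd.isMonitoring) // nothing received yet
  fd.heartbeat()
  assert(fd.isMonitoring)  // monitoring has started
}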