diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index eacccafb2a..684acbbcef 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -65,18 +65,8 @@ message Welcome { */ /** - * EndHeartbeat - * Sends an Address - */ - - /** - * EndHeartbeatAck - * Sends an Address - */ - -/** - * HeartbeatRequest - * Sends an Address + * HeartbeatRsp + * Sends an UniqueAddress */ /**************************************** diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 8dfcb415ea..4ca09954fb 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -138,21 +138,12 @@ akka { # Number of member nodes that each member will send heartbeat messages to, # i.e. each node will be monitored by this number of other nodes. monitored-by-nr-of-members = 5 + + # After the heartbeat request has been sent the first failure detection + # will start after this period, even though no heartbeat mesage has + # been received. + expected-response-after = 5 s - # When no expected heartbeat message has been received an explicit - # heartbeat request is sent to the node that should emit heartbeats. - heartbeat-request { - # Grace period until an explicit heartbeat request is sent - grace-period = 10 s - - # After the heartbeat request has been sent the first failure detection - # will start after this period, even though no heartbeat mesage has - # been received. - expected-response-after = 5 s - - # Cleanup of obsolete heartbeat requests - time-to-live = 60 s - } } metrics { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 4f6e4673bc..cd160019f0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -113,8 +113,6 @@ private[cluster] object InternalClusterAction { case object GossipSpeedupTick extends Tick - case object HeartbeatTick extends Tick - case object ReapUnreachableTick extends Tick case object MetricsTick extends Tick diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index b2260075da..f24050b754 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -4,54 +4,25 @@ package akka.cluster import language.postfixOps - import scala.collection.immutable import scala.concurrent.duration._ import akka.actor.{ ActorLogging, ActorRef, ActorSelection, Address, Actor, RootActorPath, Props } import akka.cluster.ClusterEvent._ import akka.routing.MurmurHash - -/** - * INTERNAL API - */ -private[akka] object ClusterHeartbeatReceiver { - /** - * Sent at regular intervals for failure detection. - */ - case class Heartbeat(from: Address) extends ClusterMessage - - /** - * Tell failure detector at receiving side that it should - * remove the monitoring, because heartbeats will end from - * this node. - */ - case class EndHeartbeat(from: Address) extends ClusterMessage - - /** - * Acknowledgment that `EndHeartbeat` was received and heartbeating - * can stop. - */ - case class EndHeartbeatAck(from: Address) extends ClusterMessage -} +import akka.remote.FailureDetectorRegistry /** * INTERNAL API. * - * Receives Heartbeat messages and updates failure detector. - * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized - * to Cluster message after message, but concurrent with other types of messages. + * Receives Heartbeat messages and replies. */ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { - import ClusterHeartbeatReceiver._ + import ClusterHeartbeatSender._ - val failureDetector = Cluster(context.system).failureDetector - val selfEndHeartbeatAck = EndHeartbeatAck(Cluster(context.system).selfAddress) + val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress) def receive = { - case Heartbeat(from) ⇒ failureDetector heartbeat from - case EndHeartbeat(from) ⇒ - failureDetector remove from - sender ! selfEndHeartbeatAck + case Heartbeat(from) ⇒ sender ! selfHeartbeatRsp } } @@ -61,50 +32,46 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo */ private[cluster] object ClusterHeartbeatSender { /** - * Request heartbeats from another node. Sent from the node that is - * expecting heartbeats from a specific sender, but has not received any. + * Sent at regular intervals for failure detection. */ - case class HeartbeatRequest(from: Address) extends ClusterMessage + case class Heartbeat(from: Address) extends ClusterMessage /** - * Delayed sending of a HeartbeatRequest. The actual request is - * only sent if no expected heartbeat message has been received. - * Local only, no need to serialize. + * Sent as reply to [[Heartbeat]] messages. */ - case class SendHeartbeatRequest(to: Address) + case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage + + // sent to self only + case object HeartbeatTick + case class ExpectedFirstHeartbeat(from: UniqueAddress) - /** - * Trigger a fake heartbeat message to trigger start of failure detection - * of a node that this node is expecting heartbeats from. HeartbeatRequest - * has been sent to the node so it should have started sending heartbeat - * messages. - * Local only, no need to serialize. - */ - case class ExpectedFirstHeartbeat(from: Address) } /* * INTERNAL API * * This actor is responsible for sending the heartbeat messages to - * a few other nodes that will monitor this node. + * a few other nodes, which will reply and then this actor updates the + * failure detector. */ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ - import ClusterHeartbeatReceiver._ - import InternalClusterAction.HeartbeatTick val cluster = Cluster(context.system) - import cluster.{ selfAddress, scheduler } + import cluster.{ selfAddress, selfUniqueAddress, scheduler } import cluster.settings._ import cluster.InfoLogger._ import context.dispatcher - val selfHeartbeat = Heartbeat(selfAddress) - val selfEndHeartbeat = EndHeartbeat(selfAddress) - val selfHeartbeatRequest = HeartbeatRequest(selfAddress) + // the failureDetector is only updated by this actor, but read from other places + val failureDetector = Cluster(context.system).failureDetector - var state = ClusterHeartbeatSenderState.empty(selfAddress, MonitoredByNrOfMembers) + val selfHeartbeat = Heartbeat(selfAddress) + + var state = ClusterHeartbeatSenderState( + ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), MonitoredByNrOfMembers), + unreachable = Set.empty[UniqueAddress], + failureDetector) // start periodic heartbeat to other nodes in cluster val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval, @@ -115,6 +82,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg } override def postStop(): Unit = { + state.activeReceivers.foreach(a ⇒ failureDetector.remove(a.address)) heartbeatTask.cancel() cluster.unsubscribe(self) } @@ -125,215 +93,146 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def heartbeatReceiver(address: Address): ActorSelection = context.actorSelection(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") - /** - * Looks up and returns the remote cluster heartbeat sender for the specific address. - */ - def heartbeatSender(address: Address): ActorSelection = - context.actorSelection(self.path.toStringWithAddress(address)) + def receive = initializing - def receive = { + def initializing: Actor.Receive = { + case s: CurrentClusterState ⇒ + init(s) + context.become(active) + case HeartbeatTick ⇒ + } + + def active: Actor.Receive = { case HeartbeatTick ⇒ heartbeat() + case HeartbeatRsp(from) ⇒ heartbeatRsp(from) case MemberUp(m) ⇒ addMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) - case s: CurrentClusterState ⇒ reset(s) - case MemberExited(m) ⇒ memberExited(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent - case HeartbeatRequest(from) ⇒ addHeartbeatRequest(from) - case SendHeartbeatRequest(to) ⇒ sendHeartbeatRequest(to) case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) - case EndHeartbeatAck(from) ⇒ ackEndHeartbeat(from) } - def reset(snapshot: CurrentClusterState): Unit = - state = state.reset(snapshot.members.collect { - case m if m.status == MemberStatus.Up ⇒ m.address - }(collection.breakOut)) - - def addMember(m: Member): Unit = if (m.address != selfAddress) state = state addMember m.address - - def removeMember(m: Member): Unit = { - if (m.uniqueAddress == cluster.selfUniqueAddress) - // This cluster node will be shutdown, but stop this actor immediately - // to prevent it from sending out anything more and avoid sending EndHeartbeat. - context stop self - else - state = state removeMember m.address + def init(snapshot: CurrentClusterState): Unit = { + val nodes: Set[UniqueAddress] = snapshot.members.collect { + case m if m.status == MemberStatus.Up ⇒ m.uniqueAddress + }(collection.breakOut) + state = state.init(nodes) } - def memberExited(m: Member): Unit = + def addMember(m: Member): Unit = + if (m.uniqueAddress != selfUniqueAddress) + state = state.addMember(m.uniqueAddress) + + def removeMember(m: Member): Unit = if (m.uniqueAddress == cluster.selfUniqueAddress) { // This cluster node will be shutdown, but stop this actor immediately - // to prevent it from sending out anything more and avoid sending EndHeartbeat. + // to avoid further updates context stop self - } - - def addHeartbeatRequest(address: Address): Unit = - if (address != selfAddress) state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive) - - def sendHeartbeatRequest(address: Address): Unit = - if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) { - heartbeatSender(address) ! selfHeartbeatRequest - // schedule the expected heartbeat for later, which will give the - // sender a chance to start heartbeating, and also trigger some resends of - // the heartbeat request - scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(address)) - } - - def triggerFirstHeartbeat(address: Address): Unit = - if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) { - logInfo("Trigger extra expected heartbeat from [{}]", address) - cluster.failureDetector.heartbeat(address) + } else { + state = state.removeMember(m.uniqueAddress) } def heartbeat(): Unit = { - state = state.removeOverdueHeartbeatRequest() - - state.active foreach { to ⇒ - log.debug("Cluster Node [{}] - Heartbeat to [{}]", cluster.selfAddress, to) - heartbeatReceiver(to) ! selfHeartbeat - } - - // When sending heartbeats to a node is stopped a `EndHeartbeat` messages are - // sent to notify it that no more heartbeats will be sent. This will continue - // until `EndHeartbeatAck` is received. - for (to ← state.ending) { - log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", cluster.selfAddress, to) - heartbeatReceiver(to) ! selfEndHeartbeat - } - - // request heartbeats from expected sender node if no heartbeat messages has been received - state.ring.mySenders foreach { address ⇒ - if (!cluster.failureDetector.isMonitoring(address)) - scheduler.scheduleOnce(HeartbeatRequestDelay, self, SendHeartbeatRequest(address)) + state.activeReceivers foreach { to ⇒ + if (cluster.failureDetector.isMonitoring(to.address)) + log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address) + else { + log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address) + // schedule the expected first heartbeat for later, which will give the + // other side a chance to reply, and also trigger some resends if needed + scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to)) + } + heartbeatReceiver(to.address) ! selfHeartbeat } } - def ackEndHeartbeat(from: Address): Unit = { - state = state.removeEnding(from) + def heartbeatRsp(from: UniqueAddress): Unit = { + log.debug("Cluster Node [{}] - Heartbeat response from [{}]", selfAddress, from.address) + state = state.heartbeatRsp(from) } + def triggerFirstHeartbeat(from: UniqueAddress): Unit = + if (state.activeReceivers(from) && !failureDetector.isMonitoring(from.address)) { + log.debug("Cluster Node [{}] - Trigger extra expected heartbeat from [{}]", selfAddress, from.address) + failureDetector.heartbeat(from.address) + } + } /** * INTERNAL API + * State of [[ClusterHeartbeatSender]]. Encapsulated to facilitate unit testing. + * It is immutable, but it updates the failureDetector. */ -private[cluster] object ClusterHeartbeatSenderState { - /** - * Initial, empty state - */ - def empty(selfAddress: Address, monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(HeartbeatNodeRing(selfAddress, Set(selfAddress), monitoredByNrOfMembers)) - - /** - * Create a new state based on previous state, and - * keep track of which nodes to stop sending heartbeats to. - */ - private def apply( - old: ClusterHeartbeatSenderState, - ring: HeartbeatNodeRing): ClusterHeartbeatSenderState = { - - val curr = ring.myReceivers - // start ending process for nodes not selected any more - // abort ending process for nodes that have been selected again - val end = old.ending ++ (old.current -- curr) -- curr - old.copy(ring = ring, current = curr, ending = end, heartbeatRequest = old.heartbeatRequest -- curr) - } - -} - -/** - * INTERNAL API - * - * State used by [akka.cluster.ClusterHeartbeatSender]. - * The initial state is created with `empty` in the of - * the companion object, thereafter the state is modified - * with the methods, such as `addMember`. It is immutable, - * i.e. the methods return new instances. - */ -private[cluster] case class ClusterHeartbeatSenderState private ( +private[cluster] case class ClusterHeartbeatSenderState( ring: HeartbeatNodeRing, - current: Set[Address] = Set.empty, - ending: Set[Address] = Set.empty, - heartbeatRequest: Map[Address, Deadline] = Map.empty) { + unreachable: Set[UniqueAddress], + failureDetector: FailureDetectorRegistry[Address]) { - if (Cluster.isAssertInvariantsEnabled) assertInvariants() + val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ unreachable - private def assertInvariants(): Unit = { - val currentAndEnding = current.intersect(ending) - require(currentAndEnding.isEmpty, - s"Same nodes in current and ending not allowed, got [${currentAndEnding}]") + def selfAddress = ring.selfAddress - val currentAndHeartbeatRequest = current.intersect(heartbeatRequest.keySet) - require(currentAndHeartbeatRequest.isEmpty, - s"Same nodes in current and heartbeatRequest not allowed, got [${currentAndHeartbeatRequest}]") + def init(nodes: Set[UniqueAddress]): ClusterHeartbeatSenderState = + copy(ring = ring.copy(nodes = nodes + selfAddress)) - val currentNotInAll = current -- ring.nodes - require(current.isEmpty || currentNotInAll.isEmpty, - s"Nodes in current but not in ring nodes not allowed, got [${currentNotInAll}]") + def addMember(node: UniqueAddress): ClusterHeartbeatSenderState = + membershipChange(ring :+ node) - require(!current.contains(ring.selfAddress), - s"Self in current not allowed, got [${ring.selfAddress}]") - require(!heartbeatRequest.contains(ring.selfAddress), - s"Self in heartbeatRequest not allowed, got [${ring.selfAddress}]") - } + def removeMember(node: UniqueAddress): ClusterHeartbeatSenderState = { + val newState = membershipChange(ring :- node) - val active: Set[Address] = current ++ heartbeatRequest.keySet - - def reset(nodes: immutable.HashSet[Address]): ClusterHeartbeatSenderState = { - ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeHeartbeatRequest _ }, ring.copy(nodes = nodes + ring.selfAddress)) - } - - def addMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :+ a) - - def removeMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :- a) - - private def removeHeartbeatRequest(address: Address): ClusterHeartbeatSenderState = { - if (heartbeatRequest contains address) - copy(heartbeatRequest = heartbeatRequest - address, ending = ending + address) - else this - } - - def addHeartbeatRequest(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = { - if (current.contains(address)) this - else copy(heartbeatRequest = heartbeatRequest + (address -> deadline), ending = ending - address) - } - - /** - * Cleanup overdue heartbeatRequest - */ - def removeOverdueHeartbeatRequest(): ClusterHeartbeatSenderState = { - val overdue = heartbeatRequest collect { case (address, deadline) if deadline.isOverdue ⇒ address } - if (overdue.isEmpty) this + failureDetector remove node.address + if (newState.unreachable(node)) + newState.copy(unreachable = newState.unreachable - node) else - copy(ending = ending ++ overdue, heartbeatRequest = heartbeatRequest -- overdue) + newState } - def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a) + private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = { + val oldReceivers = ring.myReceivers + val removedReceivers = oldReceivers -- newRing.myReceivers + var newUnreachable = unreachable + removedReceivers foreach { a ⇒ + if (failureDetector.isAvailable(a.address)) + failureDetector remove a.address + else + newUnreachable += a + } + copy(newRing, newUnreachable) + } + + def heartbeatRsp(from: UniqueAddress): ClusterHeartbeatSenderState = + if (activeReceivers(from)) { + failureDetector heartbeat from.address + if (unreachable(from)) { + // back from unreachable, ok to stop heartbeating to it + if (!ring.myReceivers(from)) + failureDetector remove from.address + copy(unreachable = unreachable - from) + } else this + } else this } /** * INTERNAL API * - * Data structure for picking heartbeat receivers and keep track of what nodes - * that are expected to send heartbeat messages to a node. The node ring is + * Data structure for picking heartbeat receivers. The node ring is * shuffled by deterministic hashing to avoid picking physically co-located * neighbors. * * It is immutable, i.e. the methods return new instances. */ -private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[Address], monitoredByNrOfMembers: Int) { +private[cluster] case class HeartbeatNodeRing(selfAddress: UniqueAddress, nodes: Set[UniqueAddress], monitoredByNrOfMembers: Int) { require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]") - private val nodeRing: immutable.SortedSet[Address] = { - implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ + private val nodeRing: immutable.SortedSet[UniqueAddress] = { + implicit val ringOrdering: Ordering[UniqueAddress] = Ordering.fromLessThan[UniqueAddress] { (a, b) ⇒ val ha = a.## val hb = b.## - ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0) + ha < hb || (ha == hb && Member.addressOrdering.compare(a.address, b.address) < 0) } immutable.SortedSet() ++ nodes @@ -342,18 +241,14 @@ private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[A /** * Receivers for `selfAddress`. Cached for subsequent access. */ - lazy val myReceivers: immutable.Set[Address] = receivers(selfAddress) - /** - * Senders for `selfAddress`. Cached for subsequent access. - */ - lazy val mySenders: immutable.Set[Address] = senders(selfAddress) + lazy val myReceivers: immutable.Set[UniqueAddress] = receivers(selfAddress) private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1) /** * The receivers to use from a specified sender. */ - def receivers(sender: Address): immutable.Set[Address] = + def receivers(sender: UniqueAddress): immutable.Set[UniqueAddress] = if (useAllAsReceivers) nodeRing - sender else { @@ -363,20 +258,14 @@ private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[A else slice } - /** - * The expected senders for a specific receiver. - */ - def senders(receiver: Address): Set[Address] = - nodes filter { sender ⇒ receivers(sender) contains receiver } - /** * Add a node to the ring. */ - def :+(node: Address): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node) + def :+(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node) /** * Remove a node from the ring. */ - def :-(node: Address): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this + def :-(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index dd62cf745f..b3ec175e75 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -25,15 +25,9 @@ final class ClusterSettings(val config: Config, val systemName: String) { val HeartbeatInterval: FiniteDuration = { Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0") - val HeartbeatRequestDelay: FiniteDuration = { - Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.grace-period"), MILLISECONDS) - } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0") val HeartbeatExpectedResponseAfter: FiniteDuration = { - Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.expected-response-after"), MILLISECONDS) - } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after > 0") - val HeartbeatRequestTimeToLive: FiniteDuration = { - Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS) - } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0") + Duration(FailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS) + } requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0") val MonitoredByNrOfMembers: Int = { FailureDetectorConfig.getInt("monitored-by-nr-of-members") } requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0") diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 8b478dfee5..0fdb3bfd61 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -47,10 +47,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ InternalClusterAction.InitJoin.getClass -> (_ ⇒ InternalClusterAction.InitJoin), classOf[InternalClusterAction.InitJoinAck] -> (bytes ⇒ InternalClusterAction.InitJoinAck(addressFromBinary(bytes))), classOf[InternalClusterAction.InitJoinNack] -> (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))), - classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))), - classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))), - classOf[ClusterHeartbeatReceiver.EndHeartbeatAck] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeatAck(addressFromBinary(bytes))), - classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))), + classOf[ClusterHeartbeatSender.Heartbeat] -> (bytes ⇒ ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes))), + classOf[ClusterHeartbeatSender.HeartbeatRsp] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))), classOf[GossipStatus] -> gossipStatusFromBinary, classOf[GossipEnvelope] -> gossipEnvelopeFromBinary, classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary) @@ -60,20 +58,18 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def identifier = 5 def toBinary(obj: AnyRef): Array[Byte] = obj match { - case ClusterHeartbeatReceiver.Heartbeat(from) ⇒ addressToProtoByteArray(from) - case m: GossipEnvelope ⇒ gossipEnvelopeToProto(m).toByteArray - case m: GossipStatus ⇒ gossipStatusToProto(m).toByteArray - case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) - case InternalClusterAction.Join(node, roles) ⇒ joinToProto(node, roles).toByteArray - case InternalClusterAction.Welcome(from, gossip) ⇒ compress(welcomeToProto(from, gossip)) - case ClusterUserAction.Leave(address) ⇒ addressToProtoByteArray(address) - case ClusterUserAction.Down(address) ⇒ addressToProtoByteArray(address) - case InternalClusterAction.InitJoin ⇒ cm.Empty.getDefaultInstance.toByteArray - case InternalClusterAction.InitJoinAck(address) ⇒ addressToProtoByteArray(address) - case InternalClusterAction.InitJoinNack(address) ⇒ addressToProtoByteArray(address) - case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ addressToProtoByteArray(from) - case ClusterHeartbeatReceiver.EndHeartbeatAck(from) ⇒ addressToProtoByteArray(from) - case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ addressToProtoByteArray(from) + case ClusterHeartbeatSender.Heartbeat(from) ⇒ addressToProtoByteArray(from) + case ClusterHeartbeatSender.HeartbeatRsp(from) ⇒ uniqueAddressToProtoByteArray(from) + case m: GossipEnvelope ⇒ gossipEnvelopeToProto(m).toByteArray + case m: GossipStatus ⇒ gossipStatusToProto(m).toByteArray + case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) + case InternalClusterAction.Join(node, roles) ⇒ joinToProto(node, roles).toByteArray + case InternalClusterAction.Welcome(from, gossip) ⇒ compress(welcomeToProto(from, gossip)) + case ClusterUserAction.Leave(address) ⇒ addressToProtoByteArray(address) + case ClusterUserAction.Down(address) ⇒ addressToProtoByteArray(address) + case InternalClusterAction.InitJoin ⇒ cm.Empty.getDefaultInstance.toByteArray + case InternalClusterAction.InitJoinAck(address) ⇒ addressToProtoByteArray(address) + case InternalClusterAction.InitJoinNack(address) ⇒ addressToProtoByteArray(address) case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") } @@ -127,6 +123,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder = cm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)).setUid(uniqueAddress.uid) + private def uniqueAddressToProtoByteArray(uniqueAddress: UniqueAddress): Array[Byte] = + uniqueAddressToProto(uniqueAddress).build.toByteArray + // we don't care about races here since it's just a cache @volatile private var protocolCache: String = null diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala index 111f643d76..fa7379e18d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala @@ -21,7 +21,6 @@ object InitialHeartbeatMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" - akka.cluster.failure-detector.heartbeat-request.grace-period = 3 s akka.cluster.failure-detector.threshold = 4""")). withFallback(MultiNodeClusterSpec.clusterConfig)) @@ -43,26 +42,35 @@ abstract class InitialHeartbeatSpec "A member" must { "detect failure even though no heartbeats have been received" taggedAs LongRunningTest in { + val firstAddress = address(first) val secondAddress = address(second) awaitClusterUp(first) runOn(first) { within(10 seconds) { - awaitAssert { + awaitAssert({ cluster.sendCurrentClusterState(testActor) expectMsgType[CurrentClusterState].members.map(_.address) must contain(secondAddress) - } + }, interval = 50.millis) } } runOn(second) { cluster.join(first) + within(10 seconds) { + awaitAssert({ + cluster.sendCurrentClusterState(testActor) + expectMsgType[CurrentClusterState].members.map(_.address) must contain(firstAddress) + }, interval = 50.millis) + } } enterBarrier("second-joined") runOn(controller) { - // it is likely that first has not started sending heartbeats to second yet - // Direction must be Receive because the gossip from first to second must pass through - testConductor.blackhole(first, second, Direction.Receive).await + // It is likely that second has not started heartbeating to first yet, + // and when it does the messages doesn't go through and the first extra heartbeat is triggered. + // If the first heartbeat arrives, it will detect the failure anyway but not really exercise the + // part that we are trying to test here. + testConductor.blackhole(first, second, Direction.Both).await } runOn(second) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala index 914db4ee78..504fd3af36 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -148,7 +148,7 @@ abstract class MBeanSpec |""".stripMargin // awaitAssert to make sure that all nodes detects unreachable - within(5.seconds) { + within(15.seconds) { awaitAssert(mbeanServer.getAttribute(mbeanName, "ClusterStatus") must be(expectedJson)) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 68fb3df12b..0bb8ba6fc1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -105,8 +105,8 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro } muteDeadLetters( - classOf[ClusterHeartbeatReceiver.Heartbeat], - classOf[ClusterHeartbeatReceiver.EndHeartbeat], + classOf[ClusterHeartbeatSender.Heartbeat], + classOf[ClusterHeartbeatSender.HeartbeatRsp], classOf[GossipEnvelope], classOf[GossipStatus], classOf[MetricsGossipEnvelope], diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 7c41ebed19..4c356d5081 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -32,9 +32,7 @@ class ClusterConfigSpec extends AkkaSpec { GossipTimeToLive must be(2 seconds) HeartbeatInterval must be(1 second) MonitoredByNrOfMembers must be(5) - HeartbeatRequestDelay must be(10 seconds) HeartbeatExpectedResponseAfter must be(5 seconds) - HeartbeatRequestTimeToLive must be(1 minute) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) PublishStatsInterval must be(Duration.Undefined) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index adf9edb1c9..652d49a85a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -7,100 +7,212 @@ package akka.cluster import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import akka.actor.Address -import akka.routing.ConsistentHash import scala.concurrent.duration._ import scala.collection.immutable -import scala.collection.immutable.HashSet +import akka.remote.FailureDetector +import akka.remote.DefaultFailureDetectorRegistry +import scala.concurrent.forkjoin.ThreadLocalRandom + +object ClusterHeartbeatSenderStateSpec { + class FailureDetectorStub extends FailureDetector { + + trait Status + object Up extends Status + object Down extends Status + object Unknown extends Status + + private var status: Status = Unknown + + def markNodeAsUnavailable(): Unit = status = Down + + def markNodeAsAvailable(): Unit = status = Up + + override def isAvailable: Boolean = status match { + case Unknown | Up ⇒ true + case Down ⇒ false + } + + override def isMonitoring: Boolean = status != Unknown + + override def heartbeat(): Unit = status = Up + + } +} @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { + import ClusterHeartbeatSenderStateSpec._ - val selfAddress = Address("akka.tcp", "sys", "myself", 2552) - val aa = Address("akka.tcp", "sys", "aa", 2552) - val bb = Address("akka.tcp", "sys", "bb", 2552) - val cc = Address("akka.tcp", "sys", "cc", 2552) - val dd = Address("akka.tcp", "sys", "dd", 2552) - val ee = Address("akka.tcp", "sys", "ee", 2552) + val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1) + val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2) + val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3) + val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4) + val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5) - val emptyState = ClusterHeartbeatSenderState.empty(selfAddress, 3) + def emptyState: ClusterHeartbeatSenderState = emptyState(aa) + + def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState( + ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), monitoredByNrOfMembers = 3), + unreachable = Set.empty[UniqueAddress], + failureDetector = new DefaultFailureDetectorRegistry[Address](() ⇒ new FailureDetectorStub)) + + def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub = + state.failureDetector.asInstanceOf[DefaultFailureDetectorRegistry[Address]].failureDetector(node.address). + get.asInstanceOf[FailureDetectorStub] "A ClusterHeartbeatSenderState" must { "return empty active set when no nodes" in { - emptyState.active.isEmpty must be(true) + emptyState.activeReceivers.isEmpty must be(true) } - "include heartbeatRequest in active set" in { - val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds) - s.heartbeatRequest.keySet must be(Set(aa)) - s.active must be(Set(aa)) + "init with empty" in { + emptyState.init(Set.empty).activeReceivers must be(Set.empty) } - "remove heartbeatRequest from active set after removeOverdueHeartbeatRequest" in { - val s = emptyState.addHeartbeatRequest(aa, Deadline.now - 30.seconds).removeOverdueHeartbeatRequest() - s.heartbeatRequest must be(Map.empty) - s.active must be(Set.empty) - s.ending must be(Set(aa)) + "init with self" in { + emptyState.init(Set(aa, bb, cc)).activeReceivers must be(Set(bb, cc)) } - "remove heartbeatRequest after reset" in { - val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)) - s.heartbeatRequest must be(Map.empty) + "init without self" in { + emptyState.init(Set(bb, cc)).activeReceivers must be(Set(bb, cc)) } - "remove heartbeatRequest after addMember" in { - val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).addMember(aa) - s.heartbeatRequest must be(Map.empty) + "use added members" in { + emptyState.addMember(bb).addMember(cc).activeReceivers must be(Set(bb, cc)) } - "remove heartbeatRequest after removeMember" in { - val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)).removeMember(aa) - s.heartbeatRequest must be(Map.empty) - s.ending must be(Set(aa)) + "not use removed members" in { + emptyState.addMember(bb).addMember(cc).removeMember(bb).activeReceivers must be(Set(cc)) } - "remove from ending after addHeartbeatRequest" in { - val s = emptyState.reset(HashSet(aa, bb)).removeMember(aa) - s.ending must be(Set(aa)) - val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds) - s2.heartbeatRequest.keySet must be(Set(aa)) - s2.ending must be(Set.empty) + "use specified number of members" in { + // they are sorted by the hash (uid) of the UniqueAddress + emptyState.addMember(cc).addMember(dd).addMember(bb).addMember(ee).activeReceivers must be(Set(bb, cc, dd)) } - "include nodes from reset in active set" in { - val nodes = HashSet(aa, bb, cc) - val s = emptyState.reset(nodes) - s.current must be(nodes) - s.ending must be(Set.empty) - s.active must be(nodes) + "update failure detector in active set" in { + val s1 = emptyState.addMember(bb).addMember(cc).addMember(dd) + val s2 = s1.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) + s2.failureDetector.isMonitoring(bb.address) must be(true) + s2.failureDetector.isMonitoring(cc.address) must be(true) + s2.failureDetector.isMonitoring(dd.address) must be(true) + s2.failureDetector.isMonitoring(ee.address) must be(false) } - "limit current nodes to monitoredByNrOfMembers when adding members" in { - val nodes = Set(aa, bb, cc, dd) - val s = nodes.foldLeft(emptyState) { _ addMember _ } - s.current.size must be(3) - s.addMember(ee).current.size must be(3) + "continue to use unreachable" in { + val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee) + val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) + fd(s2, ee).markNodeAsUnavailable() + s2.failureDetector.isAvailable(ee.address) must be(false) + s2.addMember(bb).activeReceivers must be(Set(bb, cc, dd, ee)) } - "move member to ending set when removing member" in { - val nodes = HashSet(aa, bb, cc, dd, ee) - val s = emptyState.reset(nodes) - s.ending must be(Set.empty) - val included = s.current.head - val s2 = s.removeMember(included) - s2.ending must be(Set(included)) - s2.current must not contain (included) - val s3 = s2.addMember(included) - s3.current must contain(included) - s3.ending must not contain (included) + "remove unreachable when coming back" in { + val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee) + val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) + fd(s2, dd).markNodeAsUnavailable() + fd(s2, ee).markNodeAsUnavailable() + val s3 = s2.addMember(bb) + s3.activeReceivers must be(Set(bb, cc, dd, ee)) + val s4 = s3.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) + s4.activeReceivers must be(Set(bb, cc, dd)) + s4.failureDetector.isMonitoring(ee.address) must be(false) } - "remove ending correctly" in { - val s = emptyState.reset(HashSet(aa)).removeMember(aa) - s.ending must be(Set(aa)) - val s2 = s.removeEnding(aa) - s2.ending must be(Set.empty) + "remove unreachable when member removed" in { + val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee) + val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) + fd(s2, cc).markNodeAsUnavailable() + fd(s2, ee).markNodeAsUnavailable() + val s3 = s2.addMember(bb).heartbeatRsp(bb) + s3.activeReceivers must be(Set(bb, cc, dd, ee)) + val s4 = s3.removeMember(cc).removeMember(ee) + s4.activeReceivers must be(Set(bb, dd)) + s4.failureDetector.isMonitoring(cc.address) must be(false) + s4.failureDetector.isMonitoring(ee.address) must be(false) + } + + "behave correctly for random operations" in { + val rnd = ThreadLocalRandom.current + val nodes = (1 to rnd.nextInt(10, 200)).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "n" + n, 2552), n)).toVector + def rndNode() = nodes(rnd.nextInt(0, nodes.size)) + val selfUniqueAddress = rndNode() + var state = emptyState(selfUniqueAddress) + val Add = 0 + val Remove = 1 + val Unreachable = 2 + val HeartbeatRsp = 3 + for (i ← 1 to 100000) { + val operation = rnd.nextInt(Add, HeartbeatRsp + 1) + val node = rndNode() + try { + operation match { + case Add ⇒ + if (node != selfUniqueAddress && !state.ring.nodes.contains(node)) { + val oldUnreachable = state.unreachable + state = state.addMember(node) + // keep unreachable + (oldUnreachable -- state.activeReceivers) must be(Set.empty) + state.failureDetector.isMonitoring(node.address) must be(false) + state.failureDetector.isAvailable(node.address) must be(true) + } + + case Remove ⇒ + if (node != selfUniqueAddress && state.ring.nodes.contains(node)) { + val oldUnreachable = state.unreachable + state = state.removeMember(node) + // keep unreachable, unless it was the removed + if (oldUnreachable(node)) + (oldUnreachable -- state.activeReceivers) must be(Set(node)) + else + (oldUnreachable -- state.activeReceivers) must be(Set.empty) + + state.failureDetector.isMonitoring(node.address) must be(false) + state.failureDetector.isAvailable(node.address) must be(true) + state.activeReceivers must not contain (node) + } + + case Unreachable ⇒ + if (node != selfUniqueAddress && state.activeReceivers(node)) { + state.failureDetector.heartbeat(node.address) // make sure the fd is created + fd(state, node).markNodeAsUnavailable() + state.failureDetector.isMonitoring(node.address) must be(true) + state.failureDetector.isAvailable(node.address) must be(false) + } + + case HeartbeatRsp ⇒ + if (node != selfUniqueAddress && state.ring.nodes.contains(node)) { + val oldUnreachable = state.unreachable + val oldReceivers = state.activeReceivers + val oldRingReceivers = state.ring.myReceivers + state = state.heartbeatRsp(node) + + if (oldUnreachable(node)) + state.unreachable must not contain (node) + + if (oldUnreachable(node) && !oldRingReceivers(node)) + state.failureDetector.isMonitoring(node.address) must be(false) + + if (oldRingReceivers(node)) + state.failureDetector.isMonitoring(node.address) must be(true) + + state.ring.myReceivers must be(oldRingReceivers) + state.failureDetector.isAvailable(node.address) must be(true) + + } + + } + } catch { + case e: Throwable ⇒ + println(s"Failure context: i=$i, node=$node, op=$operation, unreachable=${state.unreachable}, " + + s"ringReceivers=${state.ring.myReceivers}, ringNodes=${state.ring.nodes}") + throw e + } + } + } } } + diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala index 4119a2d7f8..74c217d3fe 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala @@ -16,58 +16,21 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with ShouldMatchers { val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("10000").toInt def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = { - val nodes = (1 to size).map(n ⇒ Address("akka.tcp", "sys", "node-" + n, 2552)).toSet - val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552) - HeartbeatNodeRing(selfAddress, nodes, 5) - } - - def createClusterHeartbeatSenderStateOfSize(size: Int): ClusterHeartbeatSenderState = { - val nodes = (1 to size).map(n ⇒ Address("akka.tcp", "sys", "node-" + n, 2552)).to[HashSet] - val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552) - ClusterHeartbeatSenderState.empty(selfAddress, 5).reset(nodes) + val nodes = (1 to size).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n)) + val selfAddress = nodes(size / 2) + HeartbeatNodeRing(selfAddress, nodes.toSet, 5) } val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize) - val heartbeatSenderState = createClusterHeartbeatSenderStateOfSize(nodesSize) def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing ⇒ Unit, times: Int): Unit = for (i ← 1 to times) thunk(ring) - def checkThunkForState(state: ClusterHeartbeatSenderState, thunk: ClusterHeartbeatSenderState ⇒ Unit, times: Int): Unit = - for (i ← 1 to times) thunk(state) - def myReceivers(ring: HeartbeatNodeRing): Unit = { val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers) r.myReceivers.isEmpty should be(false) } - def mySenders(ring: HeartbeatNodeRing): Unit = { - val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers) - r.mySenders.isEmpty should be(false) - } - - def reset(state: ClusterHeartbeatSenderState): Unit = { - val s = ClusterHeartbeatSenderState.empty(state.ring.selfAddress, state.ring.monitoredByNrOfMembers).reset( - state.ring.nodes.asInstanceOf[HashSet[Address]]) - s.active.isEmpty should be(false) - } - - def addMember(state: ClusterHeartbeatSenderState): Unit = { - val s = state.addMember(Address("akka.tcp", "sys", "new-node", 2552)) - s.active.isEmpty should be(false) - } - - def removeMember(state: ClusterHeartbeatSenderState): Unit = { - val s = state.removeMember(Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552)) - s.active.isEmpty should be(false) - } - - def addHeartbeatRequest(state: ClusterHeartbeatSenderState): Unit = { - val a = Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552) - val s = state.addHeartbeatRequest(a, Deadline.now) - s.active should contain(a) - } - s"HeartbeatNodeRing of size $nodesSize" must { s"do a warm up run, $iterations times" in { @@ -78,32 +41,6 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with ShouldMatchers { checkThunkForRing(heartbeatNodeRing, myReceivers, iterations) } - s"produce mySenders, $iterations times" in { - checkThunkForRing(heartbeatNodeRing, mySenders, iterations) - } } - s"ClusterHeartbeatSenderState of size $nodesSize" must { - - s"do a warm up run, $iterations times" in { - checkThunkForState(heartbeatSenderState, reset, iterations) - } - - s"reset, $iterations times" in { - checkThunkForState(heartbeatSenderState, reset, iterations) - } - - s"addMember node, $iterations times" in { - checkThunkForState(heartbeatSenderState, addMember, iterations) - } - - s"removeMember node, $iterations times" in { - checkThunkForState(heartbeatSenderState, removeMember, iterations) - } - - s"addHeartbeatRequest node, $iterations times" in { - checkThunkForState(heartbeatSenderState, addHeartbeatRequest, iterations) - } - - } } diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala index 619ae0a630..a710fec57d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala @@ -14,11 +14,11 @@ import scala.collection.immutable @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class HeartbeatNodeRingSpec extends WordSpec with MustMatchers { - val aa = Address("akka.tcp", "sys", "aa", 2552) - val bb = Address("akka.tcp", "sys", "bb", 2552) - val cc = Address("akka.tcp", "sys", "cc", 2552) - val dd = Address("akka.tcp", "sys", "dd", 2552) - val ee = Address("akka.tcp", "sys", "ee", 2552) + val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1) + val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2) + val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3) + val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4) + val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5) val nodes = Set(aa, bb, cc, dd, ee) @@ -42,19 +42,9 @@ class HeartbeatNodeRingSpec extends WordSpec with MustMatchers { HeartbeatNodeRing(cc, nodes, 6).myReceivers must be(expected) } - "have matching senders and receivers" in { - val ring = HeartbeatNodeRing(cc, nodes, 3) - ring.mySenders must be(ring.senders(cc)) - - for (sender ← nodes; receiver ← ring.receivers(sender)) { - ring.senders(receiver) must contain(sender) - } - } - "pick none when alone" in { val ring = HeartbeatNodeRing(cc, Set(cc), 3) ring.myReceivers must be(Set()) - ring.mySenders must be(Set()) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index ab0b4ee73f..c3ebd8afa2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -52,10 +52,8 @@ class ClusterMessageSerializerSpec extends AkkaSpec( checkSerialization(InternalClusterAction.InitJoin) checkSerialization(InternalClusterAction.InitJoinAck(address)) checkSerialization(InternalClusterAction.InitJoinNack(address)) - checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address)) - checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address)) - checkSerialization(ClusterHeartbeatReceiver.EndHeartbeatAck(address)) - checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address)) + checkSerialization(ClusterHeartbeatSender.Heartbeat(address)) + checkSerialization(ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress)) val node1 = VectorClock.Node("node1") val node2 = VectorClock.Node("node2") diff --git a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst index 4cc1139a17..8d2e54b26b 100644 --- a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst @@ -92,3 +92,10 @@ without much trouble. Read more about the new routers in the :ref:`documentation for Scala ` and :ref:`documentation for Java `. + +Changed cluster expected-response-after configuration +===================================================== + +Configuration property ``akka.cluster.failure-detector.heartbeat-request.expected-response-after`` +has been renamed to ``akka.cluster.failure-detector.expected-response-after``. + \ No newline at end of file