diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 630a2f8024..5a72e5eaae 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -69,6 +69,11 @@ message Welcome { * Sends an Address */ + /** + * EndHeartbeatAck + * Sends an Address + */ + /** * HeartbeatRequest * Sends an Address @@ -114,9 +119,34 @@ message Gossip { message GossipOverview { /* This is the address indexes for the nodes that have seen this gossip */ repeated int32 seen = 1; - repeated Member unreachable = 2; + repeated ObserverReachability observerReachability = 2; } +/** + * Reachability + */ +message ObserverReachability { + required int32 addressIndex = 1; + required int64 version = 4; + repeated SubjectReachability subjectReachability = 2; +} + +message SubjectReachability { + required int32 addressIndex = 1; + required ReachabilityStatus status = 3; + required int64 version = 4; +} + +/** + * Reachability status + */ +enum ReachabilityStatus { + Reachable = 0; + Unreachable = 1; + Terminated = 2; +} + + /** * Member */ diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 960d3d50fa..8f275830d8 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -128,11 +128,6 @@ akka { # i.e. each node will be monitored by this number of other nodes. monitored-by-nr-of-members = 5 - # When a node stops sending heartbeats to another node it will end that - # with this number of EndHeartbeat messages, which will remove the - # monitoring from the failure detector. - nr-of-end-heartbeats = 8 - # When no expected heartbeat message has been received an explicit # heartbeat request is sent to the node that should emit heartbeats. heartbeat-request { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index d7d4eb3be5..fafd58ab2d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -15,6 +15,7 @@ import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import scala.collection.breakOut +import akka.remote.QuarantinedEvent /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -264,13 +265,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with Some(scheduler.schedule(PeriodicTasksInitialDelay.max(d), d, self, PublishStatsTick)) } - override def preStart(): Unit = + override def preStart(): Unit = { + context.system.eventStream.subscribe(self, classOf[QuarantinedEvent]) if (SeedNodes.isEmpty) logInfo("No seed-nodes configured, manual cluster join required") else self ! 
JoinSeedNodes(SeedNodes) + } override def postStop(): Unit = { + context.system.eventStream.unsubscribe(self) gossipTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() @@ -323,6 +327,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with case ClusterUserAction.Leave(address) ⇒ leaving(address) case SendGossipTo(address) ⇒ sendGossipTo(address) case msg: SubscriptionMessage ⇒ publisher forward msg + case QuarantinedEvent(address, uid) ⇒ quarantined(UniqueAddress(address, uid)) case ClusterUserAction.JoinTo(address) ⇒ logInfo("Trying to join [{}] when already part of a cluster, ignoring", address) case JoinSeedNodes(seedNodes) ⇒ @@ -419,12 +424,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with selfAddress.system, node.address.system) else { val localMembers = latestGossip.members - val localUnreachable = latestGossip.overview.unreachable // check by address without uid to make sure that node with same host:port is not allowed // to join until previous node with that host:port has been removed from the cluster val alreadyMember = localMembers.exists(_.address == node.address) - val isUnreachable = localUnreachable.exists(_.address == node.address) + val isUnreachable = !latestGossip.overview.reachability.isReachable(node) if (alreadyMember) logInfo("Existing member [{}] is trying to join, ignoring", node) @@ -488,14 +492,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } /** - * This method is called when a member sees itself as Exiting. + * This method is called when a member sees itself as Exiting or Down. */ def shutdown(): Unit = cluster.shutdown() /** - * State transition to DOW. - * The node to DOWN is removed from the `members` set and put in the `unreachable` set (if not already there) - * and its status is set to DOWN. The node is also removed from the `seen` table. + * State transition to DOWN. + * Its status is set to DOWN. The node is also removed from the `seen` table. * * The node will eventually be removed by the leader, and only after removal a new node with same address can * join the cluster through the normal joining procedure. @@ -505,46 +508,50 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val localMembers = localGossip.members val localOverview = localGossip.overview val localSeen = localOverview.seen - val localUnreachableMembers = localOverview.unreachable + val localReachability = localOverview.reachability - // 1. 
check if the node to DOWN is in the `members` set - val downedMember: Option[Member] = - localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) } - - val newMembers = downedMember match { + // check if the node to DOWN is in the `members` set + localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) } match { case Some(m) ⇒ - logInfo("Marking node [{}] as [{}]", m.address, Down) - localMembers - m - case None ⇒ localMembers + if (localReachability.isReachable(m.uniqueAddress)) + logInfo("Marking node [{}] as [{}]", m.address, Down) + else + logInfo("Marking unreachable node [{}] as [{}]", m.address, Down) + + // replace member (changed status) + val newMembers = localMembers - m + m + // remove nodes marked as DOWN from the `seen` table + val newSeen = localSeen - m.uniqueAddress + + // update gossip overview + val newOverview = localOverview copy (seen = newSeen) + val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip + updateLatestGossip(newGossip) + + publish(latestGossip) + case None ⇒ + logInfo("Ignoring down of unknown node [{}]", address) } - // 2. check if the node to DOWN is in the `unreachable` set - val newUnreachableMembers = - localUnreachableMembers.map { member ⇒ - // no need to DOWN members already DOWN - if (member.address == address && member.status != Down) { - logInfo("Marking unreachable node [{}] as [{}]", member.address, Down) - member copy (status = Down) - } else member - } + } - // 3. add the newly DOWNED members from the `members` (in step 1.) to the `newUnreachableMembers` set. - val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember - - // 4. remove nodes marked as DOWN from the `seen` table - val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress } - - // update gossip overview - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) - val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip - updateLatestGossip(newGossip) - - publish(latestGossip) + def quarantined(node: UniqueAddress): Unit = { + val localGossip = latestGossip + if (localGossip.hasMember(node)) { + val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, node) + val newOverview = localGossip.overview copy (reachability = newReachability) + val newGossip = localGossip copy (overview = newOverview) + updateLatestGossip(newGossip) + log.warning("Cluster Node [{}] - Marking node as TERMINATED [{}], due to quarantine", + selfAddress, node.address) + publish(latestGossip) + downing(node.address) + } } def receiveGossipStatus(status: GossipStatus): Unit = { val from = status.from - if (latestGossip.overview.unreachable.exists(_.uniqueAddress == from)) + if (!latestGossip.overview.reachability.isReachable(selfUniqueAddress, from)) logInfo("Ignoring received gossip status from unreachable [{}] ", from) else if (latestGossip.members.forall(_.uniqueAddress != from)) log.debug("Cluster Node [{}] - Ignoring received gossip status from unknown [{}]", selfAddress, from) @@ -578,10 +585,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (envelope.to != selfUniqueAddress) { logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) Ignored - } else if (remoteGossip.overview.unreachable.exists(_.address == 
selfAddress)) { + } else if (!remoteGossip.overview.reachability.isReachable(selfUniqueAddress)) { logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address) Ignored - } else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) { + } else if (!localGossip.overview.reachability.isReachable(selfUniqueAddress, from)) { logInfo("Ignoring received gossip from unreachable [{}] ", from) Ignored } else if (localGossip.members.forall(_.uniqueAddress != from)) { @@ -634,7 +641,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with publish(latestGossip) - if (latestGossip.member(selfUniqueAddress).status == Exiting) + val selfStatus = latestGossip.member(selfUniqueAddress).status + if (selfStatus == Exiting || selfStatus == Down) shutdown() else if (talkback) { // send back gossip to sender when sender had different view, i.e. merge, or sender had @@ -653,23 +661,26 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def gossip(): Unit = { log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) - if (!isSingletonCluster && isAvailable) { + if (!isSingletonCluster) { val localGossip = latestGossip val preferredGossipTargets: Vector[UniqueAddress] = if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view // gossip to a random alive member with preference to a member with older gossip version - localGossip.members.collect { case m if !localGossip.seenByNode(m.uniqueAddress) ⇒ m.uniqueAddress }(breakOut) - } else Vector.empty[UniqueAddress] + localGossip.members.collect { + case m if !localGossip.seenByNode(m.uniqueAddress) && validNodeForGossip(m.uniqueAddress) ⇒ + m.uniqueAddress + }(breakOut) + } else Vector.empty if (preferredGossipTargets.nonEmpty) { - val peer = selectRandomNode(preferredGossipTargets filterNot (_ == selfUniqueAddress)) + val peer = selectRandomNode(preferredGossipTargets) // send full gossip because it has different view peer foreach gossipTo } else { // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved) val peer = selectRandomNode(localGossip.members.toIndexedSeq.collect { - case m if m.uniqueAddress != selfUniqueAddress ⇒ m.uniqueAddress + case m if validNodeForGossip(m.uniqueAddress) ⇒ m.uniqueAddress }) peer foreach { node ⇒ if (localGossip.seenByNode(node)) gossipStatusTo(node) @@ -684,8 +695,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with * assigning partitions etc. */ def leaderActions(): Unit = - if (latestGossip.isLeader(selfUniqueAddress) && isAvailable) { - // only run the leader actions if we are the LEADER and available + if (latestGossip.isLeader(selfUniqueAddress)) { + // only run the leader actions if we are the LEADER if (AutoDown) leaderAutoDownActions() @@ -712,7 +723,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val localMembers = localGossip.members val localOverview = localGossip.overview val localSeen = localOverview.seen - val localUnreachableMembers = localOverview.unreachable val hasPartionHandoffCompletedSuccessfully: Boolean = { // TODO implement partion handoff and a check if it is completed - now just returns TRUE - e.g. 
has completed successfully @@ -726,9 +736,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers - val (removedUnreachable, newUnreachable) = localUnreachableMembers partition { m ⇒ - Gossip.removeUnreachableWithMemberStatus(m.status) - } + val removedUnreachable = for { + node ← localOverview.reachability.allUnreachableOrTerminated + m = localGossip.member(node) + if Gossip.removeUnreachableWithMemberStatus(m.status) + } yield m val changedMembers = localMembers collect { var upNumber = 0 @@ -758,12 +770,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // handle changes // replace changed members - val newMembers = localMembers -- changedMembers ++ changedMembers + val newMembers = changedMembers ++ localMembers -- removedUnreachable // removing REMOVED nodes from the `seen` table - val newSeen = localSeen -- removedUnreachable.map(_.uniqueAddress) - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview - val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip + val removed = removedUnreachable.map(_.uniqueAddress) + val newSeen = localSeen -- removed + // removing REMOVED nodes from the `reachability` table + val newReachability = localOverview.reachability.remove(removed) + val newOverview = localOverview copy (seen = newSeen, reachability = newReachability) + val newGossip = localGossip copy (members = newMembers, overview = newOverview) updateLatestGossip(newGossip) @@ -802,25 +817,27 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with */ def leaderAutoDownActions(): Unit = { val localGossip = latestGossip + val localMembers = localGossip.members val localOverview = localGossip.overview val localSeen = localOverview.seen - val localUnreachableMembers = localOverview.unreachable - val changedUnreachableMembers = localUnreachableMembers collect { - case m if !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status) ⇒ m copy (status = Down) - } + val changedUnreachableMembers = for { + node ← localOverview.reachability.allUnreachableOrTerminated + m = localGossip.member(node) + if m.status != Removed && !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status) + } yield m.copy(status = Down) if (changedUnreachableMembers.nonEmpty) { // handle changes // replace changed unreachable - val newUnreachableMembers = localUnreachableMembers -- changedUnreachableMembers ++ changedUnreachableMembers + val newMembers = localMembers -- changedUnreachableMembers ++ changedUnreachableMembers // removing nodes marked as Down/Exiting from the `seen` table val newSeen = localSeen -- changedUnreachableMembers.map(_.uniqueAddress) - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview - val newGossip = localGossip copy (overview = newOverview) // update gossip + val newOverview = localOverview copy (seen = newSeen) // update gossip overview + val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip updateLatestGossip(newGossip) @@ -834,39 +851,54 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } /** - * Reaps the unreachable members (moves them to the `unreachable` list in the cluster overview) according to the failure detector's verdict. 
+ * Reaps the unreachable members according to the failure detector's verdict. */ def reapUnreachableMembers(): Unit = { - if (!isSingletonCluster && isAvailable) { - // only scrutinize if we are a non-singleton cluster and available + if (!isSingletonCluster) { + // only scrutinize if we are a non-singleton cluster val localGossip = latestGossip val localOverview = localGossip.overview val localMembers = localGossip.members - val localUnreachableMembers = localGossip.overview.unreachable val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ - member.uniqueAddress == selfUniqueAddress || failureDetector.isAvailable(member.address) + member.uniqueAddress == selfUniqueAddress || + localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Unreachable || + localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Terminated || + failureDetector.isAvailable(member.address) } - if (newlyDetectedUnreachableMembers.nonEmpty) { + val newlyDetectedReachableMembers = localOverview.reachability.allUnreachableFrom(selfUniqueAddress) collect { + case node if node != selfUniqueAddress && failureDetector.isAvailable(node.address) ⇒ + localGossip.member(node) + } - val newMembers = localMembers -- newlyDetectedUnreachableMembers - val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers + if (newlyDetectedUnreachableMembers.nonEmpty || newlyDetectedReachableMembers.nonEmpty) { - val newOverview = localOverview copy (unreachable = newUnreachableMembers) - val newGossip = localGossip copy (overview = newOverview, members = newMembers) + val newReachability1 = (localOverview.reachability /: newlyDetectedUnreachableMembers) { + (reachability, m) ⇒ reachability.unreachable(selfUniqueAddress, m.uniqueAddress) + } + val newReachability2 = (newReachability1 /: newlyDetectedReachableMembers) { + (reachability, m) ⇒ reachability.reachable(selfUniqueAddress, m.uniqueAddress) + } - updateLatestGossip(newGossip) + if (newReachability2 ne localOverview.reachability) { + val newOverview = localOverview copy (reachability = newReachability2) + val newGossip = localGossip copy (overview = newOverview) - val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting) - if (nonExiting.nonEmpty) - log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", ")) - if (exiting.nonEmpty) - logInfo("Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.", - exiting.mkString(", ")) + updateLatestGossip(newGossip) - publish(latestGossip) + val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting) + if (nonExiting.nonEmpty) + log.warning("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", ")) + if (exiting.nonEmpty) + logInfo("Marking exiting node(s) as UNREACHABLE [{}]. 
This is expected and they will be removed.", + exiting.mkString(", ")) + if (newlyDetectedReachableMembers.nonEmpty) + logInfo("Marking node(s) as REACHABLE [{}]", newlyDetectedReachableMembers.mkString(", ")) + + publish(latestGossip) + } } } } @@ -877,8 +909,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def isSingletonCluster: Boolean = latestGossip.isSingletonCluster - def isAvailable: Boolean = !latestGossip.isUnreachable(selfUniqueAddress) - // needed for tests def sendGossipTo(address: Address): Unit = { latestGossip.members.foreach(m ⇒ @@ -906,7 +936,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version) def validNodeForGossip(node: UniqueAddress): Boolean = - (node != selfUniqueAddress && latestGossip.members.exists(_.uniqueAddress == node)) + (node != selfUniqueAddress && latestGossip.hasMember(node) && + latestGossip.overview.reachability.isReachable(node)) def updateLatestGossip(newGossip: Gossip): Unit = { // Updating the vclock version for the changes diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 32359f1811..fd7f522645 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -10,7 +10,6 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream -import akka.actor.AddressTerminated import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** @@ -144,10 +143,23 @@ object ClusterEvent { def getLeader: Address = leader orNull } + /** + * Marker interface to facilitate subscription of + * both [[UnreachableMember]] and [[ReachableMember]]. + */ + sealed trait ReachabilityEvent extends ClusterDomainEvent + /** * A member is considered as unreachable by the failure detector. */ - case class UnreachableMember(member: Member) extends ClusterDomainEvent + case class UnreachableMember(member: Member) extends ReachabilityEvent + + /** + * A member is considered as reachable by the failure detector + * after having been unreachable. + * @see [[UnreachableMember]] + */ + case class ReachableMember(member: Member) extends ReachabilityEvent /** * Current snapshot of cluster node metrics. Published to subscribers. 
@@ -166,6 +178,11 @@ object ClusterEvent { */ private[cluster] case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent + /** + * INTERNAL API + */ + private[cluster] case class ReachabilityChanged(reachability: Reachability) extends ClusterDomainEvent + /** * INTERNAL API */ @@ -179,10 +196,24 @@ object ClusterEvent { private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[UnreachableMember] = if (newGossip eq oldGossip) Nil else { - val newUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable - val unreachableEvents = newUnreachable map UnreachableMember + val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated + (newGossip.overview.reachability.allUnreachableOrTerminated.collect { + case node if !oldUnreachableNodes.contains(node) ⇒ + UnreachableMember(newGossip.member(node)) + })(collection.breakOut) + } + + /** + * INTERNAL API + */ + private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachableMember] = + if (newGossip eq oldGossip) Nil + else { + (oldGossip.overview.reachability.allUnreachable.collect { + case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) ⇒ + ReachableMember(newGossip.member(node)) + })(collection.breakOut) - immutable.Seq.empty ++ unreachableEvents } /** @@ -202,10 +233,7 @@ object ClusterEvent { // no events for other transitions } - val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable - - val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++ - (oldGossip.overview.unreachable -- newGossip.overview.unreachable) + val removedMembers = oldGossip.members -- newGossip.members val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed), m.status)) (new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result() @@ -243,6 +271,14 @@ object ClusterEvent { List(SeenChanged(newConvergence, newSeenBy.map(_.address))) else Nil } + + /** + * INTERNAL API + */ + private[cluster] def diffReachability(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachabilityChanged] = + if (newGossip.overview.reachability eq oldGossip.overview.reachability) Nil + else List(ReachabilityChanged(newGossip.overview.reachability)) + } /** @@ -283,7 +319,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { val state = CurrentClusterState( members = latestGossip.members, - unreachable = latestGossip.overview.unreachable, + unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member, seenBy = latestGossip.seenBy.map(_.address), leader = latestGossip.leader.map(_.address), roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut)) @@ -307,13 +343,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val oldGossip = latestGossip // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - // first publish the diffUnreachable between the last two gossips diffUnreachable(oldGossip, newGossip) foreach publish + diffReachable(oldGossip, newGossip) foreach publish diffMemberEvents(oldGossip, newGossip) foreach publish diffLeader(oldGossip, newGossip) foreach publish diffRolesLeader(oldGossip, newGossip) foreach publish // publish internal 
SeenState for testing purposes diffSeen(oldGossip, newGossip) foreach publish + diffReachability(oldGossip, newGossip) foreach publish } def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index bcfde2f07e..14813dfd02 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -26,6 +26,12 @@ private[akka] object ClusterHeartbeatReceiver { * this node. */ case class EndHeartbeat(from: Address) extends ClusterMessage + + /** + * Acknowledgment that `EndHeartbeat` was received and heartbeating + * can stop. + */ + case class EndHeartbeatAck(from: Address) extends ClusterMessage } /** @@ -39,10 +45,13 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo import ClusterHeartbeatReceiver._ val failureDetector = Cluster(context.system).failureDetector + val selfEndHeartbeatAck = EndHeartbeatAck(Cluster(context.system).selfAddress) def receive = { - case Heartbeat(from) ⇒ failureDetector heartbeat from - case EndHeartbeat(from) ⇒ failureDetector remove from + case Heartbeat(from) ⇒ failureDetector heartbeat from + case EndHeartbeat(from) ⇒ + failureDetector remove from + sender ! selfEndHeartbeatAck } } @@ -103,7 +112,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[UnreachableMember]) } override def postStop(): Unit = { @@ -126,7 +134,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def receive = { case HeartbeatTick ⇒ heartbeat() case MemberUp(m) ⇒ addMember(m) - case UnreachableMember(m) ⇒ removeMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) case s: CurrentClusterState ⇒ reset(s) case MemberExited(m) ⇒ memberExited(m) @@ -134,6 +141,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg case HeartbeatRequest(from) ⇒ addHeartbeatRequest(from) case SendHeartbeatRequest(to) ⇒ sendHeartbeatRequest(to) case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) + case EndHeartbeatAck(from) ⇒ ackEndHeartbeat(from) } def reset(snapshot: CurrentClusterState): Unit = @@ -183,15 +191,12 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg heartbeatReceiver(to) ! selfHeartbeat } - // When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is - // sent to notify it that no more heartbeats will be sent. - for ((to, count) ← state.ending) { + // When sending heartbeats to a node is stopped a `EndHeartbeat` messages are + // sent to notify it that no more heartbeats will be sent. This will continue + // until `EndHeartbeatAck` is received. + for (to ← state.ending) { log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", cluster.selfAddress, to) heartbeatReceiver(to) ! 
selfEndHeartbeat - if (count == NumberOfEndHeartbeats) - state = state.removeEnding(to) - else - state = state.increaseEndingCount(to) } // request heartbeats from expected sender node if no heartbeat messages has been received @@ -202,6 +207,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg } + def ackEndHeartbeat(from: Address): Unit = { + state = state.removeEnding(from) + } + } /** @@ -225,7 +234,7 @@ private[cluster] object ClusterHeartbeatSenderState { val curr = ring.myReceivers // start ending process for nodes not selected any more // abort ending process for nodes that have been selected again - val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr + val end = old.ending ++ (old.current -- curr) -- curr old.copy(ring = ring, current = curr, ending = end, heartbeatRequest = old.heartbeatRequest -- curr) } @@ -243,14 +252,14 @@ private[cluster] case class ClusterHeartbeatSenderState private ( ring: HeartbeatNodeRing, current: Set[Address] = Set.empty, - ending: Map[Address, Int] = Map.empty, + ending: Set[Address] = Set.empty, heartbeatRequest: Map[Address, Deadline] = Map.empty) { // TODO can be disabled as optimization assertInvariants() private def assertInvariants(): Unit = { - val currentAndEnding = current.intersect(ending.keySet) + val currentAndEnding = current.intersect(ending) require(currentAndEnding.isEmpty, s"Same nodes in current and ending not allowed, got [${currentAndEnding}]") @@ -282,7 +291,7 @@ private[cluster] case class ClusterHeartbeatSenderState private ( private def removeHeartbeatRequest(address: Address): ClusterHeartbeatSenderState = { if (heartbeatRequest contains address) - copy(heartbeatRequest = heartbeatRequest - address, ending = ending + (address -> 0)) + copy(heartbeatRequest = heartbeatRequest - address, ending = ending + address) else this } @@ -298,13 +307,11 @@ private[cluster] case class ClusterHeartbeatSenderState private ( val overdue = heartbeatRequest collect { case (address, deadline) if deadline.isOverdue ⇒ address } if (overdue.isEmpty) this else - copy(ending = ending ++ overdue.map(_ -> 0), heartbeatRequest = heartbeatRequest -- overdue) + copy(ending = ending ++ overdue, heartbeatRequest = heartbeatRequest -- overdue) } def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a) - def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1))) - } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index 53feb35beb..cf958599ef 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -35,13 +35,45 @@ trait ClusterNodeMBean { def getUnreachable: String /* - * String that will list all nodes in the node ring as follows: + * JSON format of the status of all nodes in the cluster as follows: * {{{ - * Members: - * Member(address = akka://system0@localhost:5550, status = Up) - * Member(address = akka://system1@localhost:5551, status = Up) - * Unreachable: - * Member(address = akka://system2@localhost:5553, status = Down) + * { + * "self-address": "akka://system@host1:2552", + * "members": [ + * { + * "address": "akka://system@host1:2552", + * "status": "Up" + * }, + * { + * "address": "akka://system@host2:2552", + * "status": "Up" + * }, + * { + * "address": "akka://system@host3:2552", + * "status": "Down" + * 
}, + * { + * "address": "akka://system@host4:2552", + * "status": "Joining" + * } + * ], + * "unreachable": [ + * { + * "node": "akka://system@host2:2552", + * "observed-by": [ + * "akka://system@host1:2552", + * "akka://system@host3:2552" + * ] + * }, + * { + * "node": "akka://system@host3:2552", + * "observed-by": [ + * "akka://system@host1:2552", + * "akka://system@host2:2552" + * ] + * } + * ] + * } * }}} */ def getClusterStatus: String @@ -102,9 +134,33 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { // JMX attributes (bean-style) def getClusterStatus: String = { - val unreachable = clusterView.unreachableMembers - "\nMembers:\n\t" + clusterView.members.mkString("\n\t") + - { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + val members = clusterView.members.toSeq.sorted(Member.ordering).map { m ⇒ + s"""{ + | "address": "${m.address}", + | "status": "${m.status}" + | }""".stripMargin + } mkString (",\n ") + + val unreachable = clusterView.reachability.observersGroupedByUnreachable.toSeq.sortBy(_._1).map { + case (subject, observers) ⇒ + s"""{ + | "node": "${subject.address}", + | "observed-by": [ + | ${observers.toSeq.sorted.map(_.address).mkString("\"", "\",\n \"", "\"")} + | ] + | }""".stripMargin + } mkString (",\n") + + s"""{ + | "self-address": "${clusterView.selfAddress}", + | "members": [ + | ${members} + | ], + | "unreachable": [ + | ${unreachable} + | ] + |} + |""".stripMargin } def getMembers: String = diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 028fbd4e14..7d914f2778 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -78,7 +78,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[UnreachableMember]) + cluster.subscribe(self, classOf[ReachabilityEvent]) logInfo("Metrics collection has started successfully") } @@ -91,6 +91,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto case MemberRemoved(m, _) ⇒ removeMember(m) case MemberExited(m) ⇒ removeMember(m) case UnreachableMember(m) ⇒ removeMember(m) + case ReachableMember(m) ⇒ if (m.status == Up) addMember(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index b1d280f45a..58dc99f435 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -26,6 +26,9 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { @volatile private var state: CurrentClusterState = CurrentClusterState() + @volatile + private var _reachability: Reachability = Reachability.empty + /** * Current internal cluster stats, updated periodically via event bus. 
*/ @@ -50,15 +53,22 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case e: ClusterDomainEvent ⇒ e match { case SeenChanged(convergence, seenBy) ⇒ state = state.copy(seenBy = seenBy) + case ReachabilityChanged(reachability) ⇒ + _reachability = reachability case MemberRemoved(member, _) ⇒ state = state.copy(members = state.members - member, unreachable = state.unreachable - member) case UnreachableMember(member) ⇒ // replace current member with new member (might have different status, only address is used in equals) - state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) + state = state.copy(unreachable = state.unreachable - member + member) + case ReachableMember(member) ⇒ + state = state.copy(unreachable = state.unreachable - member) case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) + val newUnreachable = + if (state.unreachable.contains(event.member)) state.unreachable - event.member + event.member + else state.unreachable state = state.copy(members = state.members - event.member + event.member, - unreachable = state.unreachable - event.member) + unreachable = newUnreachable) case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) case RoleLeaderChanged(role, leader) ⇒ @@ -73,7 +83,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def self: Member = { import cluster.selfUniqueAddress - state.members.find(_.uniqueAddress == selfUniqueAddress).orElse(state.unreachable.find(_.uniqueAddress == selfUniqueAddress)). + state.members.find(_.uniqueAddress == selfUniqueAddress). getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed)) } @@ -127,6 +137,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { myself.status != MemberStatus.Removed } + def reachability: Reachability = _reachability + /** * Current cluster metrics. 
*/ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 08c598754b..ffd3200bb0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -34,9 +34,6 @@ final class ClusterSettings(val config: Config, val systemName: String) { val HeartbeatRequestTimeToLive: FiniteDuration = { Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0") - val NumberOfEndHeartbeats: Int = { - FailureDetectorConfig.getInt("nr-of-end-heartbeats") - } requiring (_ > 0, "failure-detector.nr-of-end-heartbeats must be > 0") val MonitoredByNrOfMembers: Int = { FailureDetectorConfig.getInt("monitored-by-nr-of-members") } requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0") diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 72a86daf64..b3b282dbba 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -67,24 +67,25 @@ private[cluster] case class Gossip( assertInvariants() private def assertInvariants(): Unit = { - val unreachableAndLive = members.intersect(overview.unreachable) - if (unreachableAndLive.nonEmpty) - throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" - format unreachableAndLive.mkString(", ")) - val allowedLiveMemberStatus: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting) - def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatus(m.status) - if (members exists hasNotAllowedLiveMemberStatus) - throw new IllegalArgumentException("Live members must have status [%s], got [%s]" - format (allowedLiveMemberStatus.mkString(", "), - (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) + if (members.exists(_.status == Removed)) + throw new IllegalArgumentException(s"Live members must not have status [${Removed}], " + + s"got [${members.filter(_.status == Removed)}]") - val seenButNotMember = overview.seen -- members.map(_.uniqueAddress) -- overview.unreachable.map(_.uniqueAddress) + val inReachabilityButNotMember = overview.reachability.allObservers -- members.map(_.uniqueAddress) + if (inReachabilityButNotMember.nonEmpty) + throw new IllegalArgumentException("Nodes not part of cluster in reachability table, got [%s]" + format inReachabilityButNotMember.mkString(", ")) + + val seenButNotMember = overview.seen -- members.map(_.uniqueAddress) if (seenButNotMember.nonEmpty) throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" format seenButNotMember.mkString(", ")) } + @transient private lazy val membersMap: Map[UniqueAddress, Member] = + members.map(m ⇒ m.uniqueAddress -> m)(collection.breakOut) + /** * Increments the version for this 'Node'. */ @@ -138,17 +139,17 @@ private[cluster] case class Gossip( // 1. merge vector clocks val mergedVClock = this.version merge that.version - // 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) + // 2. 
merge members by selecting the single Member with highest MemberStatus out of the Member groups + val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members) - // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups, - // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) + // 3. merge reachability table by picking records with highest version + val mergedReachability = this.overview.reachability.merge(mergedMembers.map(_.uniqueAddress), + that.overview.reachability) // 4. Nobody can have seen this new gossip yet val mergedSeen = Set.empty[UniqueAddress] - Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock) + Gossip(mergedMembers, GossipOverview(mergedSeen, mergedReachability), mergedVClock) } /** @@ -165,7 +166,8 @@ private[cluster] case class Gossip( // When that is done we check that all members with a convergence // status is in the seen table and has the latest vector clock // version - overview.unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) && + val unreachable = overview.reachability.allUnreachableOrTerminated map member + unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) && !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress)) } @@ -176,34 +178,28 @@ private[cluster] case class Gossip( def roleLeader(role: String): Option[UniqueAddress] = leaderOf(members.filter(_.hasRole(role))) private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[UniqueAddress] = { - if (mbrs.isEmpty) None - else mbrs.find(m ⇒ Gossip.leaderMemberStatus(m.status)). - orElse(Some(mbrs.min(Member.leaderStatusOrdering))).map(_.uniqueAddress) + val reachableMembers = + if (overview.reachability.isAllReachable) mbrs + else mbrs.filter(m ⇒ overview.reachability.isReachable(m.uniqueAddress)) + if (reachableMembers.isEmpty) None + else reachableMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status)). + orElse(Some(reachableMembers.min(Member.leaderStatusOrdering))).map(_.uniqueAddress) } def allRoles: Set[String] = members.flatMap(_.roles) def isSingletonCluster: Boolean = members.size == 1 - /** - * Returns true if the node is in the unreachable set - */ - def isUnreachable(node: UniqueAddress): Boolean = - overview.unreachable exists { _.uniqueAddress == node } - def member(node: UniqueAddress): Member = { - members.find(_.uniqueAddress == node).orElse(overview.unreachable.find(_.uniqueAddress == node)). 
- getOrElse(Member.removed(node)) // placeholder for removed member + membersMap.getOrElse(node, + Member.removed(node)) // placeholder for removed member } + def hasMember(node: UniqueAddress): Boolean = membersMap.contains(node) + def youngestMember: Member = { require(members.nonEmpty, "No youngest when no members") - def maxByUpNumber(mbrs: Iterable[Member]): Member = - mbrs.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber) - if (overview.unreachable.isEmpty) - maxByUpNumber(members) - else - maxByUpNumber(members ++ overview.unreachable) + members.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber) } override def toString = @@ -217,10 +213,10 @@ private[cluster] case class Gossip( @SerialVersionUID(1L) private[cluster] case class GossipOverview( seen: Set[UniqueAddress] = Set.empty, - unreachable: Set[Member] = Set.empty) { + reachability: Reachability = Reachability.empty) { override def toString = - s"GossipOverview(unreachable = [${unreachable.mkString(", ")}], seen = [${seen.mkString(", ")}])" + s"GossipOverview(reachability = [$reachability], seen = [${seen.mkString(", ")}])" } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 2fa1cfdecf..c32cf4b5e7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -213,10 +213,9 @@ object MemberStatus { */ @SerialVersionUID(1L) private[cluster] case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] { - @transient - override lazy val hashCode = scala.util.hashing.MurmurHash3.productHash(this) + override def hashCode = uid - override def compare(that: UniqueAddress): Int = { + def compare(that: UniqueAddress): Int = { val result = Member.addressOrdering.compare(this.address, that.address) if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1 else result diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala new file mode 100644 index 0000000000..8de73861b2 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -0,0 +1,294 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.collection.breakOut +import akka.actor.Address + +/** + * INTERNAL API + */ +private[cluster] object Reachability { + val empty = new Reachability(Vector.empty, Map.empty) + + def apply(records: immutable.IndexedSeq[Record], versions: Map[UniqueAddress, Long]): Reachability = + new Reachability(records, versions) + + def create(records: immutable.Seq[Record], versions: Map[UniqueAddress, Long]): Reachability = records match { + case r: immutable.IndexedSeq[Record] ⇒ apply(r, versions) + case _ ⇒ apply(records.toVector, versions) + } + + @SerialVersionUID(1L) + case class Record(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus, version: Long) + + sealed trait ReachabilityStatus + @SerialVersionUID(1L) case object Reachable extends ReachabilityStatus + @SerialVersionUID(1L) case object Unreachable extends ReachabilityStatus + @SerialVersionUID(1L) case object Terminated extends ReachabilityStatus + +} + +/** + * INTERNAL API + * + * Immutable data structure that holds the reachability status of subject nodes as seen + * from observer nodes. 
Failure detector for the subject nodes exist on the + * observer nodes. Changes (reachable, unreachable, terminated) are only performed + * by observer nodes to its own records. Each change bumps the version number of the + * record, and thereby it is always possible to determine which record is newest when + * merging two instances. + * + * Aggregated status of a subject node is defined as (in this order): + * - Terminated if any observer node considers it as Terminated + * - Unreachable if any observer node considers it as Unreachable + * - Reachable otherwise, i.e. no observer node considers it as Unreachable + */ +@SerialVersionUID(1L) +private[cluster] class Reachability private ( + val records: immutable.IndexedSeq[Reachability.Record], + val versions: Map[UniqueAddress, Long]) extends Serializable { + + import Reachability._ + + private class Cache { + val (observerRowsMap, allUnreachable, allTerminated) = { + if (records.isEmpty) { + val observerRowsMap = Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]] + val allTerminated = Set.empty[UniqueAddress] + val allUnreachable = Set.empty[UniqueAddress] + (observerRowsMap, allUnreachable, allTerminated) + } else { + val mapBuilder = scala.collection.mutable.Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]] + import scala.collection.mutable.SetBuilder + val terminatedBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty) + val unreachableBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty) + + records foreach { r ⇒ + val m = mapBuilder.get(r.observer) match { + case None ⇒ Map(r.subject -> r) + case Some(m) ⇒ m.updated(r.subject, r) + } + mapBuilder += (r.observer -> m) + + if (r.status == Unreachable) unreachableBuilder += r.subject + else if (r.status == Terminated) terminatedBuilder += r.subject + } + + val observerRowsMap: Map[UniqueAddress, Map[UniqueAddress, Reachability.Record]] = mapBuilder.toMap + val allTerminated: Set[UniqueAddress] = terminatedBuilder.result() + val allUnreachable: Set[UniqueAddress] = unreachableBuilder.result() -- allTerminated + + (observerRowsMap, allUnreachable, allTerminated) + } + } + + val allUnreachableOrTerminated: Set[UniqueAddress] = + if (allTerminated.isEmpty) allUnreachable + else allUnreachable ++ allTerminated + + } + + @transient private lazy val cache = new Cache + + private def observerRows(observer: UniqueAddress): Option[Map[UniqueAddress, Reachability.Record]] = + cache.observerRowsMap.get(observer) + + def unreachable(observer: UniqueAddress, subject: UniqueAddress): Reachability = + change(observer, subject, Unreachable) + + def reachable(observer: UniqueAddress, subject: UniqueAddress): Reachability = + change(observer, subject, Reachable) + + def terminated(observer: UniqueAddress, subject: UniqueAddress): Reachability = + change(observer, subject, Terminated) + + private def currentVersion(observer: UniqueAddress): Long = versions.get(observer) match { + case None ⇒ 0 + case Some(v) ⇒ v + } + + private def nextVersion(observer: UniqueAddress): Long = currentVersion(observer) + 1 + + private def change(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus): Reachability = { + val v = nextVersion(observer) + val newVersions = versions.updated(observer, v) + val newRecord = Record(observer, subject, status, v) + observerRows(observer) match { + case None if status == Reachable ⇒ this + case None ⇒ + new Reachability(records :+ newRecord, newVersions) + + case Some(oldObserverRows) ⇒ + + 
oldObserverRows.get(subject) match { + case None ⇒ + if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable }) { + // all Reachable, prune by removing the records of the observer, and bump the version + new Reachability(records.filterNot(_.observer == observer), newVersions) + } else + new Reachability(records :+ newRecord, newVersions) + case Some(oldRecord) ⇒ + if (oldRecord.status == Terminated || oldRecord.status == status) + this + else { + if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable || r.subject == subject }) { + // all Reachable, prune by removing the records of the observer, and bump the version + new Reachability(records.filterNot(_.observer == observer), newVersions) + } else { + val newRecords = records.updated(records.indexOf(oldRecord), newRecord) + new Reachability(newRecords, newVersions) + } + } + } + } + } + + def merge(allowed: immutable.Set[UniqueAddress], other: Reachability): Reachability = { + val recordBuilder = new immutable.VectorBuilder[Record] + recordBuilder.sizeHint(math.max(this.records.size, other.records.size)) + var newVersions = versions + allowed foreach { observer ⇒ + val observerVersion1 = this.currentVersion(observer) + val observerVersion2 = other.currentVersion(observer) + + (this.observerRows(observer), other.observerRows(observer)) match { + case (None, None) ⇒ + case (Some(rows1), Some(rows2)) ⇒ + mergeObserverRows(rows1, rows2, observerVersion1, observerVersion2, recordBuilder) + case (Some(rows1), None) ⇒ + recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 ⇒ r } + case (None, Some(rows2)) ⇒ + recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 ⇒ r } + } + + if (observerVersion2 > observerVersion1) + newVersions += (observer -> observerVersion2) + } + + new Reachability(recordBuilder.result(), newVersions) + } + + private def mergeObserverRows( + rows1: Map[UniqueAddress, Reachability.Record], rows2: Map[UniqueAddress, Reachability.Record], + observerVersion1: Long, observerVersion2: Long, + recordBuilder: immutable.VectorBuilder[Record]): Unit = { + + val allSubjects = rows1.keySet ++ rows2.keySet + allSubjects foreach { subject ⇒ + (rows1.get(subject), rows2.get(subject)) match { + case (Some(r1), Some(r2)) ⇒ + recordBuilder += (if (r1.version > r2.version) r1 else r2) + case (Some(r1), None) ⇒ + if (r1.version > observerVersion2) + recordBuilder += r1 + case (None, Some(r2)) ⇒ + if (r2.version > observerVersion1) + recordBuilder += r2 + case (None, None) ⇒ + throw new IllegalStateException(s"Unexpected [$subject]") + } + } + } + + def remove(nodes: Iterable[UniqueAddress]): Reachability = { + val nodesSet = nodes.to[immutable.HashSet] + val newRecords = records.filterNot(r ⇒ nodesSet(r.observer) || nodesSet(r.subject)) + if (newRecords.size == records.size) this + else { + val newVersions = versions -- nodes + Reachability(newRecords, newVersions) + } + } + + def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus = + observerRows(observer) match { + case None ⇒ Reachable + case Some(observerRows) ⇒ observerRows.get(subject) match { + case None ⇒ Reachable + case Some(record) ⇒ record.status + } + } + + def status(node: UniqueAddress): ReachabilityStatus = + if (cache.allTerminated(node)) Terminated + else if (cache.allUnreachable(node)) Unreachable + else Reachable + + def isReachable(node: UniqueAddress): Boolean = isAllReachable || !allUnreachableOrTerminated.contains(node) + + def 
isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean = + status(observer, subject) == Reachable + + def isAllReachable: Boolean = records.isEmpty + + /** + * Doesn't include terminated + */ + def allUnreachable: Set[UniqueAddress] = cache.allUnreachable + + def allUnreachableOrTerminated: Set[UniqueAddress] = cache.allUnreachableOrTerminated + + /** + * Doesn't include terminated + */ + def allUnreachableFrom(observer: UniqueAddress): Set[UniqueAddress] = + observerRows(observer) match { + case None ⇒ Set.empty + case Some(observerRows) ⇒ + observerRows.collect { + case (subject, record) if record.status == Unreachable ⇒ subject + }(breakOut) + } + + def observersGroupedByUnreachable: Map[UniqueAddress, Set[UniqueAddress]] = { + records.groupBy(_.subject).collect { + case (subject, records) if records.exists(_.status == Unreachable) ⇒ + val observers: Set[UniqueAddress] = + records.collect { case r if r.status == Unreachable ⇒ r.observer }(breakOut) + (subject -> observers) + } + } + + def allObservers: Set[UniqueAddress] = versions.keySet + + def recordsFrom(observer: UniqueAddress): immutable.IndexedSeq[Record] = { + observerRows(observer) match { + case None ⇒ Vector.empty + case Some(rows) ⇒ rows.valuesIterator.toVector + } + } + + // only used for testing + override def hashCode: Int = versions.hashCode + + // only used for testing + override def equals(obj: Any): Boolean = obj match { + case other: Reachability ⇒ + records.size == other.records.size && versions == other.versions && + cache.observerRowsMap == other.cache.observerRowsMap + case _ ⇒ false + } + + override def toString: String = { + val rows = for { + observer ← versions.keys.toSeq.sorted + rowsOption = observerRows(observer) + if rowsOption.isDefined // compilation err for subject <- rowsOption + rows = rowsOption.get + subject ← rows.keys.toSeq.sorted + } yield { + val record = rows(subject) + val aggregated = status(subject) + s"${observer.address} -> ${subject.address}: ${record.status} [$aggregated] (${record.version})" + "" + } + + rows.mkString(", ") + } + +} + diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 680a63c9e5..47babf0a0b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -44,6 +44,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ classOf[InternalClusterAction.InitJoinNack] -> (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))), classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))), + classOf[ClusterHeartbeatReceiver.EndHeartbeatAck] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeatAck(addressFromBinary(bytes))), classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))), classOf[GossipStatus] -> gossipStatusFromBinary, classOf[GossipEnvelope] -> gossipEnvelopeFromBinary, @@ -67,6 +68,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ case InternalClusterAction.InitJoinAck(address) ⇒ addressToProto(address).toByteArray case InternalClusterAction.InitJoinNack(address) ⇒ 
addressToProto(address).toByteArray case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ addressToProto(from).toByteArray + case ClusterHeartbeatReceiver.EndHeartbeatAck(from) ⇒ addressToProto(from).toByteArray case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ addressToProto(from).toByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") } @@ -132,6 +134,13 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private val memberStatusFromInt = memberStatusToInt.map { case (a, b) ⇒ (b, a) } + private val reachabilityStatusToInt = scala.collection.immutable.HashMap[Reachability.ReachabilityStatus, Int]( + Reachability.Reachable -> msg.ReachabilityStatus.Reachable_VALUE, + Reachability.Unreachable -> msg.ReachabilityStatus.Unreachable_VALUE, + Reachability.Terminated -> msg.ReachabilityStatus.Terminated_VALUE) + + private val reachabilityStatusFromInt = reachabilityStatusToInt.map { case (a, b) ⇒ (b, a) } + private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match { case Some(x) ⇒ x case _ ⇒ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message") @@ -139,8 +148,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private def gossipToProto(gossip: Gossip): msg.Gossip = { import scala.collection.breakOut - val allMembers = (gossip.members.iterator ++ gossip.overview.unreachable.iterator).toIndexedSeq - val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress)(breakOut) + val allMembers = gossip.members.toVector + val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress) val addressMapping = allAddresses.zipWithIndex.toMap val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc ++ m.roles).to[Vector] val roleMapping = allRoles.zipWithIndex.toMap @@ -154,11 +163,21 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber, msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole)(breakOut)) - val unreachable: Vector[msg.Member] = gossip.overview.unreachable.map(memberToProto)(breakOut) + def reachabilityToProto(reachability: Reachability): Vector[msg.ObserverReachability] = { + reachability.versions.map { + case (observer, version) ⇒ + val subjectReachability = reachability.recordsFrom(observer).map(r ⇒ + msg.SubjectReachability(mapUniqueAddress(r.subject), + msg.ReachabilityStatus.valueOf(reachabilityStatusToInt(r.status)), r.version)) + msg.ObserverReachability(mapUniqueAddress(observer), version, subjectReachability) + }(breakOut) + } + + val reachability = reachabilityToProto(gossip.overview.reachability) val members: Vector[msg.Member] = gossip.members.map(memberToProto)(breakOut) val seen: Vector[Int] = gossip.overview.seen.map(mapUniqueAddress)(breakOut) - val overview = msg.GossipOverview(seen, unreachable) + val overview = msg.GossipOverview(seen, reachability) msg.Gossip(allAddresses.map(uniqueAddressToProto), allRoles, allHashes, members, overview, vectorClockToProto(gossip.version, hashMapping)) @@ -192,14 +211,31 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val roleMapping = gossip.allRoles val hashMapping = gossip.allHashes + def reachabilityFromProto(observerReachability: immutable.Seq[msg.ObserverReachability]): Reachability = { + val recordBuilder = new 
immutable.VectorBuilder[Reachability.Record] + val versionsBuilder = new scala.collection.mutable.MapBuilder[UniqueAddress, Long, Map[UniqueAddress, Long]](Map.empty) + for (o ← observerReachability) { + val observer = addressMapping(o.addressIndex) + versionsBuilder += ((observer, o.version)) + for (s ← o.subjectReachability) { + val subject = addressMapping(s.addressIndex) + val record = Reachability.Record(observer, subject, reachabilityStatusFromInt(s.status), s.version) + recordBuilder += record + } + } + + Reachability.create(recordBuilder.result(), versionsBuilder.result()) + } + def memberFromProto(member: msg.Member) = new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id), member.rolesIndexes.map(roleMapping)(breakOut)) val members: immutable.SortedSet[Member] = gossip.members.map(memberFromProto)(breakOut) - val unreachable: immutable.Set[Member] = gossip.overview.unreachable.map(memberFromProto)(breakOut) + + val reachability = reachabilityFromProto(gossip.overview.observerReachability) val seen: Set[UniqueAddress] = gossip.overview.seen.map(addressMapping)(breakOut) - val overview = GossipOverview(seen, unreachable) + val overview = GossipOverview(seen, reachability) Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping)) } diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 048a3087b4..fd2e7c2a08 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -274,7 +274,7 @@ private[akka] class ClusterRouterActor(override val supervisorStrategy: Supervis // re-subscribe when restart override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[UnreachableMember]) + cluster.subscribe(self, classOf[ReachabilityEvent]) } override def postStop(): Unit = cluster.unsubscribe(self) @@ -289,6 +289,13 @@ private[akka] class ClusterRouterActor(override val supervisorStrategy: Supervis def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef) + def registerRoutees(member: Member) = { + routeeProvider.nodes += member.address + // createRoutees will create routees based on + // totalInstances and maxInstancesPerNode + routeeProvider.createRoutees() + } + def unregisterRoutees(member: Member) = { val address = member.address routeeProvider.nodes -= address @@ -309,17 +316,18 @@ private[akka] class ClusterRouterActor(override val supervisorStrategy: Supervis routeeProvider.createRoutees() case m: MemberEvent if routeeProvider.isAvailable(m.member) ⇒ - routeeProvider.nodes += m.member.address - // createRoutees will create routees based on - // totalInstances and maxInstancesPerNode - routeeProvider.createRoutees() + registerRoutees(m.member) case other: MemberEvent ⇒ // other events means that it is no longer interesting, such as - // MemberJoined, MemberLeft, MemberExited, MemberRemoved + // MemberExited, MemberRemoved unregisterRoutees(other.member) case UnreachableMember(m) ⇒ unregisterRoutees(m) + + case ReachableMember(m) ⇒ + if (routeeProvider.isAvailable(m)) + registerRoutees(m) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index d1b6e7cdc4..e11d0da743 100644 --- 
a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -4,6 +4,7 @@ package akka.cluster import com.typesafe.config.ConfigFactory +import scala.concurrent.duration._ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index 2777d1b300..4e0e12a743 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -6,6 +6,7 @@ package akka.cluster import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction import scala.concurrent.duration._ import akka.testkit._ @@ -17,6 +18,8 @@ object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")). withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) } class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec @@ -44,6 +47,47 @@ abstract class ClusterAccrualFailureDetectorSpec enterBarrier("after-1") } + "mark node as 'unavailable' when network partition and then back to 'available' when partition is healed" taggedAs + LongRunningTest in { + runOn(first) { + testConductor.blackhole(first, second, Direction.Both).await + } + + enterBarrier("broken") + + runOn(first) { + // detect failure... + awaitCond(!cluster.failureDetector.isAvailable(second), 15.seconds) + // other connections still ok + cluster.failureDetector.isAvailable(third) must be(true) + } + + runOn(second) { + // detect failure... 
+ awaitCond(!cluster.failureDetector.isAvailable(first), 15.seconds) + // other connections still ok + cluster.failureDetector.isAvailable(third) must be(true) + } + + enterBarrier("partitioned") + + runOn(first) { + testConductor.passThrough(first, second, Direction.Both).await + } + + enterBarrier("repaired") + + runOn(first, third) { + awaitCond(cluster.failureDetector.isAvailable(second), 15.seconds) + } + + runOn(second) { + awaitCond(cluster.failureDetector.isAvailable(first), 15.seconds) + } + + enterBarrier("after-2") + } + "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { runOn(first) { testConductor.exit(third, 0).await @@ -59,7 +103,7 @@ abstract class ClusterAccrualFailureDetectorSpec cluster.failureDetector.isAvailable(second) must be(true) } - enterBarrier("after-2") + enterBarrier("after-3") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 31f5a1952c..06ec9dfc74 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -71,7 +71,7 @@ abstract class ClusterDeathWatchSpec } "An actor watching a remote actor in the cluster" must { - "receive Terminated when watched node becomes Down" taggedAs LongRunningTest in within(20 seconds) { + "receive Terminated when watched node becomes Down/Removed" taggedAs LongRunningTest in within(20 seconds) { awaitClusterUp(first, second, third, fourth) enterBarrier("cluster-up") @@ -103,10 +103,10 @@ abstract class ClusterDeathWatchSpec enterBarrier("second-terminated") markNodeAsUnavailable(third) - awaitAssert(clusterView.members.map(_.address) must not contain (address(third))) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(third))) cluster.down(third) // removed + awaitAssert(clusterView.members.map(_.address) must not contain (address(third))) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(third))) expectMsg(path3) enterBarrier("third-terminated") @@ -119,10 +119,10 @@ abstract class ClusterDeathWatchSpec enterBarrier("watch-established") runOn(third) { markNodeAsUnavailable(second) - awaitAssert(clusterView.members.map(_.address) must not contain (address(second))) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(second))) cluster.down(second) // removed + awaitAssert(clusterView.members.map(_.address) must not contain (address(second))) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(second))) } enterBarrier("second-terminated") @@ -194,11 +194,11 @@ abstract class ClusterDeathWatchSpec runOn(fourth) { markNodeAsUnavailable(fifth) - awaitAssert(clusterView.members.map(_.address) must not contain (address(fifth))) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(fifth))) cluster.down(fifth) // removed awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(fifth))) + awaitAssert(clusterView.members.map(_.address) must not contain (address(fifth))) } enterBarrier("fifth-terminated") @@ -226,11 +226,11 @@ abstract class ClusterDeathWatchSpec enterBarrier("hello-deployed") markNodeAsUnavailable(first) - awaitAssert(clusterView.members.map(_.address) must not contain (address(first))) awaitAssert(clusterView.unreachableMembers.map(_.address) must 
contain(address(first))) cluster.down(first) // removed awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(first))) + awaitAssert(clusterView.members.map(_.address) must not contain (address(first))) expectTerminated(hello) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 60b9af29c6..1b932b9a31 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -70,12 +70,11 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) within(28 seconds) { // third becomes unreachable awaitAssert(clusterView.unreachableMembers.size must be(1)) - awaitAssert(clusterView.members.size must be(2)) - awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up))) awaitSeenSameState(first, second) // still one unreachable clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.head.address must be(thirdAddress) + clusterView.members.size must be(3) } } @@ -96,7 +95,7 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) runOn(first, second, fourth) { for (n ← 1 to 5) { - awaitAssert(clusterView.members.size must be(2)) + awaitAssert(clusterView.members.size must be(3)) awaitSeenSameState(first, second, fourth) memberStatus(first) must be(Some(MemberStatus.Up)) memberStatus(second) must be(Some(MemberStatus.Up)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala index e755051a55..914db4ee78 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -95,8 +95,9 @@ abstract class MBeanSpec enterBarrier("after-4") } - "support down" taggedAs LongRunningTest in within(20 seconds) { - val fourthAddress = address(fourth) + val fourthAddress = address(fourth) + + "format cluster status as JSON with full reachability info" taggedAs LongRunningTest in within(30 seconds) { runOn(first) { testConductor.exit(fourth, 0).await } @@ -104,11 +105,62 @@ abstract class MBeanSpec runOn(first, second, third) { awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") must be(fourthAddress.toString)) - val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(",") + val expectedMembers = Seq(first, second, third, fourth).sorted.map(address(_)).mkString(",") awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") must be(expectedMembers)) } enterBarrier("fourth-unreachable") + runOn(first) { + val sortedNodes = Vector(first, second, third, fourth).sorted.map(address(_)) + val unreachableObservedBy = Vector(first, second, third).sorted.map(address(_)) + val expectedJson = + s"""{ + | "self-address": "${address(first)}", + | "members": [ + | { + | "address": "${sortedNodes(0)}", + | "status": "Up" + | }, + | { + | "address": "${sortedNodes(1)}", + | "status": "Up" + | }, + | { + | "address": "${sortedNodes(2)}", + | "status": "Up" + | }, + | { + | "address": "${sortedNodes(3)}", + | "status": "Up" + | } + | ], + | "unreachable": [ + | { + | "node": "${address(fourth)}", + | "observed-by": [ + | "${unreachableObservedBy(0)}", + | "${unreachableObservedBy(1)}", + | "${unreachableObservedBy(2)}" + | ] + | } + | ] + |} + |""".stripMargin + + // awaitAssert to make sure that all nodes detects unreachable + 
within(5.seconds) { + awaitAssert(mbeanServer.getAttribute(mbeanName, "ClusterStatus") must be(expectedJson)) + } + } + + enterBarrier("after-5") + + } + + "support down" taggedAs LongRunningTest in within(20 seconds) { + + // fourth unreachable in previous step + runOn(second) { mbeanServer.invoke(mbeanName, "down", Array(fourthAddress.toString), Array("java.lang.String")) } @@ -120,7 +172,7 @@ abstract class MBeanSpec awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")) } - enterBarrier("after-5") + enterBarrier("after-6") } "support leave" taggedAs LongRunningTest in within(20 seconds) { @@ -142,7 +194,7 @@ abstract class MBeanSpec }) } - enterBarrier("after-6") + enterBarrier("after-7") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 93ecde01e7..1dbf973521 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -38,12 +38,13 @@ object MultiNodeClusterSpec { jmx.enabled = off gossip-interval = 200 ms leader-actions-interval = 200 ms - unreachable-nodes-reaper-interval = 200 ms + unreachable-nodes-reaper-interval = 500 ms periodic-tasks-initial-delay = 300 ms publish-stats-interval = 0 s # always, when it happens - failure-detector.heartbeat-interval = 400 ms + failure-detector.heartbeat-interval = 500 ms } akka.loglevel = INFO + akka.log-dead-letters = off akka.log-dead-letters-during-shutdown = off akka.remote.log-remote-lifecycle-events = off akka.loggers = ["akka.testkit.TestEventListener"] @@ -114,8 +115,10 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro classOf[InternalClusterAction.Tick], classOf[akka.actor.PoisonPill], classOf[akka.dispatch.sysmsg.DeathWatchNotification], - akka.remote.transport.AssociationHandle.Disassociated.getClass, - akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass, + classOf[akka.remote.transport.AssociationHandle.Disassociated], + // akka.remote.transport.AssociationHandle.Disassociated.getClass, + classOf[akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying], + // akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass, classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys) } @@ -125,6 +128,10 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro if (!sys.log.isDebugEnabled) sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*"))) + def muteMarkingAsReachable(sys: ActorSystem = system): Unit = + if (!sys.log.isDebugEnabled) + sys.eventStream.publish(Mute(EventFilter.info(pattern = ".*Marking.* as REACHABLE.*"))) + override def afterAll(): Unit = { if (!log.isDebugEnabled) { muteDeadLetters()() @@ -292,7 +299,8 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro } } - def awaitAllReachable(): Unit = awaitAssert(clusterView.unreachableMembers.isEmpty) + def awaitAllReachable(): Unit = + awaitAssert(clusterView.unreachableMembers must be(Set.empty)) /** * Wait until the specified nodes have seen the same gossip overview. 
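Side note, not part of the patch: the ClusterStatus JSON attribute asserted in MBeanSpec above can be read with plain JMX tooling. The sketch below is illustrative only; it assumes the cluster MBean is registered under the ObjectName akka:type=Cluster (the default when akka.cluster.jmx.enabled = on) and reuses the attribute names ("ClusterStatus", "Members", "Unreachable") that the spec exercises.

import java.lang.management.ManagementFactory
import javax.management.ObjectName

object ClusterStatusPeek extends App {
  // Look up the cluster MBean on the local platform MBean server.
  // "akka:type=Cluster" is the assumed ObjectName of the cluster JMX extension.
  val mbeanServer = ManagementFactory.getPlatformMBeanServer
  val clusterMBean = new ObjectName("akka:type=Cluster")

  // The same attributes that MBeanSpec asserts on: a JSON status document,
  // the comma separated member list, and the unreachable nodes.
  val statusJson  = mbeanServer.getAttribute(clusterMBean, "ClusterStatus").asInstanceOf[String]
  val members     = mbeanServer.getAttribute(clusterMBean, "Members").asInstanceOf[String]
  val unreachable = mbeanServer.getAttribute(clusterMBean, "Unreachable").asInstanceOf[String]

  println(statusJson)
  println(s"members=$members unreachable=$unreachable")
}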
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index 4320f406df..fdaeeb55db 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -68,7 +68,6 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) } "detect network partition and mark nodes on other side as unreachable and form new cluster" taggedAs LongRunningTest in within(30 seconds) { - val thirdAddress = address(third) enterBarrier("before-split") runOn(first) { @@ -79,20 +78,15 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) } enterBarrier("after-split") - runOn(side1.last) { - for (role ← side2) markNodeAsUnavailable(role) - } - runOn(side2.last) { - for (role ← side1) markNodeAsUnavailable(role) - } - runOn(side1: _*) { + for (role ← side2) markNodeAsUnavailable(role) // auto-down = on awaitMembersUp(side1.size, side2.toSet map address) assertLeader(side1: _*) } runOn(side2: _*) { + for (role ← side1) markNodeAsUnavailable(role) // auto-down = on awaitMembersUp(side2.size, side1.toSet map address) assertLeader(side2: _*) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 9c4a96fe79..f224ca6b77 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -109,7 +109,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { # by tree-width (number of children for each actor) and # tree-levels, total number of actors can be calculated by # (width * math.pow(width, levels) - 1) / (width - 1) - tree-width = 5 + tree-width = 4 tree-levels = 4 report-metrics-interval = 10s # scale convergence within timeouts with this factor diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala new file mode 100644 index 0000000000..f551e595f1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala @@ -0,0 +1,338 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import scala.concurrent.duration._ +import akka.testkit._ +import akka.testkit.TestEvent._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.remote.testconductor.RoleName +import akka.actor.Props +import akka.actor.Actor +import scala.util.control.NoStackTrace +import akka.remote.QuarantinedEvent +import akka.actor.ExtendedActorSystem +import akka.remote.RemoteActorRefProvider +import akka.actor.ActorRef +import akka.dispatch.sysmsg.Failed + +object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + val sixth = role("sixth") + val seventh = role("seventh") + val eighth = role("eighth") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString("akka.remote.system-message-buffer-size=20")). 
+ withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) + + deployOn(second, """"/parent/*" { + remote = "@third@" + }""") + + class Parent extends Actor { + def receive = { + case p: Props ⇒ sender ! context.actorOf(p) + } + } + + class RemoteChild extends Actor { + import context.dispatcher + context.system.scheduler.scheduleOnce(500.millis, self, "boom") + def receive = { + case "boom" ⇒ throw new SimulatedException + case x ⇒ sender ! x + } + } + + class SimulatedException extends RuntimeException("Simulated") with NoStackTrace +} + +class SurviveNetworkInstabilityMultiJvmNode1 extends SurviveNetworkInstabilitySpec +class SurviveNetworkInstabilityMultiJvmNode2 extends SurviveNetworkInstabilitySpec +class SurviveNetworkInstabilityMultiJvmNode3 extends SurviveNetworkInstabilitySpec +class SurviveNetworkInstabilityMultiJvmNode4 extends SurviveNetworkInstabilitySpec +class SurviveNetworkInstabilityMultiJvmNode5 extends SurviveNetworkInstabilitySpec +class SurviveNetworkInstabilityMultiJvmNode6 extends SurviveNetworkInstabilitySpec +class SurviveNetworkInstabilityMultiJvmNode7 extends SurviveNetworkInstabilitySpec +class SurviveNetworkInstabilityMultiJvmNode8 extends SurviveNetworkInstabilitySpec + +abstract class SurviveNetworkInstabilitySpec + extends MultiNodeSpec(SurviveNetworkInstabilityMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender { + + import SurviveNetworkInstabilityMultiJvmSpec._ + + // muteMarkingAsUnreachable() + // muteMarkingAsReachable() + + override def expectedTestDuration = 3.minutes + + def assertUnreachable(subjects: RoleName*): Unit = { + val expected = subjects.toSet map address + awaitAssert(clusterView.unreachableMembers.map(_.address) must be(expected)) + } + + "A network partition tolerant cluster" must { + + "reach initial convergence" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third, fourth, fifth) + + enterBarrier("after-1") + } + + "heal after a broken pair" taggedAs LongRunningTest in within(30.seconds) { + runOn(first) { + testConductor.blackhole(first, second, Direction.Both).await + } + enterBarrier("blackhole-2") + + runOn(first) { assertUnreachable(second) } + runOn(second) { assertUnreachable(first) } + runOn(third, fourth, fifth) { + assertUnreachable(first, second) + } + + enterBarrier("unreachable-2") + + runOn(first) { + testConductor.passThrough(first, second, Direction.Both).await + } + enterBarrier("repair-2") + + // This test illustrates why we can't ignore gossip from unreachable aggregated + // status. If all third, fourth, and fifth has been infected by first and second + // unreachable they must accept gossip from first and second when their + // broken connection has healed, otherwise they will be isolated forever. 
+ + awaitAllReachable() + enterBarrier("after-2") + } + + "heal after one isolated node" taggedAs LongRunningTest in within(30.seconds) { + val others = Vector(second, third, fourth, fifth) + runOn(first) { + for (other ← others) { + testConductor.blackhole(first, other, Direction.Both).await + } + } + enterBarrier("blackhole-3") + + runOn(first) { assertUnreachable(others: _*) } + runOn(others: _*) { + assertUnreachable(first) + } + + enterBarrier("unreachable-3") + + runOn(first) { + for (other ← others) { + testConductor.passThrough(first, other, Direction.Both).await + } + } + enterBarrier("repair-3") + awaitAllReachable() + enterBarrier("after-3") + } + + "heal two isolated islands" taggedAs LongRunningTest in within(30.seconds) { + val island1 = Vector(first, second) + val island2 = Vector(third, fourth, fifth) + runOn(first) { + // split the cluster in two parts (first, second) / (third, fourth, fifth) + for (role1 ← island1; role2 ← island2) { + testConductor.blackhole(role1, role2, Direction.Both).await + } + } + enterBarrier("blackhole-4") + + runOn(island1: _*) { + assertUnreachable(island2: _*) + } + runOn(island2: _*) { + assertUnreachable(island1: _*) + } + + enterBarrier("unreachable-4") + + runOn(first) { + for (role1 ← island1; role2 ← island2) { + testConductor.passThrough(role1, role2, Direction.Both).await + } + } + enterBarrier("repair-4") + awaitAllReachable() + enterBarrier("after-4") + } + + "heal after unreachable when ring is changed" taggedAs LongRunningTest in within(45.seconds) { + val joining = Vector(sixth, seventh) + val others = Vector(second, third, fourth, fifth) + runOn(first) { + for (role1 ← (joining :+ first); role2 ← others) { + testConductor.blackhole(role1, role2, Direction.Both).await + } + } + enterBarrier("blackhole-5") + + runOn(first) { assertUnreachable(others: _*) } + runOn(others: _*) { assertUnreachable(first) } + + enterBarrier("unreachable-5") + + runOn(joining: _*) { + cluster.join(first) + + // let them join and stabilize heartbeating + Thread.sleep(5000) + } + + enterBarrier("joined-5") + + runOn((joining :+ first): _*) { assertUnreachable(others: _*) } + // others doesn't know about the joining nodes yet, no gossip passed through + runOn(others: _*) { assertUnreachable(first) } + + enterBarrier("more-unreachable-5") + + runOn(first) { + for (role1 ← (joining :+ first); role2 ← others) { + testConductor.passThrough(role1, role2, Direction.Both).await + } + } + + enterBarrier("repair-5") + runOn((joining ++ others): _*) { + awaitAllReachable() + // eighth not joined yet + awaitMembersUp(roles.size - 1) + } + enterBarrier("after-5") + } + + "down and remove quarantined node" taggedAs LongRunningTest in within(45.seconds) { + val others = Vector(first, third, fourth, fifth, sixth, seventh) + + runOn(second) { + val sysMsgBufferSize = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider]. + remoteSettings.SysMsgBufferSize + val parent = system.actorOf(Props[Parent], "parent") + // fill up the system message redeliver buffer with many failing actors + for (_ ← 1 to sysMsgBufferSize + 1) { + // remote deployment to third + parent ! Props[RemoteChild] + val child = expectMsgType[ActorRef] + child ! 
"hello" + expectMsg("hello") + lastSender.path.address must be(address(third)) + } + } + runOn(third) { + // undelivered system messages in RemoteChild on third should trigger QuarantinedEvent + system.eventStream.subscribe(testActor, classOf[QuarantinedEvent]) + + // after quarantined it will drop the Failed messages to deadLetters + muteDeadLetters(classOf[Failed])(system) + } + enterBarrier("children-deployed") + + runOn(first) { + for (role ← others) + testConductor.blackhole(second, role, Direction.Send).await + } + enterBarrier("blackhole-6") + + runOn(third) { + // undelivered system messages in RemoteChild on third should trigger QuarantinedEvent + within(10.seconds) { + expectMsgType[QuarantinedEvent].address must be(address(second)) + } + system.eventStream.unsubscribe(testActor, classOf[QuarantinedEvent]) + } + enterBarrier("quarantined") + + runOn(others: _*) { + // second should be removed because of quarantine + awaitAssert(clusterView.members.map(_.address) must not contain (address(second))) + } + + enterBarrier("after-6") + } + + "continue and move Joining to Up after downing of one half" taggedAs LongRunningTest in within(45.seconds) { + // note that second is already removed in previous step + val side1 = Vector(first, third, fourth) + val side1AfterJoin = side1 :+ eighth + val side2 = Vector(fifth, sixth, seventh) + runOn(first) { + for (role1 ← side1AfterJoin; role2 ← side2) { + testConductor.blackhole(role1, role2, Direction.Both).await + } + } + enterBarrier("blackhole-7") + + runOn(side1: _*) { assertUnreachable(side2: _*) } + runOn(side2: _*) { assertUnreachable(side1: _*) } + + enterBarrier("unreachable-7") + + runOn(eighth) { + cluster.join(third) + } + runOn(fourth) { + for (role2 ← side2) { + cluster.down(role2) + } + } + + enterBarrier("downed-7") + + runOn(side1AfterJoin: _*) { + // side2 removed + val expected = (side1AfterJoin map address).toSet + awaitAssert(clusterView.members.map(_.address) must be(expected)) + awaitAssert(clusterView.members.collectFirst { case m if m.address == address(eighth) ⇒ m.status } must be( + Some(MemberStatus.Up))) + } + + enterBarrier("side2-removed") + + runOn(first) { + for (role1 ← side1AfterJoin; role2 ← side2) { + testConductor.passThrough(role1, role2, Direction.Both).await + } + } + enterBarrier("repair-7") + + // side2 should not detect side1 as reachable again + Thread.sleep(10000) + + runOn(side1AfterJoin: _*) { + val expected = (side1AfterJoin map address).toSet + clusterView.members.map(_.address) must be(expected) + } + + runOn(side2: _*) { + val expected = ((side2 ++ side1) map address).toSet + clusterView.members.map(_.address) must be(expected) + assertUnreachable(side1: _*) + } + + enterBarrier("after-7") + } + + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala index 931018c52f..73d1e34030 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala @@ -91,8 +91,6 @@ abstract class UnreachableNodeJoinsAgainSpec awaitAssert { val members = clusterView.members clusterView.unreachableMembers.size must be(roles.size - 1) - members.size must be(1) - members.map(_.status) must be(Set(MemberStatus.Up)) } clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) } @@ -105,13 +103,12 @@ abstract class UnreachableNodeJoinsAgainSpec 
awaitAssert { val members = clusterView.members clusterView.unreachableMembers.size must be(1) - members.size must be(roles.size - 1) - members.map(_.status) must be(Set(MemberStatus.Up)) } awaitSeenSameState(allButVictim map address: _*) // still one unreachable clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.head.address must be(node(victim).address) + clusterView.unreachableMembers.head.status must be(MemberStatus.Up) } } @@ -123,10 +120,12 @@ abstract class UnreachableNodeJoinsAgainSpec cluster down victim } - runOn(allBut(victim): _*) { - awaitMembersUp(roles.size - 1, Set(victim)) + val allButVictim = allBut(victim, roles) + runOn(allButVictim: _*) { // eventually removed + awaitMembersUp(roles.size - 1, Set(victim)) awaitAssert(clusterView.unreachableMembers must be(Set.empty), 15 seconds) + awaitAssert(clusterView.members.map(_.address) must be((allButVictim map address).toSet)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index ecd6428420..c01be91221 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -21,6 +21,7 @@ import akka.routing.RoundRobinRouter import akka.routing.RoutedActorRef import akka.routing.RouterRoutees import akka.testkit._ +import akka.remote.transport.ThrottlerTransportAdapter.Direction object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { @@ -86,6 +87,8 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]""")) nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]""")) + testTransport(on = true) + } class ClusterRoundRobinRoutedActorMultiJvmNode1 extends ClusterRoundRobinRoutedActorSpec @@ -300,6 +303,31 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("after-8") } + "remove routees for unreachable nodes, and add when reachable again" taggedAs LongRunningTest in within(30.seconds) { + + // myservice is already running + + def routees = currentRoutees(router4) + def routeeAddresses = (routees map fullAddress).toSet + + runOn(first) { + // 4 nodes, 1 routee on each node + awaitAssert(currentRoutees(router4).size must be(4)) + + testConductor.blackhole(first, second, Direction.Both).await + + awaitAssert(routees.size must be(3)) + routeeAddresses must not contain (address(second)) + + testConductor.passThrough(first, second, Direction.Both).await + awaitAssert(routees.size must be(4)) + routeeAddresses must contain(address(second)) + + } + + enterBarrier("after-9") + } + "deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in { muteMarkingAsUnreachable() @@ -313,7 +341,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou val downRoutee = routees.find(_.path.address == downAddress).get cluster.down(downAddress) - expectMsgType[Terminated].actor must be(downRoutee) + expectMsgType[Terminated](15.seconds).actor must be(downRoutee) awaitAssert { routeeAddresses must contain(notUsedAddress) routeeAddresses must not contain (downAddress) @@ -330,7 +358,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou 
replies.values.sum must be(iterationCount) } - enterBarrier("after-9") + enterBarrier("after-10") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 74d0601c69..e4867b03b2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -30,7 +30,6 @@ class ClusterConfigSpec extends AkkaSpec { PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) - NumberOfEndHeartbeats must be(8) MonitoredByNrOfMembers must be(5) HeartbeatRequestDelay must be(10 seconds) HeartbeatExpectedResponseAfter must be(3 seconds) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index bac45879c2..be62d78e1a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -65,13 +65,33 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { } "be produced for members in unreachable" in { - val g1 = Gossip(members = SortedSet(aUp, bUp), overview = GossipOverview(unreachable = Set(cUp, eUp))) - val g2 = Gossip(members = SortedSet(aUp), overview = GossipOverview(unreachable = Set(cUp, bDown, eDown))) + val reachability1 = Reachability.empty. + unreachable(aUp.uniqueAddress, cUp.uniqueAddress). + unreachable(aUp.uniqueAddress, eUp.uniqueAddress) + val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, eUp), overview = GossipOverview(reachability = reachability1)) + val reachability2 = reachability1. + unreachable(aUp.uniqueAddress, bDown.uniqueAddress) + val g2 = Gossip(members = SortedSet(aUp, cUp, bDown, eDown), overview = GossipOverview(reachability = reachability2)) diffUnreachable(g1, g2) must be(Seq(UnreachableMember(bDown))) diffSeen(g1, g2) must be(Seq.empty) } + "be produced for members becoming reachable after unreachable" in { + val reachability1 = Reachability.empty. + unreachable(aUp.uniqueAddress, cUp.uniqueAddress).reachable(aUp.uniqueAddress, cUp.uniqueAddress). + unreachable(aUp.uniqueAddress, eUp.uniqueAddress). + unreachable(aUp.uniqueAddress, bUp.uniqueAddress) + val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, eUp), overview = GossipOverview(reachability = reachability1)) + val reachability2 = reachability1. + unreachable(aUp.uniqueAddress, cUp.uniqueAddress). 
+ reachable(aUp.uniqueAddress, bUp.uniqueAddress) + val g2 = Gossip(members = SortedSet(aUp, cUp, bUp, eUp), overview = GossipOverview(reachability = reachability2)) + + diffUnreachable(g1, g2) must be(Seq(UnreachableMember(cUp))) + diffReachable(g1, g2) must be(Seq(ReachableMember(bUp))) + } + "be produced for removed members" in { val (g1, _) = converge(Gossip(members = SortedSet(aUp, dExiting))) val (g2, s2) = converge(Gossip(members = SortedSet(aUp))) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index 2a61d4577c..adf9edb1c9 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -40,7 +40,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { val s = emptyState.addHeartbeatRequest(aa, Deadline.now - 30.seconds).removeOverdueHeartbeatRequest() s.heartbeatRequest must be(Map.empty) s.active must be(Set.empty) - s.ending must be(Map(aa -> 0)) + s.ending must be(Set(aa)) } "remove heartbeatRequest after reset" in { @@ -56,22 +56,22 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { "remove heartbeatRequest after removeMember" in { val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)).removeMember(aa) s.heartbeatRequest must be(Map.empty) - s.ending must be(Map(aa -> 0)) + s.ending must be(Set(aa)) } "remove from ending after addHeartbeatRequest" in { val s = emptyState.reset(HashSet(aa, bb)).removeMember(aa) - s.ending must be(Map(aa -> 0)) + s.ending must be(Set(aa)) val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds) s2.heartbeatRequest.keySet must be(Set(aa)) - s2.ending must be(Map.empty) + s2.ending must be(Set.empty) } "include nodes from reset in active set" in { val nodes = HashSet(aa, bb, cc) val s = emptyState.reset(nodes) s.current must be(nodes) - s.ending must be(Map.empty) + s.ending must be(Set.empty) s.active must be(nodes) } @@ -85,21 +85,21 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { "move member to ending set when removing member" in { val nodes = HashSet(aa, bb, cc, dd, ee) val s = emptyState.reset(nodes) - s.ending must be(Map.empty) + s.ending must be(Set.empty) val included = s.current.head val s2 = s.removeMember(included) - s2.ending must be(Map(included -> 0)) + s2.ending must be(Set(included)) s2.current must not contain (included) val s3 = s2.addMember(included) s3.current must contain(included) - s3.ending.keySet must not contain (included) + s3.ending must not contain (included) } - "increase ending count correctly" in { + "remove ending correctly" in { val s = emptyState.reset(HashSet(aa)).removeMember(aa) - s.ending must be(Map(aa -> 0)) - val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa) - s2.ending must be(Map(aa -> 2)) + s.ending must be(Set(aa)) + val s2 = s.removeEnding(aa) + s2.ending must be(Set.empty) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index fe89f4536e..72ef78beb2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -47,40 +47,17 @@ class GossipSpec extends WordSpec with MustMatchers { } - "merge unreachable by status priority" in { - val g1 = Gossip(members = Gossip.emptyMembers, 
overview = GossipOverview(unreachable = Set(a1, b1, c1, d1))) - val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) + "merge unreachable" in { + val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress).unreachable(b1.uniqueAddress, c1.uniqueAddress) + val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1)) + val r2 = Reachability.empty.unreachable(a1.uniqueAddress, d1.uniqueAddress) + val g2 = Gossip(members = SortedSet(a1, b1, c1, d1), overview = GossipOverview(reachability = r2)) val merged1 = g1 merge g2 - merged1.overview.unreachable must be(Set(a2, b2, c1, d2)) - merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged1.overview.reachability.allUnreachable must be(Set(a1.uniqueAddress, c1.uniqueAddress, d1.uniqueAddress)) val merged2 = g2 merge g1 - merged2.overview.unreachable must be(Set(a2, b2, c1, d2)) - merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) - - } - - "merge by excluding unreachable from members" in { - val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c1, d1))) - val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2))) - - val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a2)) - merged1.members.toSeq.map(_.status) must be(Seq(Up)) - merged1.overview.unreachable must be(Set(b2, c1, d2)) - merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) - - val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a2)) - merged2.members.toSeq.map(_.status) must be(Seq(Up)) - merged2.overview.unreachable must be(Set(b2, c1, d2)) - merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) - - } - - "not have node in both members and unreachable" in intercept[IllegalArgumentException] { - Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(b2))) + merged2.overview.reachability.allUnreachable must be(merged1.overview.reachability.allUnreachable) } "not have live members with wrong status" in intercept[IllegalArgumentException] { @@ -121,9 +98,11 @@ class GossipSpec extends WordSpec with MustMatchers { "know who is youngest" in { // a2 and e1 is Joining - val g1 = Gossip(members = SortedSet(a2, b1.copyUp(3)), overview = GossipOverview(unreachable = Set(e1))) + val g1 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability = + Reachability.empty.unreachable(a2.uniqueAddress, e1.uniqueAddress))) g1.youngestMember must be(b1) - val g2 = Gossip(members = SortedSet(a2), overview = GossipOverview(unreachable = Set(b1.copyUp(3), e1))) + val g2 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability = + Reachability.empty.unreachable(a2.uniqueAddress, b1.uniqueAddress).unreachable(a2.uniqueAddress, e1.uniqueAddress))) g2.youngestMember must be(b1) val g3 = Gossip(members = SortedSet(a2, b1.copyUp(3), e2.copyUp(4))) g3.youngestMember must be(e2) diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala new file mode 100644 index 0000000000..b217be3440 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. 
+ */ +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.ShouldMatchers +import akka.actor.Address + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ReachabilityPerfSpec extends WordSpec with ShouldMatchers { + + val nodesSize = sys.props.get("akka.cluster.ReachabilityPerfSpec.nodesSize").getOrElse("250").toInt + val iterations = sys.props.get("akka.cluster.ReachabilityPerfSpec.iterations").getOrElse("10000").toInt + + val address = Address("akka.tcp", "sys", "a", 2552) + val node = Address("akka.tcp", "sys", "a", 2552) + + def createReachabilityOfSize(base: Reachability, size: Int): Reachability = + (base /: (1 to size)) { + case (r, i) ⇒ + val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i) + val j = if (i == size) 1 else i + 1 + val subject = UniqueAddress(address.copy(host = Some("node-" + j)), j) + r.unreachable(observer, subject).reachable(observer, subject) + } + + def addUnreachable(base: Reachability, count: Int): Reachability = { + val observers = base.allObservers.take(count) + val subjects = Stream.continually(base.allObservers).flatten.iterator + (base /: observers) { + case (r, o) ⇒ + (r /: (1 to 5)) { case (r, _) ⇒ r.unreachable(o, subjects.next()) } + } + } + + val reachability1 = createReachabilityOfSize(Reachability.empty, nodesSize) + val reachability2 = createReachabilityOfSize(reachability1, nodesSize) + val reachability3 = addUnreachable(reachability1, nodesSize / 2) + val allowed = reachability1.allObservers + + def checkThunkFor(r1: Reachability, r2: Reachability, thunk: (Reachability, Reachability) ⇒ Unit, times: Int): Unit = { + for (i ← 1 to times) { + thunk(Reachability(r1.records, r1.versions), Reachability(r2.records, r2.versions)) + } + } + + def checkThunkFor(r1: Reachability, thunk: Reachability ⇒ Unit, times: Int): Unit = { + for (i ← 1 to times) { + thunk(Reachability(r1.records, r1.versions)) + } + } + + def merge(expectedRecords: Int)(r1: Reachability, r2: Reachability): Unit = { + r1.merge(allowed, r2).records.size should be(expectedRecords) + } + + def checkStatus(r1: Reachability): Unit = { + val record = r1.records.head + r1.status(record.observer, record.subject) should be(record.status) + } + + def checkAggregatedStatus(r1: Reachability): Unit = { + val record = r1.records.head + r1.status(record.subject) should be(record.status) + } + + def allUnreachableOrTerminated(r1: Reachability): Unit = { + val record = r1.records.head + r1.allUnreachableOrTerminated.isEmpty should be(false) + } + + def allUnreachable(r1: Reachability): Unit = { + val record = r1.records.head + r1.allUnreachable.isEmpty should be(false) + } + + def recordsFrom(r1: Reachability): Unit = { + r1.allObservers.foreach { o ⇒ + r1.recordsFrom(o) should not be be(null) + } + } + + s"Reachability of size $nodesSize" must { + + s"do a warm up run, $iterations times" in { + checkThunkFor(reachability1, reachability2, merge(0), iterations) + } + + s"merge with same versions, $iterations times" in { + checkThunkFor(reachability1, reachability1, merge(0), iterations) + } + + s"merge with all older versions, $iterations times" in { + checkThunkFor(reachability2, reachability1, merge(0), iterations) + } + + s"merge with all newer versions, $iterations times" in { + checkThunkFor(reachability1, reachability2, merge(0), iterations) + } + + s"merge with half nodes unreachable, $iterations times" in { + checkThunkFor(reachability1, reachability3, merge(5 * nodesSize / 2), iterations) + } + + s"merge with half 
nodes unreachable opposite $iterations times" in { + checkThunkFor(reachability3, reachability1, merge(5 * nodesSize / 2), iterations) + } + + s"check status with half nodes unreachable, $iterations times" in { + checkThunkFor(reachability3, checkStatus, iterations) + } + + s"check aggregated reachability status with half nodes unreachable, $iterations times" in { + checkThunkFor(reachability3, checkAggregatedStatus, iterations) + } + + s"get allUnreachableOrTerminated with half nodes unreachable, $iterations times" in { + checkThunkFor(reachability3, allUnreachableOrTerminated, iterations) + } + + s"get allUnreachable with half nodes unreachable, $iterations times" in { + checkThunkFor(reachability3, allUnreachable, iterations) + } + + s"get recordsFrom with half nodes unreachable, $iterations times" in { + checkThunkFor(reachability3, recordsFrom, iterations) + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala new file mode 100644 index 0000000000..fba7104118 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.actor.Address + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ReachabilitySpec extends WordSpec with MustMatchers { + + import Reachability.{ Reachable, Unreachable, Terminated, Record } + + val nodeA = UniqueAddress(Address("akka.tcp", "sys", "a", 2552), 1) + val nodeB = UniqueAddress(Address("akka.tcp", "sys", "b", 2552), 2) + val nodeC = UniqueAddress(Address("akka.tcp", "sys", "c", 2552), 3) + val nodeD = UniqueAddress(Address("akka.tcp", "sys", "d", 2552), 4) + val nodeE = UniqueAddress(Address("akka.tcp", "sys", "e", 2552), 5) + + "Reachability table" must { + + "be reachable when empty" in { + val r = Reachability.empty + r.isReachable(nodeA) must be(true) + r.allUnreachable must be(Set.empty) + } + + "be unreachable when one observed unreachable" in { + val r = Reachability.empty.unreachable(nodeB, nodeA) + r.isReachable(nodeA) must be(false) + r.allUnreachable must be(Set(nodeA)) + } + + "not be reachable when terminated" in { + val r = Reachability.empty.terminated(nodeB, nodeA) + r.isReachable(nodeA) must be(false) + // allUnreachable doesn't include terminated + r.allUnreachable must be(Set.empty) + r.allUnreachableOrTerminated must be(Set(nodeA)) + } + + "not change terminated entry" in { + val r = Reachability.empty.terminated(nodeB, nodeA) + r.reachable(nodeB, nodeA) must be theSameInstanceAs (r) + r.unreachable(nodeB, nodeA) must be theSameInstanceAs (r) + } + + "not change when same status" in { + val r = Reachability.empty.unreachable(nodeB, nodeA) + r.unreachable(nodeB, nodeA) must be theSameInstanceAs (r) + } + + "be unreachable when some observed unreachable and others reachable" in { + val r = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeA).reachable(nodeD, nodeA) + r.isReachable(nodeA) must be(false) + } + + "be reachable when all observed reachable again" in { + val r = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeA). + reachable(nodeB, nodeA).reachable(nodeC, nodeA). + unreachable(nodeB, nodeC).unreachable(nodeC, nodeB) + r.isReachable(nodeA) must be(true) + } + + "be pruned when all records of an observer are Reachable" in { + val r = Reachability.empty. 
+ unreachable(nodeB, nodeA).unreachable(nodeB, nodeC). + unreachable(nodeD, nodeC). + reachable(nodeB, nodeA).reachable(nodeB, nodeC) + r.isReachable(nodeA) must be(true) + r.isReachable(nodeC) must be(false) + r.records must be(Vector(Record(nodeD, nodeC, Unreachable, 1L))) + + val r2 = r.unreachable(nodeB, nodeD).unreachable(nodeB, nodeE) + r2.records.toSet must be(Set( + Record(nodeD, nodeC, Unreachable, 1L), + Record(nodeB, nodeD, Unreachable, 5L), + Record(nodeB, nodeE, Unreachable, 6L))) + } + + "have correct aggregated status" in { + val records = Vector( + Reachability.Record(nodeA, nodeB, Reachable, 2), + Reachability.Record(nodeC, nodeB, Unreachable, 2), + Reachability.Record(nodeA, nodeD, Unreachable, 3), + Reachability.Record(nodeD, nodeB, Terminated, 4)) + val versions = Map(nodeA -> 3L, nodeC -> 3L, nodeD -> 4L) + val r = Reachability(records, versions) + r.status(nodeA) must be(Reachable) + r.status(nodeB) must be(Terminated) + r.status(nodeD) must be(Unreachable) + } + + "have correct status for a mix of nodes" in { + val r = Reachability.empty. + unreachable(nodeB, nodeA).unreachable(nodeC, nodeA).unreachable(nodeD, nodeA). + unreachable(nodeC, nodeB).reachable(nodeC, nodeB).unreachable(nodeD, nodeB). + unreachable(nodeD, nodeC).reachable(nodeD, nodeC). + reachable(nodeE, nodeD). + unreachable(nodeA, nodeE).terminated(nodeB, nodeE) + + r.status(nodeB, nodeA) must be(Unreachable) + r.status(nodeC, nodeA) must be(Unreachable) + r.status(nodeD, nodeA) must be(Unreachable) + + r.status(nodeC, nodeB) must be(Reachable) + r.status(nodeD, nodeB) must be(Unreachable) + + r.status(nodeA, nodeE) must be(Unreachable) + r.status(nodeB, nodeE) must be(Terminated) + + r.isReachable(nodeA) must be(false) + r.isReachable(nodeB) must be(false) + r.isReachable(nodeC) must be(true) + r.isReachable(nodeD) must be(true) + r.isReachable(nodeE) must be(false) + + r.allUnreachable must be(Set(nodeA, nodeB)) + r.allUnreachableFrom(nodeA) must be(Set(nodeE)) + r.allUnreachableFrom(nodeB) must be(Set(nodeA)) + r.allUnreachableFrom(nodeC) must be(Set(nodeA)) + r.allUnreachableFrom(nodeD) must be(Set(nodeA, nodeB)) + + r.observersGroupedByUnreachable must be(Map( + nodeA -> Set(nodeB, nodeC, nodeD), + nodeB -> Set(nodeD), + nodeE -> Set(nodeA))) + } + + "merge by picking latest version of each record" in { + val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD) + val r2 = r1.reachable(nodeB, nodeA).unreachable(nodeD, nodeE).unreachable(nodeC, nodeA) + val merged = r1.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r2) + + merged.status(nodeB, nodeA) must be(Reachable) + merged.status(nodeC, nodeA) must be(Unreachable) + merged.status(nodeC, nodeD) must be(Unreachable) + merged.status(nodeD, nodeE) must be(Unreachable) + merged.status(nodeE, nodeA) must be(Reachable) + + merged.isReachable(nodeA) must be(false) + merged.isReachable(nodeD) must be(false) + merged.isReachable(nodeE) must be(false) + + val merged2 = r2.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r1) + merged2.records.toSet must be(merged.records.toSet) + } + + "merge correctly after pruning" in { + val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD) + val r2 = r1.unreachable(nodeA, nodeE) + val r3 = r1.reachable(nodeB, nodeA) // nodeB pruned + val merged = r2.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r3) + + merged.records.toSet must be(Set( + Record(nodeA, nodeE, Unreachable, 1), + Record(nodeC, nodeD, Unreachable, 1))) + + val merged3 = r3.merge(Set(nodeA, nodeB, nodeC, nodeD, 
nodeE), r2) + merged3.records.toSet must be(merged.records.toSet) + } + + "merge versions correctly" in { + val r1 = Reachability(Vector.empty, Map(nodeA -> 3L, nodeB -> 5L, nodeC -> 7L)) + val r2 = Reachability(Vector.empty, Map(nodeA -> 6L, nodeB -> 2L, nodeD -> 1L)) + val merged = r1.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r2) + + val expected = Map(nodeA -> 6L, nodeB -> 5L, nodeC -> 7L, nodeD -> 1L) + merged.versions must be(expected) + + val merged2 = r2.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r1) + merged2.versions must be(expected) + } + + "remove node" in { + val r = Reachability.empty. + unreachable(nodeB, nodeA). + unreachable(nodeC, nodeD). + unreachable(nodeB, nodeC). + unreachable(nodeB, nodeE). + remove(Set(nodeA, nodeB)) + + r.status(nodeB, nodeA) must be(Reachable) + r.status(nodeC, nodeD) must be(Unreachable) + r.status(nodeB, nodeC) must be(Reachable) + r.status(nodeB, nodeE) must be(Reachable) + } + + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index 4aeb11d314..a5e0aeff7e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -44,6 +44,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec { checkSerialization(InternalClusterAction.InitJoinNack(address)) checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address)) checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address)) + checkSerialization(ClusterHeartbeatReceiver.EndHeartbeatAck(address)) checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address)) val node1 = VectorClock.Node("node1") @@ -52,7 +53,8 @@ class ClusterMessageSerializerSpec extends AkkaSpec { val node4 = VectorClock.Node("node4") val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1 :+ node2).seen(a1.uniqueAddress).seen(b1.uniqueAddress) val g2 = (g1 :+ node3 :+ node4).seen(a1.uniqueAddress).seen(c1.uniqueAddress) - val g3 = g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1))) + val reachability3 = Reachability.empty.unreachable(a1.uniqueAddress, e1.uniqueAddress).unreachable(b1.uniqueAddress, e1.uniqueAddress) + val g3 = g2.copy(members = SortedSet(a1, b1, c1, d1, e1), overview = g2.overview.copy(reachability = reachability3)) checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1)) checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2)) checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g3)) diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala index 8178d5403c..59a77229a3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala @@ -6,7 +6,6 @@ package akka.cluster.routing import akka.testkit._ import akka.actor._ import akka.routing.RoundRobinRouter -import akka.cluster.routing.ClusterRouterConfig import akka.actor.OneForOneStrategy object ClusterRouterSupervisorSpec { diff --git a/akka-docs/rst/common/cluster.rst b/akka-docs/rst/common/cluster.rst index d691990d71..765fa4605a 100644 --- a/akka-docs/rst/common/cluster.rst +++ b/akka-docs/rst/common/cluster.rst @@ -94,8 +94,8 @@ by all other nodes in the cluster. 
Convergence is implemented by passing a map from node to current state version during gossip. This information is referred to as the gossip overview. When all versions in the overview are equal there is convergence. Gossip convergence cannot occur while any nodes are ``unreachable``. The nodes need -to be moved to the ``down`` or ``removed`` states (see the `Membership Lifecycle`_ -section below). +to become ``reachable`` again, or moved to the ``down`` and ``removed`` states +(see the `Membership Lifecycle`_ section below). Failure Detector @@ -127,9 +127,17 @@ In a cluster each node is monitored by a few (default maximum 5) other nodes, and when any of these detects the node as ``unreachable`` that information will spread to the rest of the cluster through the gossip. In other words, only one node needs to mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``. -Right now there is no way for a node to come back from ``unreachable``. This is planned -for the next release of Akka. It also means that the ``unreachable`` node needs to be moved -to the ``down`` or ``removed`` states (see the `Membership Lifecycle`_ section below). + +The failure detector will also detect if the node becomes ``reachable`` again. When +all nodes that monitored the ``unreachable`` node detect it as ``reachable`` again +the cluster, after gossip dissemination, will consider it as ``reachable``. + +If system messages cannot be delivered to a node it will be quarantined and then it +cannot come back from ``unreachable``. This can happen if there are too many +unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, +failures of actors supervised by remote parent). Then the node needs to be moved +to the ``down`` or ``removed`` states (see the `Membership Lifecycle`_ section below) +and the actor system must be restarted before it can join the cluster again. .. _The Phi Accrual Failure Detector: http://ddg.jaist.ac.jp/pub/HDY+04.pdf @@ -221,8 +229,8 @@ from the cluster, marking it as ``removed``. If a node is ``unreachable`` then gossip convergence is not possible and therefore any ``leader`` actions are also not possible (for instance, allowing a node to become a part of the cluster). To be able to move forward the state of the -``unreachable`` nodes must be changed. Currently the only way forward is to mark the -node as ``down``. If the node is to join the cluster again the actor system must be +``unreachable`` nodes must be changed. It must become ``reachable`` again or marked +as ``down``. If the node is to join the cluster again the actor system must be restarted and go through the joining process again. The cluster can, through the leader, also *auto-down* a node.
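For readers of this changeset, the reachability and downing behaviour described above can be observed from application code. Below is a minimal sketch (not part of this patch) of an actor that subscribes to the ``UnreachableMember`` and ``ReachableMember`` cluster events and downs an unreachable member immediately; the actor name and the immediate-downing policy are illustrative assumptions, only the ``Cluster`` extension calls are the real API::

  import akka.actor.{ Actor, ActorLogging }
  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent.{ CurrentClusterState, ReachableMember, UnreachableMember }

  // Illustrative only: downs an unreachable member right away.
  // A real downing policy would wait to see whether the member becomes reachable again.
  class NaiveDowningListener extends Actor with ActorLogging {
    val cluster = Cluster(context.system)

    override def preStart(): Unit =
      cluster.subscribe(self, classOf[UnreachableMember], classOf[ReachableMember])

    override def postStop(): Unit =
      cluster.unsubscribe(self)

    def receive = {
      case UnreachableMember(member) ⇒
        log.warning("Member [{}] is unreachable, marking it as Down", member.address)
        cluster.down(member.address)
      case ReachableMember(member) ⇒
        log.info("Member [{}] is reachable again", member.address)
      case _: CurrentClusterState ⇒ // initial snapshot delivered by subscribe, ignored here
    }
  }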
@@ -292,8 +300,10 @@ Failure Detection and Unreachability causing the monitored node to be marked as unreachable unreachable* - unreachable is not a real member state but more of a flag in addition - to the state signaling that the cluster is unable to talk to this node + unreachable is not a real member state but more of a flag in addition + to the state signaling that the cluster is unable to talk to this node, + after being unreachable the failure detector may detect it as reachable + again and thereby remove the flag Future Cluster Enhancements and Additions @@ -649,4 +659,3 @@ storage on top of the Akka Cluster as described in this document are: * Actor handoff * Actor rebalancing * Stateful actor replication -* Node becoming ``reachable`` after it has been marked as ``unreachable`` diff --git a/akka-docs/rst/images/member-states.drawing b/akka-docs/rst/images/member-states.drawing new file mode 100644 index 0000000000..560ed389c7 Binary files /dev/null and b/akka-docs/rst/images/member-states.drawing differ diff --git a/akka-docs/rst/images/member-states.png b/akka-docs/rst/images/member-states.png index 7dee278dfe..b9870bce1c 100644 Binary files a/akka-docs/rst/images/member-states.png and b/akka-docs/rst/images/member-states.png differ diff --git a/akka-docs/rst/images/member-states.svg b/akka-docs/rst/images/member-states.svg index f009dc1815..28f8c7fc6c 100644 --- a/akka-docs/rst/images/member-states.svg +++ b/akka-docs/rst/images/member-states.svg @@ -8,901 +8,811 @@ [SVG markup diff omitted: the member-states diagram is regenerated with the same state labels (unreachable*, joining, up, leaving, exiting, down, removed) and transitions (join, leave, (leader action), (fd*)); only drawing metadata and text styling change.]
diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index df4ab2cf2f..fa398f4ca6 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -141,10 +141,10 @@ Automatic vs. Manual Downing When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of -new joining members to 'Up'. The status of the unreachable member must be -changed to 'Down'. This can be performed automatically or manually. By -default it must be done manually, using using :ref:`cluster_jmx_java` or -:ref:`cluster_command_line_java`. +new joining members to 'Up'. The node must first become reachable again, or the +status of the unreachable member must be changed to 'Down'. Changing status to 'Down' +can be performed automatically or manually. By default it must be done manually, using +:ref:`cluster_jmx_java` or :ref:`cluster_command_line_java`. It can also be performed programatically with ``Cluster.get(system).down(address)``. @@ -194,10 +194,13 @@ receive ``MemberUp`` for that node, and other nodes. The events to track the life-cycle of members are: * ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. -* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``. +* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``. Note that the node might already have been shut down when this event is published on another node. * ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster. -* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector. +* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable, detected by the failure detector + of at least one other node. +* ``ClusterEvent.ReachableMember`` - A member is considered as reachable again, after having been unreachable. + All nodes that previously detected it as unreachable have detected it as reachable again. There are more types of change events, consult the API documentation of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` @@ -324,6 +327,22 @@ See :ref:`cluster-client` in the contrib module. Failure Detector ^^^^^^^^^^^^^^^^ +In a cluster each node is monitored by a few (default maximum 5) other nodes, and when +any of these detects the node as ``unreachable`` that information will spread to +the rest of the cluster through the gossip. In other words, only one node needs to +mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``. + +The failure detector will also detect if the node becomes ``reachable`` again. When +all nodes that monitored the ``unreachable`` node detect it as ``reachable`` again +the cluster, after gossip dissemination, will consider it as ``reachable``. + +If system messages cannot be delivered to a node it will be quarantined and then it +cannot come back from ``unreachable``. This can happen if there are too many +unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, +failures of actors supervised by remote parent). Then the node needs to be moved +to the ``down`` or ``removed`` states and the actor system must be restarted before +it can join the cluster again.
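The quarantine case just described is driven by a remoting lifecycle event. A small sketch (not part of this patch) of how an application could be notified when a remote system is quarantined, using the ``QuarantinedEvent`` introduced in this changeset; the listener class itself is hypothetical::

  import akka.actor.{ Actor, ActorLogging }
  import akka.remote.QuarantinedEvent

  // Illustrative listener: logs when the remoting layer quarantines a remote actor system.
  // A quarantined node cannot become reachable again; it has to be downed or removed and
  // its actor system restarted before it can join the cluster again.
  class QuarantineListener extends Actor with ActorLogging {

    override def preStart(): Unit =
      context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])

    override def postStop(): Unit =
      context.system.eventStream.unsubscribe(self)

    def receive = {
      case QuarantinedEvent(address, uid) ⇒
        log.warning("Actor system at [{}] with UID [{}] has been quarantined", address, uid)
    }
  }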
+ The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. The heartbeat arrival times is interpreted by an implementation of @@ -384,9 +403,10 @@ Cluster Aware Routers All :ref:`routers ` can be made aware of member nodes in the cluster, i.e. deploying new routees or looking up routees on nodes in the cluster. -When a node becomes unavailable or leaves the cluster the routees of that node are +When a node becomes unreachable or leaves the cluster the routees of that node are automatically unregistered from the router. When new nodes join the cluster additional -routees are added to the router, according to the configuration. +routees are added to the router, according to the configuration. Routees are also added +when a node becomes reachable again, after having been unreachable. There are two distinct types of routers. diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index bb0bb55fa9..96d0cc9df3 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -134,10 +134,10 @@ Automatic vs. Manual Downing When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of -new joining members to 'Up'. The status of the unreachable member must be -changed to 'Down'. This can be performed automatically or manually. By -default it must be done manually, using using :ref:`cluster_jmx_scala` or -:ref:`cluster_command_line_scala`. +new joining members to 'Up'. The node must first become reachable again, or the +status of the unreachable member must be changed to 'Down'. Changing status to 'Down' +can be performed automatically or manually. By default it must be done manually, using +:ref:`cluster_jmx_scala` or :ref:`cluster_command_line_scala`. It can also be performed programatically with ``Cluster(system).down(address)``. @@ -187,10 +187,13 @@ receive ``MemberUp`` for that node, and other nodes. The events to track the life-cycle of members are: * ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. -* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``. +* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``. Note that the node might already have been shut down when this event is published on another node. * ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster. -* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector. +* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable, detected by the failure detector + of at least one other node. +* ``ClusterEvent.ReachableMember`` - A member is considered as reachable again, after having been unreachable. + All nodes that previously detected it as unreachable have detected it as reachable again. There are more types of change events, consult the API documentation of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` @@ -312,6 +315,22 @@ See :ref:`cluster-client` in the contrib module. Failure Detector ^^^^^^^^^^^^^^^^ +In a cluster each node is monitored by a few (default maximum 5) other nodes, and when +any of these detects the node as ``unreachable`` that information will spread to +the rest of the cluster through the gossip.
In other words, only one node needs to +mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``. + +The failure detector will also detect if the node becomes ``reachable`` again. When +all nodes that monitored the ``unreachable`` node detect it as ``reachable`` again +the cluster, after gossip dissemination, will consider it as ``reachable``. + +If system messages cannot be delivered to a node it will be quarantined and then it +cannot come back from ``unreachable``. This can happen if there are too many +unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, +failures of actors supervised by remote parent). Then the node needs to be moved +to the ``down`` or ``removed`` states and the actor system must be restarted before +it can join the cluster again. + The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. The heartbeat arrival times is interpreted by an implementation of @@ -375,9 +394,10 @@ Cluster Aware Routers All :ref:`routers ` can be made aware of member nodes in the cluster, i.e. deploying new routees or looking up routees on nodes in the cluster. -When a node becomes unavailable or leaves the cluster the routees of that node are +When a node becomes unreachable or leaves the cluster the routees of that node are automatically unregistered from the router. When new nodes join the cluster additional -routees are added to the router, according to the configuration. +routees are added to the router, according to the configuration. Routees are also added +when a node becomes reachable again, after having been unreachable. There are two distinct types of routers. diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index 336fa0b7d7..161174829c 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -100,7 +100,9 @@ class PhiAccrualFailureDetector( private val state = new AtomicReference[State](State(history = firstHeartbeat, timestamp = None)) - override def isAvailable: Boolean = phi < threshold + override def isAvailable: Boolean = isAvailable(clock()) + + private def isAvailable(timestamp: Long): Boolean = phi(timestamp) < threshold override def isMonitoring: Boolean = state.get.timestamp.nonEmpty @@ -118,7 +120,9 @@ class PhiAccrualFailureDetector( case Some(latestTimestamp) ⇒ // this is a known connection val interval = timestamp - latestTimestamp - oldState.history :+ interval + // don't use the first heartbeat after failure for the history, since a long pause will skew the stats + if (isAvailable(timestamp)) oldState.history :+ interval + else oldState.history } val newState = oldState.copy(history = newHistory, timestamp = Some(timestamp)) // record new timestamp @@ -133,13 +137,15 @@ class PhiAccrualFailureDetector( * If a connection does not have any records in failure detector then it is * considered healthy. */ - def phi: Double = { + def phi: Double = phi(clock()) + + private def phi(timestamp: Long): Double = { val oldState = state.get val oldTimestamp = oldState.timestamp if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g.
with zero heartbeats, as healthy connections else { - val timeDiff = clock() - oldTimestamp.get + val timeDiff = timestamp - oldTimestamp.get val history = oldState.history val mean = history.mean diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index fecae6d90a..e4924ec761 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -400,10 +400,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒ settings.QuarantineDuration match { case d: FiniteDuration ⇒ - log.warning("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " + - "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " + - "from this situation.", remoteAddress, uid) endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) + eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) case _ ⇒ // disabled } context.system.eventStream.publish(AddressTerminated(remoteAddress)) @@ -488,9 +486,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Some(endpoint) ⇒ context.stop(endpoint) case _ ⇒ // nothing to stop } - log.info("Address [{}] is now quarantined, all messages to this address will be delivered to dead letters.", - address) endpoints.markAsQuarantined(address, uid, Deadline.now + d) + eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) case _ ⇒ // Ignore } diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index 6ed910434f..da51e2823d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -78,6 +78,15 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE override def toString: String = s"Remoting error: [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]" } +@SerialVersionUID(1L) +case class QuarantinedEvent(address: Address, uid: Int) extends RemotingLifecycleEvent { + override def logLevel: Logging.LogLevel = Logging.WarningLevel + override val toString: String = + s"Association to [$address] having UID [$uid] is irrecoverably failed. UID is now quarantined and all " + + "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " + + "from this situation." 
+} + /** * INTERNAL API */ diff --git a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala index 9b50be79e7..a9caa5accb 100644 --- a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala @@ -26,7 +26,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { def createFailureDetector( threshold: Double = 8.0, maxSampleSize: Int = 1000, - minStdDeviation: FiniteDuration = 10.millis, + minStdDeviation: FiniteDuration = 100.millis, acceptableLostDuration: FiniteDuration = Duration.Zero, firstHeartbeatEstimate: FiniteDuration = 1.second, clock: Clock = FailureDetector.defaultClock) = @@ -120,19 +120,21 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { } "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { - val timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100) - val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval)) + // 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger unreachable again + val regularIntervals = 0L +: Vector.fill(999)(1000L) + val timeIntervals = regularIntervals :+ (5 * 60 * 1000L) :+ 100L :+ 900L :+ 100L :+ 7000L :+ 100L :+ 900L :+ 100L :+ 900L + val fd = createFailureDetector(threshold = 8, acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeIntervals)) - fd.heartbeat() //0 - fd.heartbeat() //1000 - fd.heartbeat() //1100 - fd.isAvailable must be(true) //1200 - fd.isAvailable must be(false) //8200 - fd.heartbeat() //8300 - fd.heartbeat() //9300 - fd.heartbeat() //9400 - - fd.isAvailable must be(true) //9500 + for (_ ← 0 until 1000) fd.heartbeat() + fd.isAvailable must be(false) // after the long pause + fd.heartbeat() + fd.isAvailable must be(true) + fd.heartbeat() + fd.isAvailable must be(false) // after the 7 seconds pause + fd.heartbeat() + fd.isAvailable must be(true) + fd.heartbeat() + fd.isAvailable must be(true) } "accept some configured missing heartbeats" in { @@ -164,7 +166,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { } "use maxSampleSize heartbeats" in { - val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) + val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 500, 500, 500, 500, 500) val fd = createFailureDetector(maxSampleSize = 3, clock = fakeTimeGenerator(timeInterval)) // 100 ms interval @@ -173,12 +175,12 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { fd.heartbeat() //200 fd.heartbeat() //300 val phi1 = fd.phi //400 - // 1000 ms interval, should become same phi when 100 ms intervals have been dropped + // 500 ms interval, should become same phi when 100 ms intervals have been dropped fd.heartbeat() //1000 + fd.heartbeat() //1500 fd.heartbeat() //2000 - fd.heartbeat() //3000 - fd.heartbeat() //4000 - val phi2 = fd.phi //5000 + fd.heartbeat() //2500 + val phi2 = fd.phi //3000 phi2 must be(phi1.plusOrMinus(0.001)) } @@ -211,7 +213,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { val history5 = history4 :+ 80 history5.mean must be(103.333333 plusOrMinus 0.00001) history5.variance must be(688.88888889 plusOrMinus 0.00001) - } + } }
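A closing note on the failure detector spec changes above: the tests drive ``PhiAccrualFailureDetector`` with a deterministic clock (``fakeTimeGenerator``). The following is a minimal sketch of how such a clock can be built, assuming ``akka.remote.FailureDetector.Clock`` is the zero-argument ``Long`` function used by ``defaultClock``; the helper name and its exact shape are assumptions, not code from this patch::

  import akka.remote.FailureDetector.Clock

  // Hypothetical helper in the spirit of the spec's fakeTimeGenerator:
  // the first element is the initial timestamp, subsequent elements are intervals,
  // and every call to the clock returns the next cumulative timestamp.
  // Not thread-safe, which is fine for a single-threaded test.
  object FakeClock {
    def apply(timeIntervals: Seq[Long]): Clock = new Clock {
      private val times = timeIntervals.scanLeft(0L)(_ + _).drop(1).iterator
      override def apply(): Long = times.next()
    }
  }

  // usage sketch: heartbeats 1 second apart, then a 5 minute pause
  // val fd = createFailureDetector(clock = FakeClock(Seq(0L, 1000L, 1000L, 5 * 60 * 1000L)))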