!clu #2307 Allow transition from unreachable to reachable

* Replace unreachable Set with Reachability table
* Unreachable members stay in member Set
* Downing a live member used to move it to the unreachable Set,
  from which the leader then removed it. That will not
  work when flipping back to reachable, so a Down member must
  be detected as unreachable before being removed, similar
  to Exiting. A member shuts itself down if it sees itself as
  Down.
* Flip back to reachable when failure detector monitors it as
  available again
* ReachableMember event (see the subscription sketch after this list)
* Can't ignore gossip from aggregated unreachable (see SurviveNetworkInstabilitySpec)
* Make use of ReachableMember event in cluster router
* End heartbeat when acknowledged, EndHeartbeatAck
* Remove nr-of-end-heartbeats from conf
* Full reachability info in JMX cluster status
* Don't use interval after unreachable for AccrualFailureDetector history
* Add QuarantinedEvent to remoting, used for Reachability.Terminated
* Prune reachability table when all reachable
* Update documentation
* Performance testing and optimizations
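
A minimal sketch of consuming the new events (the listener actor and the
log output are illustrative; the ClusterEvent types are the ones added in
this commit):

    import akka.actor.Actor
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.{ ReachabilityEvent, ReachableMember, UnreachableMember }

    class ReachabilityListener extends Actor {
      val cluster = Cluster(context.system)
      // ReachabilityEvent is the new marker trait covering both event types
      override def preStart(): Unit = cluster.subscribe(self, classOf[ReachabilityEvent])
      override def postStop(): Unit = cluster.unsubscribe(self)
      def receive = {
        case UnreachableMember(m) ⇒ println(s"Unreachable: $m")
        case ReachableMember(m)   ⇒ println(s"Reachable again: $m")
      }
    }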
Patrik Nordwall 2013-08-27 15:14:53 +02:00
parent beba5d9f76
commit dc9fe4f19c
43 changed files with 2425 additions and 1169 deletions

View file

@ -69,6 +69,11 @@ message Welcome {
* Sends an Address
*/
/**
* EndHeartbeatAck
* Sends an Address
*/
/**
* HeartbeatRequest
* Sends an Address
@ -114,9 +119,34 @@ message Gossip {
message GossipOverview {
/* These are the address indexes for the nodes that have seen this gossip */
repeated int32 seen = 1;
repeated Member unreachable = 2;
repeated ObserverReachability observerReachability = 2;
}
/**
* Reachability
*/
message ObserverReachability {
required int32 addressIndex = 1;
required int64 version = 4;
repeated SubjectReachability subjectReachability = 2;
}
message SubjectReachability {
required int32 addressIndex = 1;
required ReachabilityStatus status = 3;
required int64 version = 4;
}
/**
* Reachability status
*/
enum ReachabilityStatus {
Reachable = 0;
Unreachable = 1;
Terminated = 2;
}
/**
* Member
*/

View file

@ -128,11 +128,6 @@ akka {
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 5
# When a node stops sending heartbeats to another node it will end that
# with this number of EndHeartbeat messages, which will remove the
# monitoring from the failure detector.
nr-of-end-heartbeats = 8
# When no expected heartbeat message has been received an explicit
# heartbeat request is sent to the node that should emit heartbeats.
heartbeat-request {

View file

@ -15,6 +15,7 @@ import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import scala.collection.breakOut
import akka.remote.QuarantinedEvent
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -264,13 +265,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
Some(scheduler.schedule(PeriodicTasksInitialDelay.max(d), d, self, PublishStatsTick))
}
override def preStart(): Unit =
override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])
if (SeedNodes.isEmpty)
logInfo("No seed-nodes configured, manual cluster join required")
else
self ! JoinSeedNodes(SeedNodes)
}
override def postStop(): Unit = {
context.system.eventStream.unsubscribe(self)
gossipTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
@ -323,6 +327,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case ClusterUserAction.Leave(address) ⇒ leaving(address)
case SendGossipTo(address) ⇒ sendGossipTo(address)
case msg: SubscriptionMessage ⇒ publisher forward msg
case QuarantinedEvent(address, uid) ⇒ quarantined(UniqueAddress(address, uid))
case ClusterUserAction.JoinTo(address) ⇒
logInfo("Trying to join [{}] when already part of a cluster, ignoring", address)
case JoinSeedNodes(seedNodes) ⇒
@ -419,12 +424,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
selfAddress.system, node.address.system)
else {
val localMembers = latestGossip.members
val localUnreachable = latestGossip.overview.unreachable
// check by address without uid to make sure that node with same host:port is not allowed
// to join until previous node with that host:port has been removed from the cluster
val alreadyMember = localMembers.exists(_.address == node.address)
val isUnreachable = localUnreachable.exists(_.address == node.address)
val isUnreachable = !latestGossip.overview.reachability.isReachable(node)
if (alreadyMember)
logInfo("Existing member [{}] is trying to join, ignoring", node)
@ -488,14 +492,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
/**
* This method is called when a member sees itself as Exiting.
* This method is called when a member sees itself as Exiting or Down.
*/
def shutdown(): Unit = cluster.shutdown()
/**
* State transition to DOWN.
* The node to DOWN is removed from the `members` set and put in the `unreachable` set (if not already there)
* and its status is set to DOWN. The node is also removed from the `seen` table.
* State transition to DOWN.
* Its status is set to DOWN. The node is also removed from the `seen` table.
*
* The node will eventually be removed by the leader, and only after removal a new node with same address can
* join the cluster through the normal joining procedure.
@ -505,46 +508,50 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val localReachability = localOverview.reachability
// 1. check if the node to DOWN is in the `members` set
val downedMember: Option[Member] =
localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) }
val newMembers = downedMember match {
// check if the node to DOWN is in the `members` set
localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) } match {
case Some(m) ⇒
logInfo("Marking node [{}] as [{}]", m.address, Down)
localMembers - m
case None ⇒ localMembers
if (localReachability.isReachable(m.uniqueAddress))
logInfo("Marking node [{}] as [{}]", m.address, Down)
else
logInfo("Marking unreachable node [{}] as [{}]", m.address, Down)
// replace member (changed status)
val newMembers = localMembers - m + m
// remove nodes marked as DOWN from the `seen` table
val newSeen = localSeen - m.uniqueAddress
// update gossip overview
val newOverview = localOverview copy (seen = newSeen)
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
updateLatestGossip(newGossip)
publish(latestGossip)
case None ⇒
logInfo("Ignoring down of unknown node [{}]", address)
}
// 2. check if the node to DOWN is in the `unreachable` set
val newUnreachableMembers =
localUnreachableMembers.map { member ⇒
// no need to DOWN members already DOWN
if (member.address == address && member.status != Down) {
logInfo("Marking unreachable node [{}] as [{}]", member.address, Down)
member copy (status = Down)
} else member
}
}
// 3. add the newly DOWNED members from the `members` (in step 1.) to the `newUnreachableMembers` set.
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the `seen` table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress }
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
updateLatestGossip(newGossip)
publish(latestGossip)
def quarantined(node: UniqueAddress): Unit = {
val localGossip = latestGossip
if (localGossip.hasMember(node)) {
val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, node)
val newOverview = localGossip.overview copy (reachability = newReachability)
val newGossip = localGossip copy (overview = newOverview)
updateLatestGossip(newGossip)
log.warning("Cluster Node [{}] - Marking node as TERMINATED [{}], due to quarantine",
selfAddress, node.address)
publish(latestGossip)
downing(node.address)
}
}
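// Flow sketch of the pieces wired together above: remoting publishes
// QuarantinedEvent(address, uid) on the event stream, the subscription in
// preStart delivers it to this actor, quarantined() records the node as
// Terminated in the reachability table, and downing(node.address) is
// initiated so the leader eventually removes the node.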
def receiveGossipStatus(status: GossipStatus): Unit = {
val from = status.from
if (latestGossip.overview.unreachable.exists(_.uniqueAddress == from))
if (!latestGossip.overview.reachability.isReachable(selfUniqueAddress, from))
logInfo("Ignoring received gossip status from unreachable [{}] ", from)
else if (latestGossip.members.forall(_.uniqueAddress != from))
log.debug("Cluster Node [{}] - Ignoring received gossip status from unknown [{}]", selfAddress, from)
@ -578,10 +585,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
Ignored
} else if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
} else if (!remoteGossip.overview.reachability.isReachable(selfUniqueAddress)) {
logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address)
Ignored
} else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) {
} else if (!localGossip.overview.reachability.isReachable(selfUniqueAddress, from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
Ignored
} else if (localGossip.members.forall(_.uniqueAddress != from)) {
@ -634,7 +641,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
publish(latestGossip)
if (latestGossip.member(selfUniqueAddress).status == Exiting)
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (selfStatus == Exiting || selfStatus == Down)
shutdown()
else if (talkback) {
// send back gossip to sender when sender had different view, i.e. merge, or sender had
@ -653,23 +661,26 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def gossip(): Unit = {
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (!isSingletonCluster && isAvailable) {
if (!isSingletonCluster) {
val localGossip = latestGossip
val preferredGossipTargets: Vector[UniqueAddress] =
if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view
// gossip to a random alive member with preference to a member with older gossip version
localGossip.members.collect { case m if !localGossip.seenByNode(m.uniqueAddress) ⇒ m.uniqueAddress }(breakOut)
} else Vector.empty[UniqueAddress]
localGossip.members.collect {
case m if !localGossip.seenByNode(m.uniqueAddress) && validNodeForGossip(m.uniqueAddress) ⇒
m.uniqueAddress
}(breakOut)
} else Vector.empty
if (preferredGossipTargets.nonEmpty) {
val peer = selectRandomNode(preferredGossipTargets filterNot (_ == selfUniqueAddress))
val peer = selectRandomNode(preferredGossipTargets)
// send full gossip because it has different view
peer foreach gossipTo
} else {
// Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
val peer = selectRandomNode(localGossip.members.toIndexedSeq.collect {
case m if m.uniqueAddress != selfUniqueAddress ⇒ m.uniqueAddress
case m if validNodeForGossip(m.uniqueAddress) ⇒ m.uniqueAddress
})
peer foreach { node ⇒
if (localGossip.seenByNode(node)) gossipStatusTo(node)
@ -684,8 +695,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* assigning partitions etc.
*/
def leaderActions(): Unit =
if (latestGossip.isLeader(selfUniqueAddress) && isAvailable) {
// only run the leader actions if we are the LEADER and available
if (latestGossip.isLeader(selfUniqueAddress)) {
// only run the leader actions if we are the LEADER
if (AutoDown)
leaderAutoDownActions()
@ -712,7 +723,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// TODO implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
@ -726,9 +736,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
val (removedUnreachable, newUnreachable) = localUnreachableMembers partition { m ⇒
Gossip.removeUnreachableWithMemberStatus(m.status)
}
val removedUnreachable = for {
node ← localOverview.reachability.allUnreachableOrTerminated
m = localGossip.member(node)
if Gossip.removeUnreachableWithMemberStatus(m.status)
} yield m
val changedMembers = localMembers collect {
var upNumber = 0
@ -758,12 +770,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// handle changes
// replace changed members
val newMembers = localMembers -- changedMembers ++ changedMembers
val newMembers = changedMembers ++ localMembers -- removedUnreachable
// removing REMOVED nodes from the `seen` table
val newSeen = localSeen -- removedUnreachable.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
val removed = removedUnreachable.map(_.uniqueAddress)
val newSeen = localSeen -- removed
// removing REMOVED nodes from the `reachability` table
val newReachability = localOverview.reachability.remove(removed)
val newOverview = localOverview copy (seen = newSeen, reachability = newReachability)
val newGossip = localGossip copy (members = newMembers, overview = newOverview)
updateLatestGossip(newGossip)
@ -802,25 +817,27 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
*/
def leaderAutoDownActions(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val changedUnreachableMembers = localUnreachableMembers collect {
case m if !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status) ⇒ m copy (status = Down)
}
val changedUnreachableMembers = for {
node ← localOverview.reachability.allUnreachableOrTerminated
m = localGossip.member(node)
if m.status != Removed && !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)
} yield m.copy(status = Down)
if (changedUnreachableMembers.nonEmpty) {
// handle changes
// replace changed unreachable
val newUnreachableMembers = localUnreachableMembers -- changedUnreachableMembers ++ changedUnreachableMembers
val newMembers = localMembers -- changedUnreachableMembers ++ changedUnreachableMembers
// removing nodes marked as Down/Exiting from the `seen` table
val newSeen = localSeen -- changedUnreachableMembers.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
val newOverview = localOverview copy (seen = newSeen) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
updateLatestGossip(newGossip)
@ -834,39 +851,54 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
/**
* Reaps the unreachable members (moves them to the `unreachable` list in the cluster overview) according to the failure detector's verdict.
* Reaps the unreachable members according to the failure detector's verdict.
*/
def reapUnreachableMembers(): Unit = {
if (!isSingletonCluster && isAvailable) {
// only scrutinize if we are a non-singleton cluster and available
if (!isSingletonCluster) {
// only scrutinize if we are a non-singleton cluster
val localGossip = latestGossip
val localOverview = localGossip.overview
val localMembers = localGossip.members
val localUnreachableMembers = localGossip.overview.unreachable
val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒
member.uniqueAddress == selfUniqueAddress || failureDetector.isAvailable(member.address)
member.uniqueAddress == selfUniqueAddress ||
localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Unreachable ||
localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Terminated ||
failureDetector.isAvailable(member.address)
}
if (newlyDetectedUnreachableMembers.nonEmpty) {
val newlyDetectedReachableMembers = localOverview.reachability.allUnreachableFrom(selfUniqueAddress) collect {
case node if node != selfUniqueAddress && failureDetector.isAvailable(node.address) ⇒
localGossip.member(node)
}
val newMembers = localMembers -- newlyDetectedUnreachableMembers
val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers
if (newlyDetectedUnreachableMembers.nonEmpty || newlyDetectedReachableMembers.nonEmpty) {
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val newReachability1 = (localOverview.reachability /: newlyDetectedUnreachableMembers) {
(reachability, m) ⇒ reachability.unreachable(selfUniqueAddress, m.uniqueAddress)
}
val newReachability2 = (newReachability1 /: newlyDetectedReachableMembers) {
(reachability, m) ⇒ reachability.reachable(selfUniqueAddress, m.uniqueAddress)
}
updateLatestGossip(newGossip)
if (newReachability2 ne localOverview.reachability) {
val newOverview = localOverview copy (reachability = newReachability2)
val newGossip = localGossip copy (overview = newOverview)
val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting)
if (nonExiting.nonEmpty)
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", "))
if (exiting.nonEmpty)
logInfo("Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
exiting.mkString(", "))
updateLatestGossip(newGossip)
publish(latestGossip)
val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting)
if (nonExiting.nonEmpty)
log.warning("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", "))
if (exiting.nonEmpty)
logInfo("Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
exiting.mkString(", "))
if (newlyDetectedReachableMembers.nonEmpty)
logInfo("Marking node(s) as REACHABLE [{}]", newlyDetectedReachableMembers.mkString(", "))
publish(latestGossip)
}
}
}
}
@ -877,8 +909,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
def isAvailable: Boolean = !latestGossip.isUnreachable(selfUniqueAddress)
// needed for tests
def sendGossipTo(address: Address): Unit = {
latestGossip.members.foreach(m ⇒
@ -906,7 +936,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
def validNodeForGossip(node: UniqueAddress): Boolean =
(node != selfUniqueAddress && latestGossip.members.exists(_.uniqueAddress == node))
(node != selfUniqueAddress && latestGossip.hasMember(node) &&
latestGossip.overview.reachability.isReachable(node))
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes

View file

@ -10,7 +10,6 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.actor.AddressTerminated
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
/**
@ -144,10 +143,23 @@ object ClusterEvent {
def getLeader: Address = leader orNull
}
/**
* Marker interface to facilitate subscription of
* both [[UnreachableMember]] and [[ReachableMember]].
*/
sealed trait ReachabilityEvent extends ClusterDomainEvent
/**
* A member is considered as unreachable by the failure detector.
*/
case class UnreachableMember(member: Member) extends ClusterDomainEvent
case class UnreachableMember(member: Member) extends ReachabilityEvent
/**
* A member is considered as reachable by the failure detector
* after having been unreachable.
* @see [[UnreachableMember]]
*/
case class ReachableMember(member: Member) extends ReachabilityEvent
/**
* Current snapshot of cluster node metrics. Published to subscribers.
@ -166,6 +178,11 @@ object ClusterEvent {
*/
private[cluster] case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] case class ReachabilityChanged(reachability: Reachability) extends ClusterDomainEvent
/**
* INTERNAL API
*/
@ -179,10 +196,24 @@ object ClusterEvent {
private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[UnreachableMember] =
if (newGossip eq oldGossip) Nil
else {
val newUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable
val unreachableEvents = newUnreachable map UnreachableMember
val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated
(newGossip.overview.reachability.allUnreachableOrTerminated.collect {
case node if !oldUnreachableNodes.contains(node) ⇒
UnreachableMember(newGossip.member(node))
})(collection.breakOut)
}
/**
* INTERNAL API
*/
private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachableMember] =
if (newGossip eq oldGossip) Nil
else {
(oldGossip.overview.reachability.allUnreachable.collect {
case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) ⇒
ReachableMember(newGossip.member(node))
})(collection.breakOut)
immutable.Seq.empty ++ unreachableEvents
}
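// Worked example (hypothetical node b): if b is in oldGossip's
// allUnreachableOrTerminated set but is a member and reachable in newGossip,
// diffReachable emits ReachableMember(b), while diffUnreachable only emits
// events for nodes that are newly unreachable or terminated in newGossip.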
/**
@ -202,10 +233,7 @@ object ClusterEvent {
// no events for other transitions
}
val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable
val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++
(oldGossip.overview.unreachable -- newGossip.overview.unreachable)
val removedMembers = oldGossip.members -- newGossip.members
val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed), m.status))
(new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result()
@ -243,6 +271,14 @@ object ClusterEvent {
List(SeenChanged(newConvergence, newSeenBy.map(_.address)))
else Nil
}
/**
* INTERNAL API
*/
private[cluster] def diffReachability(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachabilityChanged] =
if (newGossip.overview.reachability eq oldGossip.overview.reachability) Nil
else List(ReachabilityChanged(newGossip.overview.reachability))
}
/**
@ -283,7 +319,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
val state = CurrentClusterState(
members = latestGossip.members,
unreachable = latestGossip.overview.unreachable,
unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member,
seenBy = latestGossip.seenBy.map(_.address),
leader = latestGossip.leader.map(_.address),
roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut))
@ -307,13 +343,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
val oldGossip = latestGossip
// keep the latestGossip to be sent to new subscribers
latestGossip = newGossip
// first publish the diffUnreachable between the last two gossips
diffUnreachable(oldGossip, newGossip) foreach publish
diffReachable(oldGossip, newGossip) foreach publish
diffMemberEvents(oldGossip, newGossip) foreach publish
diffLeader(oldGossip, newGossip) foreach publish
diffRolesLeader(oldGossip, newGossip) foreach publish
// publish internal SeenState for testing purposes
diffSeen(oldGossip, newGossip) foreach publish
diffReachability(oldGossip, newGossip) foreach publish
}
def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)

View file

@ -26,6 +26,12 @@ private[akka] object ClusterHeartbeatReceiver {
* this node.
*/
case class EndHeartbeat(from: Address) extends ClusterMessage
/**
* Acknowledgment that `EndHeartbeat` was received and heartbeating
* can stop.
*/
case class EndHeartbeatAck(from: Address) extends ClusterMessage
}
/**
@ -39,10 +45,13 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
import ClusterHeartbeatReceiver._
val failureDetector = Cluster(context.system).failureDetector
val selfEndHeartbeatAck = EndHeartbeatAck(Cluster(context.system).selfAddress)
def receive = {
case Heartbeat(from) ⇒ failureDetector heartbeat from
case EndHeartbeat(from) ⇒ failureDetector remove from
case Heartbeat(from) ⇒ failureDetector heartbeat from
case EndHeartbeat(from) ⇒
failureDetector remove from
sender ! selfEndHeartbeatAck
}
}
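// Sketch of the handshake introduced by this commit (names as above):
//
//   ClusterHeartbeatSender                  ClusterHeartbeatReceiver
//     EndHeartbeat(from)   -------------->    failureDetector remove from
//     (resent on every heartbeat tick)
//                          <--------------    EndHeartbeatAck(to)
//     ackEndHeartbeat: drop `to` from `ending` and stop sending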
@ -103,7 +112,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
}
override def postStop(): Unit = {
@ -126,7 +134,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def receive = {
case HeartbeatTick ⇒ heartbeat()
case MemberUp(m) ⇒ addMember(m)
case UnreachableMember(m) ⇒ removeMember(m)
case MemberRemoved(m, _) ⇒ removeMember(m)
case s: CurrentClusterState ⇒ reset(s)
case MemberExited(m) ⇒ memberExited(m)
@ -134,6 +141,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
case HeartbeatRequest(from) ⇒ addHeartbeatRequest(from)
case SendHeartbeatRequest(to) ⇒ sendHeartbeatRequest(to)
case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
case EndHeartbeatAck(from) ⇒ ackEndHeartbeat(from)
}
def reset(snapshot: CurrentClusterState): Unit =
@ -183,15 +191,12 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
heartbeatReceiver(to) ! selfHeartbeat
}
// When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is
// sent to notify it that no more heartbeats will be sent.
for ((to, count) ← state.ending) {
// When sending heartbeats to a node is stopped, `EndHeartbeat` messages are
// sent to notify it that no more heartbeats will be sent. This continues
// until `EndHeartbeatAck` is received.
for (to ← state.ending) {
log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", cluster.selfAddress, to)
heartbeatReceiver(to) ! selfEndHeartbeat
if (count == NumberOfEndHeartbeats)
state = state.removeEnding(to)
else
state = state.increaseEndingCount(to)
}
// request heartbeats from expected sender node if no heartbeat messages have been received
@ -202,6 +207,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
def ackEndHeartbeat(from: Address): Unit = {
state = state.removeEnding(from)
}
}
/**
@ -225,7 +234,7 @@ private[cluster] object ClusterHeartbeatSenderState {
val curr = ring.myReceivers
// start ending process for nodes not selected any more
// abort ending process for nodes that have been selected again
val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
val end = old.ending ++ (old.current -- curr) -- curr
old.copy(ring = ring, current = curr, ending = end, heartbeatRequest = old.heartbeatRequest -- curr)
}
@ -243,14 +252,14 @@ private[cluster] object ClusterHeartbeatSenderState {
private[cluster] case class ClusterHeartbeatSenderState private (
ring: HeartbeatNodeRing,
current: Set[Address] = Set.empty,
ending: Map[Address, Int] = Map.empty,
ending: Set[Address] = Set.empty,
heartbeatRequest: Map[Address, Deadline] = Map.empty) {
// TODO can be disabled as optimization
assertInvariants()
private def assertInvariants(): Unit = {
val currentAndEnding = current.intersect(ending.keySet)
val currentAndEnding = current.intersect(ending)
require(currentAndEnding.isEmpty,
s"Same nodes in current and ending not allowed, got [${currentAndEnding}]")
@ -282,7 +291,7 @@ private[cluster] case class ClusterHeartbeatSenderState private (
private def removeHeartbeatRequest(address: Address): ClusterHeartbeatSenderState = {
if (heartbeatRequest contains address)
copy(heartbeatRequest = heartbeatRequest - address, ending = ending + (address -> 0))
copy(heartbeatRequest = heartbeatRequest - address, ending = ending + address)
else this
}
@ -298,13 +307,11 @@ private[cluster] case class ClusterHeartbeatSenderState private (
val overdue = heartbeatRequest collect { case (address, deadline) if deadline.isOverdue ⇒ address }
if (overdue.isEmpty) this
else
copy(ending = ending ++ overdue.map(_ -> 0), heartbeatRequest = heartbeatRequest -- overdue)
copy(ending = ending ++ overdue, heartbeatRequest = heartbeatRequest -- overdue)
}
def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1)))
}
/**

View file

@ -35,13 +35,45 @@ trait ClusterNodeMBean {
def getUnreachable: String
/*
* String that will list all nodes in the node ring as follows:
* JSON format of the status of all nodes in the cluster as follows:
* {{{
* Members:
* Member(address = akka://system0@localhost:5550, status = Up)
* Member(address = akka://system1@localhost:5551, status = Up)
* Unreachable:
* Member(address = akka://system2@localhost:5553, status = Down)
* {
* "self-address": "akka://system@host1:2552",
* "members": [
* {
* "address": "akka://system@host1:2552",
* "status": "Up"
* },
* {
* "address": "akka://system@host2:2552",
* "status": "Up"
* },
* {
* "address": "akka://system@host3:2552",
* "status": "Down"
* },
* {
* "address": "akka://system@host4:2552",
* "status": "Joining"
* }
* ],
* "unreachable": [
* {
* "node": "akka://system@host2:2552",
* "observed-by": [
* "akka://system@host1:2552",
* "akka://system@host3:2552"
* ]
* },
* {
* "node": "akka://system@host3:2552",
* "observed-by": [
* "akka://system@host1:2552",
* "akka://system@host2:2552"
* ]
* }
* ]
* }
* }}}
*/
def getClusterStatus: String
@ -102,9 +134,33 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
// JMX attributes (bean-style)
def getClusterStatus: String = {
val unreachable = clusterView.unreachableMembers
"\nMembers:\n\t" + clusterView.members.mkString("\n\t") +
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" }
val members = clusterView.members.toSeq.sorted(Member.ordering).map { m ⇒
s"""{
| "address": "${m.address}",
| "status": "${m.status}"
| }""".stripMargin
} mkString (",\n ")
val unreachable = clusterView.reachability.observersGroupedByUnreachable.toSeq.sortBy(_._1).map {
case (subject, observers) ⇒
s"""{
| "node": "${subject.address}",
| "observed-by": [
| ${observers.toSeq.sorted.map(_.address).mkString("\"", "\",\n \"", "\"")}
| ]
| }""".stripMargin
} mkString (",\n")
s"""{
| "self-address": "${clusterView.selfAddress}",
| "members": [
| ${members}
| ],
| "unreachable": [
| ${unreachable}
| ]
|}
|""".stripMargin
}
def getMembers: String =

View file

@ -78,7 +78,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
cluster.subscribe(self, classOf[ReachabilityEvent])
logInfo("Metrics collection has started successfully")
}
@ -91,6 +91,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
case MemberRemoved(m, _) ⇒ removeMember(m)
case MemberExited(m) ⇒ removeMember(m)
case UnreachableMember(m) ⇒ removeMember(m)
case ReachableMember(m) ⇒ if (m.status == Up) addMember(m)
case _: MemberEvent ⇒ // not interested in other types of MemberEvent
}

View file

@ -26,6 +26,9 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
@volatile
private var state: CurrentClusterState = CurrentClusterState()
@volatile
private var _reachability: Reachability = Reachability.empty
/**
* Current internal cluster stats, updated periodically via event bus.
*/
@ -50,15 +53,22 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
case e: ClusterDomainEvent ⇒ e match {
case SeenChanged(convergence, seenBy) ⇒
state = state.copy(seenBy = seenBy)
case ReachabilityChanged(reachability) ⇒
_reachability = reachability
case MemberRemoved(member, _) ⇒
state = state.copy(members = state.members - member, unreachable = state.unreachable - member)
case UnreachableMember(member) ⇒
// replace current member with new member (might have different status, only address is used in equals)
state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member)
state = state.copy(unreachable = state.unreachable - member + member)
case ReachableMember(member) ⇒
state = state.copy(unreachable = state.unreachable - member)
case event: MemberEvent ⇒
// replace current member with new member (might have different status, only address is used in equals)
val newUnreachable =
if (state.unreachable.contains(event.member)) state.unreachable - event.member + event.member
else state.unreachable
state = state.copy(members = state.members - event.member + event.member,
unreachable = state.unreachable - event.member)
unreachable = newUnreachable)
case LeaderChanged(leader) ⇒
state = state.copy(leader = leader)
case RoleLeaderChanged(role, leader) ⇒
@ -73,7 +83,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def self: Member = {
import cluster.selfUniqueAddress
state.members.find(_.uniqueAddress == selfUniqueAddress).orElse(state.unreachable.find(_.uniqueAddress == selfUniqueAddress)).
state.members.find(_.uniqueAddress == selfUniqueAddress).
getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed))
}
@ -127,6 +137,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
myself.status != MemberStatus.Removed
}
def reachability: Reachability = _reachability
/**
* Current cluster metrics.
*/

View file

@ -34,9 +34,6 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val HeartbeatRequestTimeToLive: FiniteDuration = {
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0")
val NumberOfEndHeartbeats: Int = {
FailureDetectorConfig.getInt("nr-of-end-heartbeats")
} requiring (_ > 0, "failure-detector.nr-of-end-heartbeats must be > 0")
val MonitoredByNrOfMembers: Int = {
FailureDetectorConfig.getInt("monitored-by-nr-of-members")
} requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0")

View file

@ -67,24 +67,25 @@ private[cluster] case class Gossip(
assertInvariants()
private def assertInvariants(): Unit = {
val unreachableAndLive = members.intersect(overview.unreachable)
if (unreachableAndLive.nonEmpty)
throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]"
format unreachableAndLive.mkString(", "))
val allowedLiveMemberStatus: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting)
def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatus(m.status)
if (members exists hasNotAllowedLiveMemberStatus)
throw new IllegalArgumentException("Live members must have status [%s], got [%s]"
format (allowedLiveMemberStatus.mkString(", "),
(members filter hasNotAllowedLiveMemberStatus).mkString(", ")))
if (members.exists(_.status == Removed))
throw new IllegalArgumentException(s"Live members must have status [${Removed}], " +
s"got [${members.filter(_.status == Removed)}]")
val seenButNotMember = overview.seen -- members.map(_.uniqueAddress) -- overview.unreachable.map(_.uniqueAddress)
val inReachabilityButNotMember = overview.reachability.allObservers -- members.map(_.uniqueAddress)
if (inReachabilityButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster in reachability table, got [%s]"
format inReachabilityButNotMember.mkString(", "))
val seenButNotMember = overview.seen -- members.map(_.uniqueAddress)
if (seenButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]"
format seenButNotMember.mkString(", "))
}
@transient private lazy val membersMap: Map[UniqueAddress, Member] =
members.map(m ⇒ m.uniqueAddress -> m)(collection.breakOut)
/**
* Increments the version for this 'Node'.
*/
@ -138,17 +139,17 @@ private[cluster] case class Gossip(
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
// 2. merge members by selecting the single Member with highest MemberStatus out of the Member groups
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members)
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 3. merge reachability table by picking records with highest version
val mergedReachability = this.overview.reachability.merge(mergedMembers.map(_.uniqueAddress),
that.overview.reachability)
// 4. Nobody can have seen this new gossip yet
val mergedSeen = Set.empty[UniqueAddress]
Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock)
Gossip(mergedMembers, GossipOverview(mergedSeen, mergedReachability), mergedVClock)
}
/**
@ -165,7 +166,8 @@ private[cluster] case class Gossip(
// When that is done we check that all members with a convergence
// status is in the seen table and has the latest vector clock
// version
overview.unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) &&
val unreachable = overview.reachability.allUnreachableOrTerminated map member
unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) &&
!members.exists(m Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress))
}
@ -176,34 +178,28 @@ private[cluster] case class Gossip(
def roleLeader(role: String): Option[UniqueAddress] = leaderOf(members.filter(_.hasRole(role)))
private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[UniqueAddress] = {
if (mbrs.isEmpty) None
else mbrs.find(m ⇒ Gossip.leaderMemberStatus(m.status)).
orElse(Some(mbrs.min(Member.leaderStatusOrdering))).map(_.uniqueAddress)
val reachableMembers =
if (overview.reachability.isAllReachable) mbrs
else mbrs.filter(m ⇒ overview.reachability.isReachable(m.uniqueAddress))
if (reachableMembers.isEmpty) None
else reachableMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status)).
orElse(Some(reachableMembers.min(Member.leaderStatusOrdering))).map(_.uniqueAddress)
}
def allRoles: Set[String] = members.flatMap(_.roles)
def isSingletonCluster: Boolean = members.size == 1
/**
* Returns true if the node is in the unreachable set
*/
def isUnreachable(node: UniqueAddress): Boolean =
overview.unreachable exists { _.uniqueAddress == node }
def member(node: UniqueAddress): Member = {
members.find(_.uniqueAddress == node).orElse(overview.unreachable.find(_.uniqueAddress == node)).
getOrElse(Member.removed(node)) // placeholder for removed member
membersMap.getOrElse(node,
Member.removed(node)) // placeholder for removed member
}
def hasMember(node: UniqueAddress): Boolean = membersMap.contains(node)
def youngestMember: Member = {
require(members.nonEmpty, "No youngest when no members")
def maxByUpNumber(mbrs: Iterable[Member]): Member =
mbrs.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
if (overview.unreachable.isEmpty)
maxByUpNumber(members)
else
maxByUpNumber(members ++ overview.unreachable)
members.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
}
override def toString =
@ -217,10 +213,10 @@ private[cluster] case class Gossip(
@SerialVersionUID(1L)
private[cluster] case class GossipOverview(
seen: Set[UniqueAddress] = Set.empty,
unreachable: Set[Member] = Set.empty) {
reachability: Reachability = Reachability.empty) {
override def toString =
s"GossipOverview(unreachable = [${unreachable.mkString(", ")}], seen = [${seen.mkString(", ")}])"
s"GossipOverview(reachability = [$reachability], seen = [${seen.mkString(", ")}])"
}
/**

View file

@ -213,10 +213,9 @@ object MemberStatus {
*/
@SerialVersionUID(1L)
private[cluster] case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] {
@transient
override lazy val hashCode = scala.util.hashing.MurmurHash3.productHash(this)
override def hashCode = uid
override def compare(that: UniqueAddress): Int = {
def compare(that: UniqueAddress): Int = {
val result = Member.addressOrdering.compare(this.address, that.address)
if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1
else result

View file

@ -0,0 +1,294 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.breakOut
import akka.actor.Address
/**
* INTERNAL API
*/
private[cluster] object Reachability {
val empty = new Reachability(Vector.empty, Map.empty)
def apply(records: immutable.IndexedSeq[Record], versions: Map[UniqueAddress, Long]): Reachability =
new Reachability(records, versions)
def create(records: immutable.Seq[Record], versions: Map[UniqueAddress, Long]): Reachability = records match {
case r: immutable.IndexedSeq[Record] ⇒ apply(r, versions)
case _ ⇒ apply(records.toVector, versions)
}
@SerialVersionUID(1L)
case class Record(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus, version: Long)
sealed trait ReachabilityStatus
@SerialVersionUID(1L) case object Reachable extends ReachabilityStatus
@SerialVersionUID(1L) case object Unreachable extends ReachabilityStatus
@SerialVersionUID(1L) case object Terminated extends ReachabilityStatus
}
/**
* INTERNAL API
*
* Immutable data structure that holds the reachability status of subject nodes as seen
* from observer nodes. Failure detector for the subject nodes exist on the
* observer nodes. Changes (reachable, unreachable, terminated) are only performed
* by observer nodes to its own records. Each change bumps the version number of the
* record, and thereby it is always possible to determine which record is newest when
* merging two instances.
*
* Aggregated status of a subject node is defined as (in this order):
* - Terminated if any observer node considers it as Terminated
* - Unreachable if any observer node considers it as Unreachable
* - Reachable otherwise, i.e. no observer node considers it as Unreachable
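*
* For example (hypothetical observers a, b and subject c): if a records c as
* Unreachable while b still records c as Reachable, the aggregated status of
* c is Unreachable; if any observer later records c as Terminated, the
* aggregated status becomes Terminated.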
*/
@SerialVersionUID(1L)
private[cluster] class Reachability private (
val records: immutable.IndexedSeq[Reachability.Record],
val versions: Map[UniqueAddress, Long]) extends Serializable {
import Reachability._
private class Cache {
val (observerRowsMap, allUnreachable, allTerminated) = {
if (records.isEmpty) {
val observerRowsMap = Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]]
val allTerminated = Set.empty[UniqueAddress]
val allUnreachable = Set.empty[UniqueAddress]
(observerRowsMap, allUnreachable, allTerminated)
} else {
val mapBuilder = scala.collection.mutable.Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]]
import scala.collection.mutable.SetBuilder
val terminatedBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty)
val unreachableBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty)
records foreach { r ⇒
val m = mapBuilder.get(r.observer) match {
case None ⇒ Map(r.subject -> r)
case Some(m) ⇒ m.updated(r.subject, r)
}
mapBuilder += (r.observer -> m)
if (r.status == Unreachable) unreachableBuilder += r.subject
else if (r.status == Terminated) terminatedBuilder += r.subject
}
val observerRowsMap: Map[UniqueAddress, Map[UniqueAddress, Reachability.Record]] = mapBuilder.toMap
val allTerminated: Set[UniqueAddress] = terminatedBuilder.result()
val allUnreachable: Set[UniqueAddress] = unreachableBuilder.result() -- allTerminated
(observerRowsMap, allUnreachable, allTerminated)
}
}
val allUnreachableOrTerminated: Set[UniqueAddress] =
if (allTerminated.isEmpty) allUnreachable
else allUnreachable ++ allTerminated
}
@transient private lazy val cache = new Cache
private def observerRows(observer: UniqueAddress): Option[Map[UniqueAddress, Reachability.Record]] =
cache.observerRowsMap.get(observer)
def unreachable(observer: UniqueAddress, subject: UniqueAddress): Reachability =
change(observer, subject, Unreachable)
def reachable(observer: UniqueAddress, subject: UniqueAddress): Reachability =
change(observer, subject, Reachable)
def terminated(observer: UniqueAddress, subject: UniqueAddress): Reachability =
change(observer, subject, Terminated)
private def currentVersion(observer: UniqueAddress): Long = versions.get(observer) match {
case None ⇒ 0
case Some(v) ⇒ v
}
private def nextVersion(observer: UniqueAddress): Long = currentVersion(observer) + 1
private def change(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus): Reachability = {
val v = nextVersion(observer)
val newVersions = versions.updated(observer, v)
val newRecord = Record(observer, subject, status, v)
observerRows(observer) match {
case None if status == Reachable ⇒ this
case None ⇒
new Reachability(records :+ newRecord, newVersions)
case Some(oldObserverRows) ⇒
oldObserverRows.get(subject) match {
case None ⇒
if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable }) {
// all Reachable, prune by removing the records of the observer, and bump the version
new Reachability(records.filterNot(_.observer == observer), newVersions)
} else
new Reachability(records :+ newRecord, newVersions)
case Some(oldRecord) ⇒
if (oldRecord.status == Terminated || oldRecord.status == status)
this
else {
if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable || r.subject == subject }) {
// all Reachable, prune by removing the records of the observer, and bump the version
new Reachability(records.filterNot(_.observer == observer), newVersions)
} else {
val newRecords = records.updated(records.indexOf(oldRecord), newRecord)
new Reachability(newRecords, newVersions)
}
}
}
}
}
def merge(allowed: immutable.Set[UniqueAddress], other: Reachability): Reachability = {
val recordBuilder = new immutable.VectorBuilder[Record]
recordBuilder.sizeHint(math.max(this.records.size, other.records.size))
var newVersions = versions
allowed foreach { observer ⇒
val observerVersion1 = this.currentVersion(observer)
val observerVersion2 = other.currentVersion(observer)
(this.observerRows(observer), other.observerRows(observer)) match {
case (None, None) ⇒
case (Some(rows1), Some(rows2)) ⇒
mergeObserverRows(rows1, rows2, observerVersion1, observerVersion2, recordBuilder)
case (Some(rows1), None) ⇒
recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 ⇒ r }
case (None, Some(rows2)) ⇒
recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 ⇒ r }
}
if (observerVersion2 > observerVersion1)
newVersions += (observer -> observerVersion2)
}
new Reachability(recordBuilder.result(), newVersions)
}
private def mergeObserverRows(
rows1: Map[UniqueAddress, Reachability.Record], rows2: Map[UniqueAddress, Reachability.Record],
observerVersion1: Long, observerVersion2: Long,
recordBuilder: immutable.VectorBuilder[Record]): Unit = {
val allSubjects = rows1.keySet ++ rows2.keySet
allSubjects foreach { subject ⇒
(rows1.get(subject), rows2.get(subject)) match {
case (Some(r1), Some(r2)) ⇒
recordBuilder += (if (r1.version > r2.version) r1 else r2)
case (Some(r1), None) ⇒
if (r1.version > observerVersion2)
recordBuilder += r1
case (None, Some(r2)) ⇒
if (r2.version > observerVersion1)
recordBuilder += r2
case (None, None) ⇒
throw new IllegalStateException(s"Unexpected [$subject]")
}
}
}
def remove(nodes: Iterable[UniqueAddress]): Reachability = {
val nodesSet = nodes.to[immutable.HashSet]
val newRecords = records.filterNot(r ⇒ nodesSet(r.observer) || nodesSet(r.subject))
if (newRecords.size == records.size) this
else {
val newVersions = versions -- nodes
Reachability(newRecords, newVersions)
}
}
def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus =
observerRows(observer) match {
case None ⇒ Reachable
case Some(observerRows) ⇒ observerRows.get(subject) match {
case None ⇒ Reachable
case Some(record) ⇒ record.status
}
}
def status(node: UniqueAddress): ReachabilityStatus =
if (cache.allTerminated(node)) Terminated
else if (cache.allUnreachable(node)) Unreachable
else Reachable
def isReachable(node: UniqueAddress): Boolean = isAllReachable || !allUnreachableOrTerminated.contains(node)
def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean =
status(observer, subject) == Reachable
def isAllReachable: Boolean = records.isEmpty
/**
* Doesn't include terminated
*/
def allUnreachable: Set[UniqueAddress] = cache.allUnreachable
def allUnreachableOrTerminated: Set[UniqueAddress] = cache.allUnreachableOrTerminated
/**
* Doesn't include terminated
*/
def allUnreachableFrom(observer: UniqueAddress): Set[UniqueAddress] =
observerRows(observer) match {
case None ⇒ Set.empty
case Some(observerRows) ⇒
observerRows.collect {
case (subject, record) if record.status == Unreachable ⇒ subject
}(breakOut)
}
def observersGroupedByUnreachable: Map[UniqueAddress, Set[UniqueAddress]] = {
records.groupBy(_.subject).collect {
case (subject, records) if records.exists(_.status == Unreachable) ⇒
val observers: Set[UniqueAddress] =
records.collect { case r if r.status == Unreachable ⇒ r.observer }(breakOut)
(subject -> observers)
}
}
def allObservers: Set[UniqueAddress] = versions.keySet
def recordsFrom(observer: UniqueAddress): immutable.IndexedSeq[Record] = {
observerRows(observer) match {
case None ⇒ Vector.empty
case Some(rows) ⇒ rows.valuesIterator.toVector
}
}
// only used for testing
override def hashCode: Int = versions.hashCode
// only used for testing
override def equals(obj: Any): Boolean = obj match {
case other: Reachability ⇒
records.size == other.records.size && versions == other.versions &&
cache.observerRowsMap == other.cache.observerRowsMap
case _ ⇒ false
}
override def toString: String = {
val rows = for {
observer ← versions.keys.toSeq.sorted
rowsOption = observerRows(observer)
if rowsOption.isDefined // compilation err for subject <- rowsOption
rows = rowsOption.get
subject ← rows.keys.toSeq.sorted
} yield {
val record = rows(subject)
val aggregated = status(subject)
s"${observer.address} -> ${subject.address}: ${record.status} [$aggregated] (${record.version})"
}
rows.mkString(", ")
}
}
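
A minimal usage sketch of the new table (the addresses and uids are
hypothetical; Reachability and UniqueAddress are as defined above):

    import akka.actor.Address
    val a = UniqueAddress(Address("akka.tcp", "sys", "host1", 2552), 1)
    val b = UniqueAddress(Address("akka.tcp", "sys", "host2", 2552), 2)

    val r1 = Reachability.empty.unreachable(a, b) // observer a marks subject b
    r1.status(a, b)   // Unreachable
    r1.isReachable(b) // false, aggregated over all observers
    val r2 = r1.reachable(a, b) // a's failure detector sees b again
    r2.isAllReachable // true, a's records are pruned when all are Reachable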

View file

@ -44,6 +44,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
classOf[InternalClusterAction.InitJoinNack] -> (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.EndHeartbeatAck] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeatAck(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))),
classOf[GossipStatus] -> gossipStatusFromBinary,
classOf[GossipEnvelope] -> gossipEnvelopeFromBinary,
@ -67,6 +68,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
case InternalClusterAction.InitJoinAck(address) ⇒ addressToProto(address).toByteArray
case InternalClusterAction.InitJoinNack(address) ⇒ addressToProto(address).toByteArray
case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ addressToProto(from).toByteArray
case ClusterHeartbeatReceiver.EndHeartbeatAck(from) ⇒ addressToProto(from).toByteArray
case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ addressToProto(from).toByteArray
case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}
@ -132,6 +134,13 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
private val memberStatusFromInt = memberStatusToInt.map { case (a, b) ⇒ (b, a) }
private val reachabilityStatusToInt = scala.collection.immutable.HashMap[Reachability.ReachabilityStatus, Int](
Reachability.Reachable -> msg.ReachabilityStatus.Reachable_VALUE,
Reachability.Unreachable -> msg.ReachabilityStatus.Unreachable_VALUE,
Reachability.Terminated -> msg.ReachabilityStatus.Terminated_VALUE)
private val reachabilityStatusFromInt = reachabilityStatusToInt.map { case (a, b) ⇒ (b, a) }
private def mapWithErrorMessage[T](map: Map[T, Int], value: T, unknown: String): Int = map.get(value) match {
case Some(x) ⇒ x
case _ ⇒ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message")
@ -139,8 +148,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
private def gossipToProto(gossip: Gossip): msg.Gossip = {
import scala.collection.breakOut
val allMembers = (gossip.members.iterator ++ gossip.overview.unreachable.iterator).toIndexedSeq
val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress)(breakOut)
val allMembers = gossip.members.toVector
val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress)
val addressMapping = allAddresses.zipWithIndex.toMap
val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc ++ m.roles).to[Vector]
val roleMapping = allRoles.zipWithIndex.toMap
@ -154,11 +163,21 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber,
msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole)(breakOut))
val unreachable: Vector[msg.Member] = gossip.overview.unreachable.map(memberToProto)(breakOut)
def reachabilityToProto(reachability: Reachability): Vector[msg.ObserverReachability] = {
reachability.versions.map {
case (observer, version) ⇒
val subjectReachability = reachability.recordsFrom(observer).map(r ⇒
msg.SubjectReachability(mapUniqueAddress(r.subject),
msg.ReachabilityStatus.valueOf(reachabilityStatusToInt(r.status)), r.version))
msg.ObserverReachability(mapUniqueAddress(observer), version, subjectReachability)
}(breakOut)
}
val reachability = reachabilityToProto(gossip.overview.reachability)
val members: Vector[msg.Member] = gossip.members.map(memberToProto)(breakOut)
val seen: Vector[Int] = gossip.overview.seen.map(mapUniqueAddress)(breakOut)
val overview = msg.GossipOverview(seen, unreachable)
val overview = msg.GossipOverview(seen, reachability)
msg.Gossip(allAddresses.map(uniqueAddressToProto),
allRoles, allHashes, members, overview, vectorClockToProto(gossip.version, hashMapping))
@ -192,14 +211,31 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
val roleMapping = gossip.allRoles
val hashMapping = gossip.allHashes
def reachabilityFromProto(observerReachability: immutable.Seq[msg.ObserverReachability]): Reachability = {
val recordBuilder = new immutable.VectorBuilder[Reachability.Record]
val versionsBuilder = new scala.collection.mutable.MapBuilder[UniqueAddress, Long, Map[UniqueAddress, Long]](Map.empty)
for (o ← observerReachability) {
val observer = addressMapping(o.addressIndex)
versionsBuilder += ((observer, o.version))
for (s ← o.subjectReachability) {
val subject = addressMapping(s.addressIndex)
val record = Reachability.Record(observer, subject, reachabilityStatusFromInt(s.status), s.version)
recordBuilder += record
}
}
Reachability.create(recordBuilder.result(), versionsBuilder.result())
}
def memberFromProto(member: msg.Member) =
new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id),
member.rolesIndexes.map(roleMapping)(breakOut))
val members: immutable.SortedSet[Member] = gossip.members.map(memberFromProto)(breakOut)
val unreachable: immutable.Set[Member] = gossip.overview.unreachable.map(memberFromProto)(breakOut)
val reachability = reachabilityFromProto(gossip.overview.observerReachability)
val seen: Set[UniqueAddress] = gossip.overview.seen.map(addressMapping)(breakOut)
val overview = GossipOverview(seen, unreachable)
val overview = GossipOverview(seen, reachability)
Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping))
}

View file

@ -274,7 +274,7 @@ private[akka] class ClusterRouterActor(override val supervisorStrategy: Supervis
// re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
cluster.subscribe(self, classOf[ReachabilityEvent])
}
override def postStop(): Unit = cluster.unsubscribe(self)
@ -289,6 +289,13 @@ private[akka] class ClusterRouterActor(override val supervisorStrategy: Supervis
def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef)
def registerRoutees(member: Member) = {
routeeProvider.nodes += member.address
// createRoutees will create routees based on
// totalInstances and maxInstancesPerNode
routeeProvider.createRoutees()
}
def unregisterRoutees(member: Member) = {
val address = member.address
routeeProvider.nodes -= address
@ -309,17 +316,18 @@ private[akka] class ClusterRouterActor(override val supervisorStrategy: Supervis
routeeProvider.createRoutees()
case m: MemberEvent if routeeProvider.isAvailable(m.member) ⇒
routeeProvider.nodes += m.member.address
// createRoutees will create routees based on
// totalInstances and maxInstancesPerNode
routeeProvider.createRoutees()
registerRoutees(m.member)
case other: MemberEvent ⇒
// other events means that it is no longer interesting, such as
// MemberJoined, MemberLeft, MemberExited, MemberRemoved
// MemberExited, MemberRemoved
unregisterRoutees(other.member)
case UnreachableMember(m) ⇒
unregisterRoutees(m)
case ReachableMember(m) ⇒
if (routeeProvider.isAvailable(m))
registerRoutees(m)
}
}

View file

@ -4,6 +4,7 @@
package akka.cluster
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._

View file

@ -6,6 +6,7 @@ package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import scala.concurrent.duration._
import akka.testkit._
@ -17,6 +18,8 @@ object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")).
withFallback(MultiNodeClusterSpec.clusterConfig))
testTransport(on = true)
}
class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec
@ -44,6 +47,47 @@ abstract class ClusterAccrualFailureDetectorSpec
enterBarrier("after-1")
}
"mark node as 'unavailable' when network partition and then back to 'available' when partition is healed" taggedAs
LongRunningTest in {
runOn(first) {
testConductor.blackhole(first, second, Direction.Both).await
}
enterBarrier("broken")
runOn(first) {
// detect failure...
awaitCond(!cluster.failureDetector.isAvailable(second), 15.seconds)
// other connections still ok
cluster.failureDetector.isAvailable(third) must be(true)
}
runOn(second) {
// detect failure...
awaitCond(!cluster.failureDetector.isAvailable(first), 15.seconds)
// other connections still ok
cluster.failureDetector.isAvailable(third) must be(true)
}
enterBarrier("partitioned")
runOn(first) {
testConductor.passThrough(first, second, Direction.Both).await
}
enterBarrier("repaired")
runOn(first, third) {
awaitCond(cluster.failureDetector.isAvailable(second), 15.seconds)
}
runOn(second) {
awaitCond(cluster.failureDetector.isAvailable(first), 15.seconds)
}
enterBarrier("after-2")
}
"mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
runOn(first) {
testConductor.exit(third, 0).await
@ -59,7 +103,7 @@ abstract class ClusterAccrualFailureDetectorSpec
cluster.failureDetector.isAvailable(second) must be(true)
}
enterBarrier("after-2")
enterBarrier("after-3")
}
}
}

View file

@ -71,7 +71,7 @@ abstract class ClusterDeathWatchSpec
}
"An actor watching a remote actor in the cluster" must {
"receive Terminated when watched node becomes Down" taggedAs LongRunningTest in within(20 seconds) {
"receive Terminated when watched node becomes Down/Removed" taggedAs LongRunningTest in within(20 seconds) {
awaitClusterUp(first, second, third, fourth)
enterBarrier("cluster-up")
@ -103,10 +103,10 @@ abstract class ClusterDeathWatchSpec
enterBarrier("second-terminated")
markNodeAsUnavailable(third)
awaitAssert(clusterView.members.map(_.address) must not contain (address(third)))
awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(third)))
cluster.down(third)
// removed
awaitAssert(clusterView.members.map(_.address) must not contain (address(third)))
awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(third)))
expectMsg(path3)
enterBarrier("third-terminated")
@ -119,10 +119,10 @@ abstract class ClusterDeathWatchSpec
enterBarrier("watch-established")
runOn(third) {
markNodeAsUnavailable(second)
awaitAssert(clusterView.members.map(_.address) must not contain (address(second)))
awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(second)))
cluster.down(second)
// removed
awaitAssert(clusterView.members.map(_.address) must not contain (address(second)))
awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(second)))
}
enterBarrier("second-terminated")
@ -194,11 +194,11 @@ abstract class ClusterDeathWatchSpec
runOn(fourth) {
markNodeAsUnavailable(fifth)
awaitAssert(clusterView.members.map(_.address) must not contain (address(fifth)))
awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(fifth)))
cluster.down(fifth)
// removed
awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(fifth)))
awaitAssert(clusterView.members.map(_.address) must not contain (address(fifth)))
}
enterBarrier("fifth-terminated")
@ -226,11 +226,11 @@ abstract class ClusterDeathWatchSpec
enterBarrier("hello-deployed")
markNodeAsUnavailable(first)
awaitAssert(clusterView.members.map(_.address) must not contain (address(first)))
awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(first)))
cluster.down(first)
// removed
awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(first)))
awaitAssert(clusterView.members.map(_.address) must not contain (address(first)))
expectTerminated(hello)

View file

@ -70,12 +70,11 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
within(28 seconds) {
// third becomes unreachable
awaitAssert(clusterView.unreachableMembers.size must be(1))
awaitAssert(clusterView.members.size must be(2))
awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up)))
awaitSeenSameState(first, second)
// still one unreachable
clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(thirdAddress)
clusterView.members.size must be(3)
}
}
@ -96,7 +95,7 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
runOn(first, second, fourth) {
for (n ← 1 to 5) {
awaitAssert(clusterView.members.size must be(2))
awaitAssert(clusterView.members.size must be(3))
awaitSeenSameState(first, second, fourth)
memberStatus(first) must be(Some(MemberStatus.Up))
memberStatus(second) must be(Some(MemberStatus.Up))

View file

@ -95,8 +95,9 @@ abstract class MBeanSpec
enterBarrier("after-4")
}
"support down" taggedAs LongRunningTest in within(20 seconds) {
val fourthAddress = address(fourth)
val fourthAddress = address(fourth)
"format cluster status as JSON with full reachability info" taggedAs LongRunningTest in within(30 seconds) {
runOn(first) {
testConductor.exit(fourth, 0).await
}
@ -104,11 +105,62 @@ abstract class MBeanSpec
runOn(first, second, third) {
awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") must be(fourthAddress.toString))
val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(",")
val expectedMembers = Seq(first, second, third, fourth).sorted.map(address(_)).mkString(",")
awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") must be(expectedMembers))
}
enterBarrier("fourth-unreachable")
runOn(first) {
val sortedNodes = Vector(first, second, third, fourth).sorted.map(address(_))
val unreachableObservedBy = Vector(first, second, third).sorted.map(address(_))
val expectedJson =
s"""{
| "self-address": "${address(first)}",
| "members": [
| {
| "address": "${sortedNodes(0)}",
| "status": "Up"
| },
| {
| "address": "${sortedNodes(1)}",
| "status": "Up"
| },
| {
| "address": "${sortedNodes(2)}",
| "status": "Up"
| },
| {
| "address": "${sortedNodes(3)}",
| "status": "Up"
| }
| ],
| "unreachable": [
| {
| "node": "${address(fourth)}",
| "observed-by": [
| "${unreachableObservedBy(0)}",
| "${unreachableObservedBy(1)}",
| "${unreachableObservedBy(2)}"
| ]
| }
| ]
|}
|""".stripMargin
// awaitAssert to make sure that all nodes detect unreachable
within(5.seconds) {
awaitAssert(mbeanServer.getAttribute(mbeanName, "ClusterStatus") must be(expectedJson))
}
}
enterBarrier("after-5")
}
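
For reference, the ``ClusterStatus`` attribute asserted above can be read from any JVM client via standard JMX. A minimal sketch (assuming the ``akka:type=Cluster`` object name under which the cluster MBean is registered, and that the attribute value is the JSON ``String`` shown in the test):

import java.lang.management.ManagementFactory
import javax.management.ObjectName

val mbeanServer = ManagementFactory.getPlatformMBeanServer
val clusterMBean = new ObjectName("akka:type=Cluster")
// returns the same JSON document that the test above asserts on
val statusJson = mbeanServer.getAttribute(clusterMBean, "ClusterStatus").asInstanceOf[String]
println(statusJson)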
"support down" taggedAs LongRunningTest in within(20 seconds) {
// fourth unreachable in previous step
runOn(second) {
mbeanServer.invoke(mbeanName, "down", Array(fourthAddress.toString), Array("java.lang.String"))
}
@ -120,7 +172,7 @@ abstract class MBeanSpec
awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") must be(""))
}
enterBarrier("after-5")
enterBarrier("after-6")
}
"support leave" taggedAs LongRunningTest in within(20 seconds) {
@ -142,7 +194,7 @@ abstract class MBeanSpec
})
}
enterBarrier("after-6")
enterBarrier("after-7")
}
}

View file

@ -38,12 +38,13 @@ object MultiNodeClusterSpec {
jmx.enabled = off
gossip-interval = 200 ms
leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 200 ms
unreachable-nodes-reaper-interval = 500 ms
periodic-tasks-initial-delay = 300 ms
publish-stats-interval = 0 s # always, when it happens
failure-detector.heartbeat-interval = 400 ms
failure-detector.heartbeat-interval = 500 ms
}
akka.loglevel = INFO
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.remote.log-remote-lifecycle-events = off
akka.loggers = ["akka.testkit.TestEventListener"]
@ -114,8 +115,10 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
classOf[InternalClusterAction.Tick],
classOf[akka.actor.PoisonPill],
classOf[akka.dispatch.sysmsg.DeathWatchNotification],
akka.remote.transport.AssociationHandle.Disassociated.getClass,
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
classOf[akka.remote.transport.AssociationHandle.Disassociated],
// akka.remote.transport.AssociationHandle.Disassociated.getClass,
classOf[akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying],
// akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys)
}
@ -125,6 +128,10 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
if (!sys.log.isDebugEnabled)
sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))
def muteMarkingAsReachable(sys: ActorSystem = system): Unit =
if (!sys.log.isDebugEnabled)
sys.eventStream.publish(Mute(EventFilter.info(pattern = ".*Marking.* as REACHABLE.*")))
override def afterAll(): Unit = {
if (!log.isDebugEnabled) {
muteDeadLetters()()
@ -292,7 +299,8 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
}
}
def awaitAllReachable(): Unit = awaitAssert(clusterView.unreachableMembers.isEmpty)
def awaitAllReachable(): Unit =
awaitAssert(clusterView.unreachableMembers must be(Set.empty))
/**
* Wait until the specified nodes have seen the same gossip overview.

View file

@ -68,7 +68,6 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig)
}
"detect network partition and mark nodes on other side as unreachable and form new cluster" taggedAs LongRunningTest in within(30 seconds) {
val thirdAddress = address(third)
enterBarrier("before-split")
runOn(first) {
@ -79,20 +78,15 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig)
}
enterBarrier("after-split")
runOn(side1.last) {
for (role ← side2) markNodeAsUnavailable(role)
}
runOn(side2.last) {
for (role ← side1) markNodeAsUnavailable(role)
}
runOn(side1: _*) {
for (role ← side2) markNodeAsUnavailable(role)
// auto-down = on
awaitMembersUp(side1.size, side2.toSet map address)
assertLeader(side1: _*)
}
runOn(side2: _*) {
for (role ← side1) markNodeAsUnavailable(role)
// auto-down = on
awaitMembersUp(side2.size, side1.toSet map address)
assertLeader(side2: _*)

View file

@ -109,7 +109,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
# by tree-width (number of children for each actor) and
# tree-levels, total number of actors can be calculated by
# (width * math.pow(width, levels) - 1) / (width - 1)
tree-width = 5
tree-width = 4
tree-levels = 4
report-metrics-interval = 10s
# scale convergence within timeouts with this factor

View file

@ -0,0 +1,338 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import scala.concurrent.duration._
import akka.testkit._
import akka.testkit.TestEvent._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.remote.testconductor.RoleName
import akka.actor.Props
import akka.actor.Actor
import scala.util.control.NoStackTrace
import akka.remote.QuarantinedEvent
import akka.actor.ExtendedActorSystem
import akka.remote.RemoteActorRefProvider
import akka.actor.ActorRef
import akka.dispatch.sysmsg.Failed
object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
val sixth = role("sixth")
val seventh = role("seventh")
val eighth = role("eighth")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("akka.remote.system-message-buffer-size=20")).
withFallback(MultiNodeClusterSpec.clusterConfig))
testTransport(on = true)
deployOn(second, """"/parent/*" {
remote = "@third@"
}""")
class Parent extends Actor {
def receive = {
case p: Props ⇒ sender ! context.actorOf(p)
}
}
class RemoteChild extends Actor {
import context.dispatcher
context.system.scheduler.scheduleOnce(500.millis, self, "boom")
def receive = {
case "boom" ⇒ throw new SimulatedException
case x ⇒ sender ! x
}
}
class SimulatedException extends RuntimeException("Simulated") with NoStackTrace
}
class SurviveNetworkInstabilityMultiJvmNode1 extends SurviveNetworkInstabilitySpec
class SurviveNetworkInstabilityMultiJvmNode2 extends SurviveNetworkInstabilitySpec
class SurviveNetworkInstabilityMultiJvmNode3 extends SurviveNetworkInstabilitySpec
class SurviveNetworkInstabilityMultiJvmNode4 extends SurviveNetworkInstabilitySpec
class SurviveNetworkInstabilityMultiJvmNode5 extends SurviveNetworkInstabilitySpec
class SurviveNetworkInstabilityMultiJvmNode6 extends SurviveNetworkInstabilitySpec
class SurviveNetworkInstabilityMultiJvmNode7 extends SurviveNetworkInstabilitySpec
class SurviveNetworkInstabilityMultiJvmNode8 extends SurviveNetworkInstabilitySpec
abstract class SurviveNetworkInstabilitySpec
extends MultiNodeSpec(SurviveNetworkInstabilityMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender {
import SurviveNetworkInstabilityMultiJvmSpec._
// muteMarkingAsUnreachable()
// muteMarkingAsReachable()
override def expectedTestDuration = 3.minutes
def assertUnreachable(subjects: RoleName*): Unit = {
val expected = subjects.toSet map address
awaitAssert(clusterView.unreachableMembers.map(_.address) must be(expected))
}
"A network partition tolerant cluster" must {
"reach initial convergence" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third, fourth, fifth)
enterBarrier("after-1")
}
"heal after a broken pair" taggedAs LongRunningTest in within(30.seconds) {
runOn(first) {
testConductor.blackhole(first, second, Direction.Both).await
}
enterBarrier("blackhole-2")
runOn(first) { assertUnreachable(second) }
runOn(second) { assertUnreachable(first) }
runOn(third, fourth, fifth) {
assertUnreachable(first, second)
}
enterBarrier("unreachable-2")
runOn(first) {
testConductor.passThrough(first, second, Direction.Both).await
}
enterBarrier("repair-2")
// This test illustrates why we can't ignore gossip from nodes with aggregated
// unreachable status. If third, fourth, and fifth have all been infected by the
// unreachability of first and second, they must accept gossip from first and
// second when their broken connection has healed, otherwise they will be isolated forever.
awaitAllReachable()
enterBarrier("after-2")
}
"heal after one isolated node" taggedAs LongRunningTest in within(30.seconds) {
val others = Vector(second, third, fourth, fifth)
runOn(first) {
for (other ← others) {
testConductor.blackhole(first, other, Direction.Both).await
}
}
enterBarrier("blackhole-3")
runOn(first) { assertUnreachable(others: _*) }
runOn(others: _*) {
assertUnreachable(first)
}
enterBarrier("unreachable-3")
runOn(first) {
for (other ← others) {
testConductor.passThrough(first, other, Direction.Both).await
}
}
enterBarrier("repair-3")
awaitAllReachable()
enterBarrier("after-3")
}
"heal two isolated islands" taggedAs LongRunningTest in within(30.seconds) {
val island1 = Vector(first, second)
val island2 = Vector(third, fourth, fifth)
runOn(first) {
// split the cluster in two parts (first, second) / (third, fourth, fifth)
for (role1 ← island1; role2 ← island2) {
testConductor.blackhole(role1, role2, Direction.Both).await
}
}
enterBarrier("blackhole-4")
runOn(island1: _*) {
assertUnreachable(island2: _*)
}
runOn(island2: _*) {
assertUnreachable(island1: _*)
}
enterBarrier("unreachable-4")
runOn(first) {
for (role1 ← island1; role2 ← island2) {
testConductor.passThrough(role1, role2, Direction.Both).await
}
}
enterBarrier("repair-4")
awaitAllReachable()
enterBarrier("after-4")
}
"heal after unreachable when ring is changed" taggedAs LongRunningTest in within(45.seconds) {
val joining = Vector(sixth, seventh)
val others = Vector(second, third, fourth, fifth)
runOn(first) {
for (role1 ← (joining :+ first); role2 ← others) {
testConductor.blackhole(role1, role2, Direction.Both).await
}
}
enterBarrier("blackhole-5")
runOn(first) { assertUnreachable(others: _*) }
runOn(others: _*) { assertUnreachable(first) }
enterBarrier("unreachable-5")
runOn(joining: _*) {
cluster.join(first)
// let them join and stabilize heartbeating
Thread.sleep(5000)
}
enterBarrier("joined-5")
runOn((joining :+ first): _*) { assertUnreachable(others: _*) }
// the others don't know about the joining nodes yet, no gossip has passed through
runOn(others: _*) { assertUnreachable(first) }
enterBarrier("more-unreachable-5")
runOn(first) {
for (role1 ← (joining :+ first); role2 ← others) {
testConductor.passThrough(role1, role2, Direction.Both).await
}
}
enterBarrier("repair-5")
runOn((joining ++ others): _*) {
awaitAllReachable()
// eighth not joined yet
awaitMembersUp(roles.size - 1)
}
enterBarrier("after-5")
}
"down and remove quarantined node" taggedAs LongRunningTest in within(45.seconds) {
val others = Vector(first, third, fourth, fifth, sixth, seventh)
runOn(second) {
val sysMsgBufferSize = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].
remoteSettings.SysMsgBufferSize
val parent = system.actorOf(Props[Parent], "parent")
// fill up the system message redeliver buffer with many failing actors
for (_ ← 1 to sysMsgBufferSize + 1) {
// remote deployment to third
parent ! Props[RemoteChild]
val child = expectMsgType[ActorRef]
child ! "hello"
expectMsg("hello")
lastSender.path.address must be(address(third))
}
}
runOn(third) {
// undelivered system messages in RemoteChild on third should trigger QuarantinedEvent
system.eventStream.subscribe(testActor, classOf[QuarantinedEvent])
// after quarantined it will drop the Failed messages to deadLetters
muteDeadLetters(classOf[Failed])(system)
}
enterBarrier("children-deployed")
runOn(first) {
for (role ← others)
testConductor.blackhole(second, role, Direction.Send).await
}
enterBarrier("blackhole-6")
runOn(third) {
// undelivered system messages in RemoteChild on third should trigger QuarantinedEvent
within(10.seconds) {
expectMsgType[QuarantinedEvent].address must be(address(second))
}
system.eventStream.unsubscribe(testActor, classOf[QuarantinedEvent])
}
enterBarrier("quarantined")
runOn(others: _*) {
// second should be removed because of quarantine
awaitAssert(clusterView.members.map(_.address) must not contain (address(second)))
}
enterBarrier("after-6")
}
"continue and move Joining to Up after downing of one half" taggedAs LongRunningTest in within(45.seconds) {
// note that second is already removed in previous step
val side1 = Vector(first, third, fourth)
val side1AfterJoin = side1 :+ eighth
val side2 = Vector(fifth, sixth, seventh)
runOn(first) {
for (role1 ← side1AfterJoin; role2 ← side2) {
testConductor.blackhole(role1, role2, Direction.Both).await
}
}
enterBarrier("blackhole-7")
runOn(side1: _*) { assertUnreachable(side2: _*) }
runOn(side2: _*) { assertUnreachable(side1: _*) }
enterBarrier("unreachable-7")
runOn(eighth) {
cluster.join(third)
}
runOn(fourth) {
for (role2 ← side2) {
cluster.down(role2)
}
}
enterBarrier("downed-7")
runOn(side1AfterJoin: _*) {
// side2 removed
val expected = (side1AfterJoin map address).toSet
awaitAssert(clusterView.members.map(_.address) must be(expected))
awaitAssert(clusterView.members.collectFirst { case m if m.address == address(eighth) ⇒ m.status } must be(
Some(MemberStatus.Up)))
}
enterBarrier("side2-removed")
runOn(first) {
for (role1 ← side1AfterJoin; role2 ← side2) {
testConductor.passThrough(role1, role2, Direction.Both).await
}
}
enterBarrier("repair-7")
// side2 should not detect side1 as reachable again
Thread.sleep(10000)
runOn(side1AfterJoin: _*) {
val expected = (side1AfterJoin map address).toSet
clusterView.members.map(_.address) must be(expected)
}
runOn(side2: _*) {
val expected = ((side2 ++ side1) map address).toSet
clusterView.members.map(_.address) must be(expected)
assertUnreachable(side1: _*)
}
enterBarrier("after-7")
}
}
}

View file

@ -91,8 +91,6 @@ abstract class UnreachableNodeJoinsAgainSpec
awaitAssert {
val members = clusterView.members
clusterView.unreachableMembers.size must be(roles.size - 1)
members.size must be(1)
members.map(_.status) must be(Set(MemberStatus.Up))
}
clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
}
@ -105,13 +103,12 @@ abstract class UnreachableNodeJoinsAgainSpec
awaitAssert {
val members = clusterView.members
clusterView.unreachableMembers.size must be(1)
members.size must be(roles.size - 1)
members.map(_.status) must be(Set(MemberStatus.Up))
}
awaitSeenSameState(allButVictim map address: _*)
// still one unreachable
clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(node(victim).address)
clusterView.unreachableMembers.head.status must be(MemberStatus.Up)
}
}
@ -123,10 +120,12 @@ abstract class UnreachableNodeJoinsAgainSpec
cluster down victim
}
runOn(allBut(victim): _*) {
awaitMembersUp(roles.size - 1, Set(victim))
val allButVictim = allBut(victim, roles)
runOn(allButVictim: _*) {
// eventually removed
awaitMembersUp(roles.size - 1, Set(victim))
awaitAssert(clusterView.unreachableMembers must be(Set.empty), 15 seconds)
awaitAssert(clusterView.members.map(_.address) must be((allButVictim map address).toSet))
}

View file

@ -21,6 +21,7 @@ import akka.routing.RoundRobinRouter
import akka.routing.RoutedActorRef
import akka.routing.RouterRoutees
import akka.testkit._
import akka.remote.transport.ThrottlerTransportAdapter.Direction
object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig {
@ -86,6 +87,8 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig {
nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]"""))
nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]"""))
testTransport(on = true)
}
class ClusterRoundRobinRoutedActorMultiJvmNode1 extends ClusterRoundRobinRoutedActorSpec
@ -300,6 +303,31 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
enterBarrier("after-8")
}
"remove routees for unreachable nodes, and add when reachable again" taggedAs LongRunningTest in within(30.seconds) {
// myservice is already running
def routees = currentRoutees(router4)
def routeeAddresses = (routees map fullAddress).toSet
runOn(first) {
// 4 nodes, 1 routee on each node
awaitAssert(currentRoutees(router4).size must be(4))
testConductor.blackhole(first, second, Direction.Both).await
awaitAssert(routees.size must be(3))
routeeAddresses must not contain (address(second))
testConductor.passThrough(first, second, Direction.Both).await
awaitAssert(routees.size must be(4))
routeeAddresses must contain(address(second))
}
enterBarrier("after-9")
}
"deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in {
muteMarkingAsUnreachable()
@ -313,7 +341,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
val downRoutee = routees.find(_.path.address == downAddress).get
cluster.down(downAddress)
expectMsgType[Terminated].actor must be(downRoutee)
expectMsgType[Terminated](15.seconds).actor must be(downRoutee)
awaitAssert {
routeeAddresses must contain(notUsedAddress)
routeeAddresses must not contain (downAddress)
@ -330,7 +358,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
replies.values.sum must be(iterationCount)
}
enterBarrier("after-9")
enterBarrier("after-10")
}
}

View file

@ -30,7 +30,6 @@ class ClusterConfigSpec extends AkkaSpec {
PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second)
HeartbeatInterval must be(1 second)
NumberOfEndHeartbeats must be(8)
MonitoredByNrOfMembers must be(5)
HeartbeatRequestDelay must be(10 seconds)
HeartbeatExpectedResponseAfter must be(3 seconds)

View file

@ -65,13 +65,33 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
}
"be produced for members in unreachable" in {
val g1 = Gossip(members = SortedSet(aUp, bUp), overview = GossipOverview(unreachable = Set(cUp, eUp)))
val g2 = Gossip(members = SortedSet(aUp), overview = GossipOverview(unreachable = Set(cUp, bDown, eDown)))
val reachability1 = Reachability.empty.
unreachable(aUp.uniqueAddress, cUp.uniqueAddress).
unreachable(aUp.uniqueAddress, eUp.uniqueAddress)
val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, eUp), overview = GossipOverview(reachability = reachability1))
val reachability2 = reachability1.
unreachable(aUp.uniqueAddress, bDown.uniqueAddress)
val g2 = Gossip(members = SortedSet(aUp, cUp, bDown, eDown), overview = GossipOverview(reachability = reachability2))
diffUnreachable(g1, g2) must be(Seq(UnreachableMember(bDown)))
diffSeen(g1, g2) must be(Seq.empty)
}
"be produced for members becoming reachable after unreachable" in {
val reachability1 = Reachability.empty.
unreachable(aUp.uniqueAddress, cUp.uniqueAddress).reachable(aUp.uniqueAddress, cUp.uniqueAddress).
unreachable(aUp.uniqueAddress, eUp.uniqueAddress).
unreachable(aUp.uniqueAddress, bUp.uniqueAddress)
val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, eUp), overview = GossipOverview(reachability = reachability1))
val reachability2 = reachability1.
unreachable(aUp.uniqueAddress, cUp.uniqueAddress).
reachable(aUp.uniqueAddress, bUp.uniqueAddress)
val g2 = Gossip(members = SortedSet(aUp, cUp, bUp, eUp), overview = GossipOverview(reachability = reachability2))
diffUnreachable(g1, g2) must be(Seq(UnreachableMember(cUp)))
diffReachable(g1, g2) must be(Seq(ReachableMember(bUp)))
}
"be produced for removed members" in {
val (g1, _) = converge(Gossip(members = SortedSet(aUp, dExiting)))
val (g2, s2) = converge(Gossip(members = SortedSet(aUp)))

View file

@ -40,7 +40,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now - 30.seconds).removeOverdueHeartbeatRequest()
s.heartbeatRequest must be(Map.empty)
s.active must be(Set.empty)
s.ending must be(Map(aa -> 0))
s.ending must be(Set(aa))
}
"remove heartbeatRequest after reset" in {
@ -56,22 +56,22 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
"remove heartbeatRequest after removeMember" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)).removeMember(aa)
s.heartbeatRequest must be(Map.empty)
s.ending must be(Map(aa -> 0))
s.ending must be(Set(aa))
}
"remove from ending after addHeartbeatRequest" in {
val s = emptyState.reset(HashSet(aa, bb)).removeMember(aa)
s.ending must be(Map(aa -> 0))
s.ending must be(Set(aa))
val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds)
s2.heartbeatRequest.keySet must be(Set(aa))
s2.ending must be(Map.empty)
s2.ending must be(Set.empty)
}
"include nodes from reset in active set" in {
val nodes = HashSet(aa, bb, cc)
val s = emptyState.reset(nodes)
s.current must be(nodes)
s.ending must be(Map.empty)
s.ending must be(Set.empty)
s.active must be(nodes)
}
@ -85,21 +85,21 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
"move member to ending set when removing member" in {
val nodes = HashSet(aa, bb, cc, dd, ee)
val s = emptyState.reset(nodes)
s.ending must be(Map.empty)
s.ending must be(Set.empty)
val included = s.current.head
val s2 = s.removeMember(included)
s2.ending must be(Map(included -> 0))
s2.ending must be(Set(included))
s2.current must not contain (included)
val s3 = s2.addMember(included)
s3.current must contain(included)
s3.ending.keySet must not contain (included)
s3.ending must not contain (included)
}
"increase ending count correctly" in {
"remove ending correctly" in {
val s = emptyState.reset(HashSet(aa)).removeMember(aa)
s.ending must be(Map(aa -> 0))
val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa)
s2.ending must be(Map(aa -> 2))
s.ending must be(Set(aa))
val s2 = s.removeEnding(aa)
s2.ending must be(Set.empty)
}
}

View file

@ -47,40 +47,17 @@ class GossipSpec extends WordSpec with MustMatchers {
}
"merge unreachable by status priority" in {
val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a1, b1, c1, d1)))
val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2)))
"merge unreachable" in {
val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress).unreachable(b1.uniqueAddress, c1.uniqueAddress)
val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1))
val r2 = Reachability.empty.unreachable(a1.uniqueAddress, d1.uniqueAddress)
val g2 = Gossip(members = SortedSet(a1, b1, c1, d1), overview = GossipOverview(reachability = r2))
val merged1 = g1 merge g2
merged1.overview.unreachable must be(Set(a2, b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
merged1.overview.reachability.allUnreachable must be(Set(a1.uniqueAddress, c1.uniqueAddress, d1.uniqueAddress))
val merged2 = g2 merge g1
merged2.overview.unreachable must be(Set(a2, b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
}
"merge by excluding unreachable from members" in {
val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c1, d1)))
val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2)))
val merged1 = g1 merge g2
merged1.members must be(SortedSet(a2))
merged1.members.toSeq.map(_.status) must be(Seq(Up))
merged1.overview.unreachable must be(Set(b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
val merged2 = g2 merge g1
merged2.members must be(SortedSet(a2))
merged2.members.toSeq.map(_.status) must be(Seq(Up))
merged2.overview.unreachable must be(Set(b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
}
"not have node in both members and unreachable" in intercept[IllegalArgumentException] {
Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(b2)))
merged2.overview.reachability.allUnreachable must be(merged1.overview.reachability.allUnreachable)
}
"not have live members with wrong status" in intercept[IllegalArgumentException] {
@ -121,9 +98,11 @@ class GossipSpec extends WordSpec with MustMatchers {
"know who is youngest" in {
// a2 and e1 are Joining
val g1 = Gossip(members = SortedSet(a2, b1.copyUp(3)), overview = GossipOverview(unreachable = Set(e1)))
val g1 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability =
Reachability.empty.unreachable(a2.uniqueAddress, e1.uniqueAddress)))
g1.youngestMember must be(b1)
val g2 = Gossip(members = SortedSet(a2), overview = GossipOverview(unreachable = Set(b1.copyUp(3), e1)))
val g2 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability =
Reachability.empty.unreachable(a2.uniqueAddress, b1.uniqueAddress).unreachable(a2.uniqueAddress, e1.uniqueAddress)))
g2.youngestMember must be(b1)
val g3 = Gossip(members = SortedSet(a2, b1.copyUp(3), e2.copyUp(4)))
g3.youngestMember must be(e2)

View file

@ -0,0 +1,130 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.ShouldMatchers
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ReachabilityPerfSpec extends WordSpec with ShouldMatchers {
val nodesSize = sys.props.get("akka.cluster.ReachabilityPerfSpec.nodesSize").getOrElse("250").toInt
val iterations = sys.props.get("akka.cluster.ReachabilityPerfSpec.iterations").getOrElse("10000").toInt
val address = Address("akka.tcp", "sys", "a", 2552)
val node = Address("akka.tcp", "sys", "a", 2552)
def createReachabilityOfSize(base: Reachability, size: Int): Reachability =
(base /: (1 to size)) {
case (r, i) ⇒
val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i)
val j = if (i == size) 1 else i + 1
val subject = UniqueAddress(address.copy(host = Some("node-" + j)), j)
r.unreachable(observer, subject).reachable(observer, subject)
}
def addUnreachable(base: Reachability, count: Int): Reachability = {
val observers = base.allObservers.take(count)
val subjects = Stream.continually(base.allObservers).flatten.iterator
(base /: observers) {
case (r, o) ⇒
(r /: (1 to 5)) { case (r, _) ⇒ r.unreachable(o, subjects.next()) }
}
}
val reachability1 = createReachabilityOfSize(Reachability.empty, nodesSize)
val reachability2 = createReachabilityOfSize(reachability1, nodesSize)
val reachability3 = addUnreachable(reachability1, nodesSize / 2)
val allowed = reachability1.allObservers
def checkThunkFor(r1: Reachability, r2: Reachability, thunk: (Reachability, Reachability) ⇒ Unit, times: Int): Unit = {
for (i ← 1 to times) {
thunk(Reachability(r1.records, r1.versions), Reachability(r2.records, r2.versions))
}
}
def checkThunkFor(r1: Reachability, thunk: Reachability ⇒ Unit, times: Int): Unit = {
for (i ← 1 to times) {
thunk(Reachability(r1.records, r1.versions))
}
}
def merge(expectedRecords: Int)(r1: Reachability, r2: Reachability): Unit = {
r1.merge(allowed, r2).records.size should be(expectedRecords)
}
def checkStatus(r1: Reachability): Unit = {
val record = r1.records.head
r1.status(record.observer, record.subject) should be(record.status)
}
def checkAggregatedStatus(r1: Reachability): Unit = {
val record = r1.records.head
r1.status(record.subject) should be(record.status)
}
def allUnreachableOrTerminated(r1: Reachability): Unit = {
val record = r1.records.head
r1.allUnreachableOrTerminated.isEmpty should be(false)
}
def allUnreachable(r1: Reachability): Unit = {
val record = r1.records.head
r1.allUnreachable.isEmpty should be(false)
}
def recordsFrom(r1: Reachability): Unit = {
r1.allObservers.foreach { o ⇒
r1.recordsFrom(o) should not be (null)
}
}
s"Reachability of size $nodesSize" must {
s"do a warm up run, $iterations times" in {
checkThunkFor(reachability1, reachability2, merge(0), iterations)
}
s"merge with same versions, $iterations times" in {
checkThunkFor(reachability1, reachability1, merge(0), iterations)
}
s"merge with all older versions, $iterations times" in {
checkThunkFor(reachability2, reachability1, merge(0), iterations)
}
s"merge with all newer versions, $iterations times" in {
checkThunkFor(reachability1, reachability2, merge(0), iterations)
}
s"merge with half nodes unreachable, $iterations times" in {
checkThunkFor(reachability1, reachability3, merge(5 * nodesSize / 2), iterations)
}
s"merge with half nodes unreachable opposite $iterations times" in {
checkThunkFor(reachability3, reachability1, merge(5 * nodesSize / 2), iterations)
}
s"check status with half nodes unreachable, $iterations times" in {
checkThunkFor(reachability3, checkStatus, iterations)
}
s"check aggregated reachability status with half nodes unreachable, $iterations times" in {
checkThunkFor(reachability3, checkAggregatedStatus, iterations)
}
s"get allUnreachableOrTerminated with half nodes unreachable, $iterations times" in {
checkThunkFor(reachability3, allUnreachableOrTerminated, iterations)
}
s"get allUnreachable with half nodes unreachable, $iterations times" in {
checkThunkFor(reachability3, allUnreachable, iterations)
}
s"get recordsFrom with half nodes unreachable, $iterations times" in {
checkThunkFor(reachability3, recordsFrom, iterations)
}
}
}

View file

@ -0,0 +1,192 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ReachabilitySpec extends WordSpec with MustMatchers {
import Reachability.{ Reachable, Unreachable, Terminated, Record }
val nodeA = UniqueAddress(Address("akka.tcp", "sys", "a", 2552), 1)
val nodeB = UniqueAddress(Address("akka.tcp", "sys", "b", 2552), 2)
val nodeC = UniqueAddress(Address("akka.tcp", "sys", "c", 2552), 3)
val nodeD = UniqueAddress(Address("akka.tcp", "sys", "d", 2552), 4)
val nodeE = UniqueAddress(Address("akka.tcp", "sys", "e", 2552), 5)
"Reachability table" must {
"be reachable when empty" in {
val r = Reachability.empty
r.isReachable(nodeA) must be(true)
r.allUnreachable must be(Set.empty)
}
"be unreachable when one observed unreachable" in {
val r = Reachability.empty.unreachable(nodeB, nodeA)
r.isReachable(nodeA) must be(false)
r.allUnreachable must be(Set(nodeA))
}
"not be reachable when terminated" in {
val r = Reachability.empty.terminated(nodeB, nodeA)
r.isReachable(nodeA) must be(false)
// allUnreachable doesn't include terminated
r.allUnreachable must be(Set.empty)
r.allUnreachableOrTerminated must be(Set(nodeA))
}
"not change terminated entry" in {
val r = Reachability.empty.terminated(nodeB, nodeA)
r.reachable(nodeB, nodeA) must be theSameInstanceAs (r)
r.unreachable(nodeB, nodeA) must be theSameInstanceAs (r)
}
"not change when same status" in {
val r = Reachability.empty.unreachable(nodeB, nodeA)
r.unreachable(nodeB, nodeA) must be theSameInstanceAs (r)
}
"be unreachable when some observed unreachable and others reachable" in {
val r = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeA).reachable(nodeD, nodeA)
r.isReachable(nodeA) must be(false)
}
"be reachable when all observed reachable again" in {
val r = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeA).
reachable(nodeB, nodeA).reachable(nodeC, nodeA).
unreachable(nodeB, nodeC).unreachable(nodeC, nodeB)
r.isReachable(nodeA) must be(true)
}
"be pruned when all records of an observer are Reachable" in {
val r = Reachability.empty.
unreachable(nodeB, nodeA).unreachable(nodeB, nodeC).
unreachable(nodeD, nodeC).
reachable(nodeB, nodeA).reachable(nodeB, nodeC)
r.isReachable(nodeA) must be(true)
r.isReachable(nodeC) must be(false)
r.records must be(Vector(Record(nodeD, nodeC, Unreachable, 1L)))
val r2 = r.unreachable(nodeB, nodeD).unreachable(nodeB, nodeE)
r2.records.toSet must be(Set(
Record(nodeD, nodeC, Unreachable, 1L),
Record(nodeB, nodeD, Unreachable, 5L),
Record(nodeB, nodeE, Unreachable, 6L)))
}
"have correct aggregated status" in {
val records = Vector(
Reachability.Record(nodeA, nodeB, Reachable, 2),
Reachability.Record(nodeC, nodeB, Unreachable, 2),
Reachability.Record(nodeA, nodeD, Unreachable, 3),
Reachability.Record(nodeD, nodeB, Terminated, 4))
val versions = Map(nodeA -> 3L, nodeC -> 3L, nodeD -> 4L)
val r = Reachability(records, versions)
r.status(nodeA) must be(Reachable)
r.status(nodeB) must be(Terminated)
r.status(nodeD) must be(Unreachable)
}
"have correct status for a mix of nodes" in {
val r = Reachability.empty.
unreachable(nodeB, nodeA).unreachable(nodeC, nodeA).unreachable(nodeD, nodeA).
unreachable(nodeC, nodeB).reachable(nodeC, nodeB).unreachable(nodeD, nodeB).
unreachable(nodeD, nodeC).reachable(nodeD, nodeC).
reachable(nodeE, nodeD).
unreachable(nodeA, nodeE).terminated(nodeB, nodeE)
r.status(nodeB, nodeA) must be(Unreachable)
r.status(nodeC, nodeA) must be(Unreachable)
r.status(nodeD, nodeA) must be(Unreachable)
r.status(nodeC, nodeB) must be(Reachable)
r.status(nodeD, nodeB) must be(Unreachable)
r.status(nodeA, nodeE) must be(Unreachable)
r.status(nodeB, nodeE) must be(Terminated)
r.isReachable(nodeA) must be(false)
r.isReachable(nodeB) must be(false)
r.isReachable(nodeC) must be(true)
r.isReachable(nodeD) must be(true)
r.isReachable(nodeE) must be(false)
r.allUnreachable must be(Set(nodeA, nodeB))
r.allUnreachableFrom(nodeA) must be(Set(nodeE))
r.allUnreachableFrom(nodeB) must be(Set(nodeA))
r.allUnreachableFrom(nodeC) must be(Set(nodeA))
r.allUnreachableFrom(nodeD) must be(Set(nodeA, nodeB))
r.observersGroupedByUnreachable must be(Map(
nodeA -> Set(nodeB, nodeC, nodeD),
nodeB -> Set(nodeD),
nodeE -> Set(nodeA)))
}
"merge by picking latest version of each record" in {
val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD)
val r2 = r1.reachable(nodeB, nodeA).unreachable(nodeD, nodeE).unreachable(nodeC, nodeA)
val merged = r1.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r2)
merged.status(nodeB, nodeA) must be(Reachable)
merged.status(nodeC, nodeA) must be(Unreachable)
merged.status(nodeC, nodeD) must be(Unreachable)
merged.status(nodeD, nodeE) must be(Unreachable)
merged.status(nodeE, nodeA) must be(Reachable)
merged.isReachable(nodeA) must be(false)
merged.isReachable(nodeD) must be(false)
merged.isReachable(nodeE) must be(false)
val merged2 = r2.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r1)
merged2.records.toSet must be(merged.records.toSet)
}
"merge correctly after pruning" in {
val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD)
val r2 = r1.unreachable(nodeA, nodeE)
val r3 = r1.reachable(nodeB, nodeA) // nodeB pruned
val merged = r2.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r3)
merged.records.toSet must be(Set(
Record(nodeA, nodeE, Unreachable, 1),
Record(nodeC, nodeD, Unreachable, 1)))
val merged3 = r3.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r2)
merged3.records.toSet must be(merged.records.toSet)
}
"merge versions correctly" in {
val r1 = Reachability(Vector.empty, Map(nodeA -> 3L, nodeB -> 5L, nodeC -> 7L))
val r2 = Reachability(Vector.empty, Map(nodeA -> 6L, nodeB -> 2L, nodeD -> 1L))
val merged = r1.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r2)
val expected = Map(nodeA -> 6L, nodeB -> 5L, nodeC -> 7L, nodeD -> 1L)
merged.versions must be(expected)
val merged2 = r2.merge(Set(nodeA, nodeB, nodeC, nodeD, nodeE), r1)
merged2.versions must be(expected)
}
"remove node" in {
val r = Reachability.empty.
unreachable(nodeB, nodeA).
unreachable(nodeC, nodeD).
unreachable(nodeB, nodeC).
unreachable(nodeB, nodeE).
remove(Set(nodeA, nodeB))
r.status(nodeB, nodeA) must be(Reachable)
r.status(nodeC, nodeD) must be(Unreachable)
r.status(nodeB, nodeC) must be(Reachable)
r.status(nodeB, nodeE) must be(Reachable)
}
}
}

View file

@ -44,6 +44,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec {
checkSerialization(InternalClusterAction.InitJoinNack(address))
checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address))
checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address))
checkSerialization(ClusterHeartbeatReceiver.EndHeartbeatAck(address))
checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address))
val node1 = VectorClock.Node("node1")
@ -52,7 +53,8 @@ class ClusterMessageSerializerSpec extends AkkaSpec {
val node4 = VectorClock.Node("node4")
val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1 :+ node2).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
val g2 = (g1 :+ node3 :+ node4).seen(a1.uniqueAddress).seen(c1.uniqueAddress)
val g3 = g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1)))
val reachability3 = Reachability.empty.unreachable(a1.uniqueAddress, e1.uniqueAddress).unreachable(b1.uniqueAddress, e1.uniqueAddress)
val g3 = g2.copy(members = SortedSet(a1, b1, c1, d1, e1), overview = g2.overview.copy(reachability = reachability3))
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1))
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2))
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g3))

View file

@ -6,7 +6,6 @@ package akka.cluster.routing
import akka.testkit._
import akka.actor._
import akka.routing.RoundRobinRouter
import akka.cluster.routing.ClusterRouterConfig
import akka.actor.OneForOneStrategy
object ClusterRouterSupervisorSpec {

View file

@ -94,8 +94,8 @@ by all other nodes in the cluster. Convergence is implemented by passing a map f
node to current state version during gossip. This information is referred to as the
gossip overview. When all versions in the overview are equal there is convergence.
Gossip convergence cannot occur while any nodes are ``unreachable``. The nodes need
to be moved to the ``down`` or ``removed`` states (see the `Membership Lifecycle`_
section below).
to become ``reachable`` again, or be moved to the ``down`` or ``removed`` states
(see the `Membership Lifecycle`_ section below).
Failure Detector
@ -127,9 +127,17 @@ In a cluster each node is monitored by a few (default maximum 5) other nodes, an
any of these detects the node as ``unreachable`` that information will spread to
the rest of the cluster through the gossip. In other words, only one node needs to
mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``.
Right now there is no way for a node to come back from ``unreachable``. This is planned
for the next release of Akka. It also means that the ``unreachable`` node needs to be moved
to the ``down`` or ``removed`` states (see the `Membership Lifecycle`_ section below).
The failure detector will also detect if the node becomes ``reachable`` again. When
all nodes that monitored the ``unreachable`` node detect it as ``reachable`` again,
the cluster, after gossip dissemination, will consider it as ``reachable``.
If system messages cannot be delivered to a node it will be quarantined and then it
cannot come back from ``unreachable``. This can happen if there are too many
unacknowledged system messages (e.g. watch, Terminated, remote actor deployment,
failures of actors supervised by remote parent). Then the node needs to be moved
to the ``down`` or ``removed`` states (see the `Membership Lifecycle`_ section below)
and the actor system must be restarted before it can join the cluster again.
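
To illustrate the semantics, here is a sketch using the internal ``akka.cluster.Reachability`` table that backs this behavior (based on the cases exercised in ``ReachabilitySpec``; it is not part of the public API, and ``nodeA``/``nodeB`` stand for ``UniqueAddress`` values)::

  val r1 = Reachability.empty.unreachable(nodeB, nodeA) // observer nodeB flags subject nodeA
  r1.isReachable(nodeA)                                 // false: one observer is enough
  val r2 = r1.reachable(nodeB, nodeA)                   // nodeB monitors nodeA as available again
  r2.isReachable(nodeA)                                 // true: all observers agree, record is pruned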
.. _The Phi Accrual Failure Detector: http://ddg.jaist.ac.jp/pub/HDY+04.pdf
@ -221,8 +229,8 @@ from the cluster, marking it as ``removed``.
If a node is ``unreachable`` then gossip convergence is not possible and therefore
any ``leader`` actions are also not possible (for instance, allowing a node to
become a part of the cluster). To be able to move forward the state of the
``unreachable`` nodes must be changed. Currently the only way forward is to mark the
node as ``down``. If the node is to join the cluster again the actor system must be
``unreachable`` nodes must be changed. It must become ``reachable`` again or be marked
as ``down``. If the node is to join the cluster again the actor system must be
restarted and go through the joining process again. The cluster can, through the
leader, also *auto-down* a node.
@ -292,8 +300,10 @@ Failure Detection and Unreachability
causing the monitored node to be marked as unreachable
- unreachable*
unreachable is not a real member state but more of a flag in addition
to the state signaling that the cluster is unable to talk to this node
unreachable is not a real member state but more of a flag in addition
to the state signaling that the cluster is unable to talk to this node.
After being unreachable the failure detector may detect it as reachable
again and thereby remove the flag.
Future Cluster Enhancements and Additions
@ -649,4 +659,3 @@ storage on top of the Akka Cluster as described in this document are:
* Actor handoff
* Actor rebalancing
* Stateful actor replication
* Node becoming ``reachable`` after it has been marked as ``unreachable``

Binary files not shown: two images (one 29 KiB before and after, one reduced from 78 KiB to 76 KiB). One file diff suppressed because it is too large.

View file

@ -141,10 +141,10 @@ Automatic vs. Manual Downing
When a member is considered by the failure detector to be unreachable the
leader is not allowed to perform its duties, such as changing status of
new joining members to 'Up'. The status of the unreachable member must be
changed to 'Down'. This can be performed automatically or manually. By
default it must be done manually, using using :ref:`cluster_jmx_java` or
:ref:`cluster_command_line_java`.
new joining members to 'Up'. The node must first become reachable again, or the
status of the unreachable member must be changed to 'Down'. Changing status to 'Down'
can be performed automatically or manually. By default it must be done manually, using
:ref:`cluster_jmx_java` or :ref:`cluster_command_line_java`.
It can also be performed programmatically with ``Cluster.get(system).down(address)``.
@ -194,10 +194,13 @@ receive ``MemberUp`` for that node, and other nodes.
The events to track the life-cycle of members are:
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``.
Note that the node might already have been shut down when this event is published on another node.
* ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster.
* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector.
* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable, detected by the failure detector
of at least one other node.
* ``ClusterEvent.ReachableMember`` - A member is considered as reachable again, after having been unreachable.
All nodes that previously detected it as unreachable have detected it as reachable again.
There are more types of change events, consult the API documentation
of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent``
@ -324,6 +327,22 @@ See :ref:`cluster-client` in the contrib module.
Failure Detector
^^^^^^^^^^^^^^^^
In a cluster each node is monitored by a few (default maximum 5) other nodes, and when
any of these detects the node as ``unreachable`` that information will spread to
the rest of the cluster through the gossip. In other words, only one node needs to
mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``.
The failure detector will also detect if the node becomes ``reachable`` again. When
all nodes that monitored the ``unreachable`` node detect it as ``reachable`` again,
the cluster, after gossip dissemination, will consider it as ``reachable``.
If system messages cannot be delivered to a node it will be quarantined and then it
cannot come back from ``unreachable``. This can happen if there are too many
unacknowledged system messages (e.g. watch, Terminated, remote actor deployment,
failures of actors supervised by remote parent). Then the node needs to be moved
to the ``down`` or ``removed`` states and the actor system must be restarted before
it can join the cluster again.
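
A quarantined node is signalled with the new ``akka.remote.QuarantinedEvent`` on the event stream. A minimal Scala sketch of observing it (the listener actor is illustrative; only the event class and its ``address`` field come from this change)::

  import akka.actor.{ Actor, ActorLogging }
  import akka.remote.QuarantinedEvent

  class QuarantineListener extends Actor with ActorLogging {
    override def preStart(): Unit =
      context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])
    override def postStop(): Unit =
      context.system.eventStream.unsubscribe(self)
    def receive = {
      case q: QuarantinedEvent ⇒
        // such a node must be downed or removed and its actor system
        // restarted before it can join the cluster again
        log.warning("Address [{}] is quarantined", q.address)
    }
  }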
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
unreachable from the rest of the cluster. The heartbeat arrival times are interpreted
by an implementation of
@ -384,9 +403,10 @@ Cluster Aware Routers
All :ref:`routers <routing-java>` can be made aware of member nodes in the cluster, i.e.
deploying new routees or looking up routees on nodes in the cluster.
When a node becomes unavailable or leaves the cluster the routees of that node are
When a node becomes unreachable or leaves the cluster the routees of that node are
automatically unregistered from the router. When new nodes join the cluster additional
routees are added to the router, according to the configuration.
routees are added to the router, according to the configuration. Routees are also added
when a node becomes reachable again, after having been unreachable.
There are two distinct types of routers.

View file

@ -134,10 +134,10 @@ Automatic vs. Manual Downing
When a member is considered by the failure detector to be unreachable the
leader is not allowed to perform its duties, such as changing status of
new joining members to 'Up'. The status of the unreachable member must be
changed to 'Down'. This can be performed automatically or manually. By
default it must be done manually, using using :ref:`cluster_jmx_scala` or
:ref:`cluster_command_line_scala`.
new joining members to 'Up'. The node must first become reachable again, or the
status of the unreachable member must be changed to 'Down'. Changing status to 'Down'
can be performed automatically or manually. By default it must be done manually, using
:ref:`cluster_jmx_scala` or :ref:`cluster_command_line_scala`.
It can also be performed programmatically with ``Cluster(system).down(address)``.
@ -187,10 +187,13 @@ receive ``MemberUp`` for that node, and other nodes.
The events to track the life-cycle of members are:
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``.
Note that the node might already have been shut down when this event is published on another node.
* ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster.
* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector.
* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable, detected by the failure detector
of at least one other node.
* ``ClusterEvent.ReachableMember`` - A member is considered as reachable again, after having been unreachable.
All nodes that previously detected it as unreachable have detected it as reachable again.
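
A minimal sketch of a subscriber for these events (illustrative; it assumes the usual subscription pattern where the current state snapshot is received first)::

  import akka.actor.Actor
  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent._

  class ReachabilityListener extends Actor {
    val cluster = Cluster(context.system)
    override def preStart(): Unit = cluster.subscribe(self, classOf[ReachabilityEvent])
    override def postStop(): Unit = cluster.unsubscribe(self)
    def receive = {
      case UnreachableMember(m)   ⇒ // at least one monitoring node has flagged m
      case ReachableMember(m)     ⇒ // all previous observers see m as reachable again
      case _: CurrentClusterState ⇒ // initial snapshot of the cluster state
    }
  }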
There are more types of change events, consult the API documentation
of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent``
@ -312,6 +315,22 @@ See :ref:`cluster-client` in the contrib module.
Failure Detector
^^^^^^^^^^^^^^^^
In a cluster each node is monitored by a few (default maximum 5) other nodes, and when
any of these detects the node as ``unreachable`` that information will spread to
the rest of the cluster through the gossip. In other words, only one node needs to
mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``.
The failure detector will also detect if the node becomes ``reachable`` again. When
all nodes that monitored the ``unreachable`` node detect it as ``reachable`` again
the cluster, after gossip dissemination, will consider it as ``reachable``.
If system messages cannot be delivered to a node it will be quarantined and then it
cannot come back from ``unreachable``. This can happen if there are too many
unacknowledged system messages (e.g. watch, Terminated, remote actor deployment,
failures of actors supervised by remote parent). Then the node needs to be moved
to the ``down`` or ``removed`` states and the actor system must be restarted before
it can join the cluster again.
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
unreachable from the rest of the cluster. The heartbeat arrival times are interpreted
by an implementation of
@ -375,9 +394,10 @@ Cluster Aware Routers
All :ref:`routers <routing-scala>` can be made aware of member nodes in the cluster, i.e.
deploying new routees or looking up routees on nodes in the cluster.
When a node becomes unavailable or leaves the cluster the routees of that node are
When a node becomes unreachable or leaves the cluster the routees of that node are
automatically unregistered from the router. When new nodes join the cluster additional
routees are added to the router, according to the configuration.
routees are added to the router, according to the configuration. Routees are also added
when a node becomes reachable again, after having been unreachable.
There are two distinct types of routers.

View file

@ -100,7 +100,9 @@ class PhiAccrualFailureDetector(
private val state = new AtomicReference[State](State(history = firstHeartbeat, timestamp = None))
override def isAvailable: Boolean = phi < threshold
override def isAvailable: Boolean = isAvailable(clock())
private def isAvailable(timestamp: Long): Boolean = phi(timestamp) < threshold
override def isMonitoring: Boolean = state.get.timestamp.nonEmpty
@ -118,7 +120,9 @@ class PhiAccrualFailureDetector(
case Some(latestTimestamp) ⇒
// this is a known connection
val interval = timestamp - latestTimestamp
oldState.history :+ interval
// don't use the first heartbeat after failure for the history, since a long pause will skew the stats
if (isAvailable(timestamp)) oldState.history :+ interval
else oldState.history
}
val newState = oldState.copy(history = newHistory, timestamp = Some(timestamp)) // record new timestamp
@ -133,13 +137,15 @@ class PhiAccrualFailureDetector(
* If a connection does not have any records in failure detector then it is
* considered healthy.
*/
def phi: Double = {
def phi: Double = phi(clock())
private def phi(timestamp: Long): Double = {
val oldState = state.get
val oldTimestamp = oldState.timestamp
if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timeDiff = clock() - oldTimestamp.get
val timeDiff = timestamp - oldTimestamp.get
val history = oldState.history
val mean = history.mean

View file

@ -400,10 +400,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒
settings.QuarantineDuration match {
case d: FiniteDuration ⇒
log.warning("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " +
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " +
"from this situation.", remoteAddress, uid)
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
case _ ⇒ // disabled
}
context.system.eventStream.publish(AddressTerminated(remoteAddress))
@ -488,9 +486,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Some(endpoint) ⇒ context.stop(endpoint)
case _ ⇒ // nothing to stop
}
log.info("Address [{}] is now quarantined, all messages to this address will be delivered to dead letters.",
address)
endpoints.markAsQuarantined(address, uid, Deadline.now + d)
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
case _ ⇒ // Ignore
}

View file

@ -78,6 +78,15 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE
override def toString: String = s"Remoting error: [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]"
}
@SerialVersionUID(1L)
case class QuarantinedEvent(address: Address, uid: Int) extends RemotingLifecycleEvent {
override def logLevel: Logging.LogLevel = Logging.WarningLevel
override val toString: String =
s"Association to [$address] having UID [$uid] is irrecoverably failed. UID is now quarantined and all " +
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " +
"from this situation."
}
/**
* INTERNAL API
*/

View file

@ -26,7 +26,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") {
def createFailureDetector(
threshold: Double = 8.0,
maxSampleSize: Int = 1000,
minStdDeviation: FiniteDuration = 10.millis,
minStdDeviation: FiniteDuration = 100.millis,
acceptableLostDuration: FiniteDuration = Duration.Zero,
firstHeartbeatEstimate: FiniteDuration = 1.second,
clock: Clock = FailureDetector.defaultClock) =
@ -120,19 +120,21 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") {
}
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 7000, 100, 1000, 100, 100)
val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval))
// 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger unreachable again
val regularIntervals = 0L +: Vector.fill(999)(1000L)
val timeIntervals = regularIntervals :+ (5 * 60 * 1000L) :+ 100L :+ 900L :+ 100L :+ 7000L :+ 100L :+ 900L :+ 100L :+ 900L
val fd = createFailureDetector(threshold = 8, acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeIntervals))
fd.heartbeat() //0
fd.heartbeat() //1000
fd.heartbeat() //1100
fd.isAvailable must be(true) //1200
fd.isAvailable must be(false) //8200
fd.heartbeat() //8300
fd.heartbeat() //9300
fd.heartbeat() //9400
fd.isAvailable must be(true) //9500
for (_ ← 0 until 1000) fd.heartbeat()
fd.isAvailable must be(false) // after the long pause
fd.heartbeat()
fd.isAvailable must be(true)
fd.heartbeat()
fd.isAvailable must be(false) // after the 7 seconds pause
fd.heartbeat()
fd.isAvailable must be(true)
fd.heartbeat()
fd.isAvailable must be(true)
}
"accept some configured missing heartbeats" in {
@ -164,7 +166,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") {
}
"use maxSampleSize heartbeats" in {
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 500, 500, 500, 500, 500)
val fd = createFailureDetector(maxSampleSize = 3, clock = fakeTimeGenerator(timeInterval))
// 100 ms interval
@ -173,12 +175,12 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") {
fd.heartbeat() //200
fd.heartbeat() //300
val phi1 = fd.phi //400
// 1000 ms interval, should become same phi when 100 ms intervals have been dropped
// 500 ms interval, should become same phi when 100 ms intervals have been dropped
fd.heartbeat() //1000
fd.heartbeat() //1500
fd.heartbeat() //2000
fd.heartbeat() //3000
fd.heartbeat() //4000
val phi2 = fd.phi //5000
fd.heartbeat() //2500
val phi2 = fd.phi //3000
phi2 must be(phi1.plusOrMinus(0.001))
}
@ -211,7 +213,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") {
val history5 = history4 :+ 80
history5.mean must be(103.333333 plusOrMinus 0.00001)
history5.variance must be(688.88888889 plusOrMinus 0.00001)
}
}
}