diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index f6055f0a23..a2c64b75cd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -4,1347 +4,33 @@ package akka.cluster -import akka.actor._ -import akka.actor.Status._ +import java.io.Closeable +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.immutable.SortedSet + import akka.ConfigurationException +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ActorSystemImpl +import akka.actor.Address +import akka.actor.Cancellable +import akka.actor.DefaultScheduler +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.Props +import akka.actor.Scheduler import akka.dispatch.Await import akka.dispatch.MonitorableThreadFactory import akka.event.Logging -import akka.jsr166y.ThreadLocalRandom -import akka.pattern._ -import akka.remote._ -import akka.routing._ -import akka.util._ -import akka.util.duration._ +import akka.pattern.ask +import akka.remote.RemoteActorRefProvider +import akka.util.Duration import akka.util.internal.HashedWheelTimer -import com.google.protobuf.ByteString -import java.io.Closeable -import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } -import java.util.concurrent.TimeoutException -import java.util.concurrent.TimeUnit._ -import MemberStatus._ -import scala.annotation.tailrec -import scala.collection.immutable.{ Map, SortedSet } -import scala.collection.GenTraversableOnce -import java.util.concurrent.atomic.AtomicLong -import java.security.MessageDigest - -/** - * Interface for membership change listener. - */ -trait MembershipChangeListener { - def notify(members: SortedSet[Member]): Unit -} - -/** - * Interface for meta data change listener. - */ -trait MetaDataChangeListener { - def notify(meta: Map[String, Array[Byte]]): Unit -} - -/** - * Base trait for all cluster messages. All ClusterMessage's are serializable. - * - * FIXME Protobuf all ClusterMessages - */ -sealed trait ClusterMessage extends Serializable - -/** - * Cluster commands sent by the USER. - */ -object ClusterUserAction { - - /** - * Command to join the cluster. Sent when a node (represented by 'address') - * wants to join another node (the receiver). - */ - case class Join(address: Address) extends ClusterMessage - - /** - * Command to leave the cluster. - */ - case class Leave(address: Address) extends ClusterMessage - - /** - * Command to mark node as temporary down. - */ - case class Down(address: Address) extends ClusterMessage - -} - -/** - * INTERNAL API - */ -private[cluster] object InternalClusterAction { - - /** - * Command to initiate join another node (represented by 'address'). - * Join will be sent to the other node. - */ - case class JoinTo(address: Address) extends ClusterMessage - - /** - * Start message of the process to join one of the seed nodes. - * The node sends `InitJoin` to all seed nodes, which replies - * with `InitJoinAck`. The first reply is used others are discarded. - * The node sends `Join` command to the seed node that replied first. 
- */ - case object JoinSeedNode extends ClusterMessage - - /** - * @see JoinSeedNode - */ - case object InitJoin extends ClusterMessage - - /** - * @see JoinSeedNode - */ - case class InitJoinAck(address: Address) extends ClusterMessage - - /** - * - * Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]] - * to the other node. - * Local only, no need to serialize. - */ - case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) - - case object GossipTick - - case object HeartbeatTick - - case object ReapUnreachableTick - - case object LeaderActionsTick - - case object PublishStateTick - - case class SendClusterMessage(to: Address, msg: ClusterMessage) - - case class SendGossipTo(address: Address) - - case object GetClusterCoreRef - - case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage - case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage - -} - -/** - * INTERNAL API. - * - * Cluster commands sent by the LEADER. - */ -private[cluster] object ClusterLeaderAction { - - /** - * Command to mark a node to be removed from the cluster immediately. - * Can only be sent by the leader. - */ - case class Exit(address: Address) extends ClusterMessage - - /** - * Command to remove a node from the cluster immediately. - */ - case class Remove(address: Address) extends ClusterMessage -} - -/** - * Represents the address and the current status of a cluster member node. - * - * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`. - */ -class Member(val address: Address, val status: MemberStatus) extends ClusterMessage { - override def hashCode = address.## - override def equals(other: Any) = Member.unapply(this) == Member.unapply(other) - override def toString = "Member(address = %s, status = %s)" format (address, status) - def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status) -} - -/** - * Module with factory and ordering methods for Member instances. - */ -object Member { - - /** - * `Address` ordering type class, sorts addresses by host and port. - */ - implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ - if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 - else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0) - else false - } - - /** - * `Member` ordering type class, sorts members by host and port with the exception that - * it puts all members that are in MemberStatus.EXITING last. - */ - implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒ - if (a.status == Exiting && b.status != Exiting) false - else if (a.status != Exiting && b.status == Exiting) true - else addressOrdering.compare(a.address, b.address) < 0 - } - - def apply(address: Address, status: MemberStatus): Member = new Member(address, status) - - def unapply(other: Any) = other match { - case m: Member ⇒ Some(m.address) - case _ ⇒ None - } - - def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { - // group all members by Address => Seq[Member] - val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) - // pick highest MemberStatus - (Set.empty[Member] /: groupedByAddress) { - case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) - } - } - - /** - * Picks the Member with the highest "priority" MemberStatus. 
- */ - def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match { - case (Removed, _) ⇒ m1 - case (_, Removed) ⇒ m2 - case (Down, _) ⇒ m1 - case (_, Down) ⇒ m2 - case (Exiting, _) ⇒ m1 - case (_, Exiting) ⇒ m2 - case (Leaving, _) ⇒ m1 - case (_, Leaving) ⇒ m2 - case (Up, Joining) ⇒ m2 - case (Joining, Up) ⇒ m1 - case (Joining, Joining) ⇒ m1 - case (Up, Up) ⇒ m1 - } - - // FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986 - // SortedSet + and ++ operators replaces existing element - // Use these :+ and :++ operators for the Gossip members - implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet) - class SortedSetWorkaround(sortedSet: SortedSet[Member]) { - implicit def :+(elem: Member): SortedSet[Member] = { - if (sortedSet.contains(elem)) sortedSet - else sortedSet + elem - } - - implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] = - sortedSet ++ (elems.toSet diff sortedSet) - } -} - -/** - * Envelope adding a sender address to the gossip. - */ -case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage - -/** - * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected - * it's forwarded to the leader for conflict resolution. - */ -case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage - -/** - * Defines the current status of a cluster member node - * - * Can be one of: Joining, Up, Leaving, Exiting and Down. - */ -sealed trait MemberStatus extends ClusterMessage { - - /** - * Using the same notion for 'unavailable' as 'non-convergence': DOWN - */ - def isUnavailable: Boolean = this == Down -} - -object MemberStatus { - case object Joining extends MemberStatus - case object Up extends MemberStatus - case object Leaving extends MemberStatus - case object Exiting extends MemberStatus - case object Down extends MemberStatus - case object Removed extends MemberStatus -} - -/** - * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. - */ -case class GossipOverview( - seen: Map[Address, VectorClock] = Map.empty, - unreachable: Set[Member] = Set.empty) { - - def isNonDownUnreachable(address: Address): Boolean = - unreachable.exists { m ⇒ m.address == address && m.status != Down } - - override def toString = - "GossipOverview(seen = [" + seen.mkString(", ") + - "], unreachable = [" + unreachable.mkString(", ") + - "])" -} - -object Gossip { - val emptyMembers: SortedSet[Member] = SortedSet.empty - -} - -/** - * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - - * all versioned by a vector clock. - * - * When a node is joining the `Member`, with status `Joining`, is added to `members`. - * If the joining node was downed it is moved from `overview.unreachable` (status `Down`) - * to `members` (status `Joining`). It cannot rejoin if not first downed. - * - * When convergence is reached the leader change status of `members` from `Joining` - * to `Up`. - * - * When failure detector consider a node as unavailable it will be moved from - * `members` to `overview.unreachable`. - * - * When a node is downed, either manually or automatically, its status is changed to `Down`. - * It is also removed from `overview.seen` table. 
The node will reside as `Down` in the - * `overview.unreachable` set until joining again and it will then go through the normal - * joining procedure. - * - * When a `Gossip` is received the version (vector clock) is used to determine if the - * received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip` - * and local `Gossip` is merged in case of conflicting version, i.e. vector clocks without - * same history. When merged the seen table is cleared. - * - * When a node is told by the user to leave the cluster the leader will move it to `Leaving` - * and then rebalance and repartition the cluster and start hand-off by migrating the actors - * from the leaving node to the new partitions. Once this process is complete the leader will - * move the node to the `Exiting` state and once a convergence is complete move the node to - * `Removed` by removing it from the `members` set and sending a `Removed` command to the - * removed node telling it to shut itself down. - */ -case class Gossip( - overview: GossipOverview = GossipOverview(), - members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address - meta: Map[String, Array[Byte]] = Map.empty, - version: VectorClock = VectorClock()) // vector clock version - extends ClusterMessage // is a serializable cluster message - with Versioned[Gossip] { - - // FIXME can be disabled as optimization - assertInvariants - - private def assertInvariants: Unit = { - val unreachableAndLive = members.intersect(overview.unreachable) - if (unreachableAndLive.nonEmpty) - throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" - format unreachableAndLive.mkString(", ")) - - val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting) - def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status) - if (members exists hasNotAllowedLiveMemberStatus) - throw new IllegalArgumentException("Live members must have status [%s], got [%s]" - format (allowedLiveMemberStatuses.mkString(", "), - (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) - - val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address) - if (seenButNotMember.nonEmpty) - throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" - format seenButNotMember.mkString(", ")) - - } - - /** - * Increments the version for this 'Node'. - */ - def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node) - - /** - * Adds a member to the member node ring. - */ - def :+(member: Member): Gossip = { - if (members contains member) this - else this copy (members = members :+ member) - } - - /** - * Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen' - * Map with the VectorClock (version) for the new gossip. - */ - def seen(address: Address): Gossip = { - if (overview.seen.contains(address) && overview.seen(address) == version) this - else this copy (overview = overview copy (seen = overview.seen + (address -> version))) - } - - /** - * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories. - */ - def merge(that: Gossip): Gossip = { - import Member.ordering - - // 1. merge vector clocks - val mergedVClock = this.version merge that.version - - // 2. merge meta-data - val mergedMeta = this.meta ++ that.meta - - // 3. 
merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) - - // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, - // and exclude unreachable - val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) - - // 5. fresh seen table - val mergedSeen = Map.empty[Address, VectorClock] - - Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock) - } - - /** - * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence - - * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down). - * - * @returns Some(convergedGossip) if convergence have been reached and None if not - */ - def convergence: Boolean = { - val unreachable = overview.unreachable - val seen = overview.seen - - // First check that: - // 1. we don't have any members that are unreachable, or - // 2. all unreachable members in the set have status DOWN - // Else we can't continue to check for convergence - // When that is done we check that all the entries in the 'seen' table have the same vector clock version - // and that all members exists in seen table - val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down } - def allMembersInSeen = members.forall(m ⇒ seen.contains(m.address)) - - def seenSame: Boolean = - if (seen.isEmpty) false - else { - val values = seen.values - val seenHead = values.head - values.forall(_ == seenHead) - } - - !hasUnreachable && allMembersInSeen && seenSame - } - - def isLeader(address: Address): Boolean = - members.nonEmpty && (address == members.head.address) - - def leader: Option[Address] = members.headOption.map(_.address) - - def isSingletonCluster: Boolean = members.size == 1 - - /** - * Returns true if the node is UP or JOINING. - */ - def isAvailable(address: Address): Boolean = !isUnavailable(address) - - def isUnavailable(address: Address): Boolean = { - val isUnreachable = overview.unreachable exists { _.address == address } - val hasUnavailableMemberStatus = members exists { m ⇒ m.status.isUnavailable && m.address == address } - isUnreachable || hasUnavailableMemberStatus - } - - def member(address: Address): Member = { - members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)). - getOrElse(Member(address, Removed)) - } - - override def toString = - "Gossip(" + - "overview = " + overview + - ", members = [" + members.mkString(", ") + - "], meta = [" + meta.mkString(", ") + - "], version = " + version + - ")" -} - -/** - * Sent at regular intervals for failure detection. 
- */ -case class Heartbeat(from: Address) extends ClusterMessage - -/** - * INTERNAL API - */ -private[cluster] case class ClusterStats( - receivedGossipCount: Long = 0L, - mergeConflictCount: Long = 0L, - mergeCount: Long = 0L, - mergeDetectedCount: Long = 0L) { - - def incrementReceivedGossipCount(): ClusterStats = - copy(receivedGossipCount = receivedGossipCount + 1) - - def incrementMergeConflictCount(): ClusterStats = - copy(mergeConflictCount = mergeConflictCount + 1) - - def incrementMergeCount(): ClusterStats = - copy(mergeCount = mergeCount + 1) - - def incrementMergeDetectedCount(): ClusterStats = - copy(mergeDetectedCount = mergeDetectedCount + 1) -} - -/** - * INTERNAL API. - * - * Receives Heartbeat messages and delegates to Cluster. - * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized - * to Cluster message after message, but concurrent with other types of messages. - */ -private[cluster] final class ClusterHeartbeatDaemon(cluster: Cluster) extends Actor with ActorLogging { - - def receive = { - case Heartbeat(from) ⇒ cluster.failureDetector heartbeat from - } - -} - -/* - * This actor is responsible for sending the heartbeat messages to - * other nodes. Netty blocks when sending to broken connections. This actor - * isolates sending to different nodes by using child workers for each target - * address and thereby reduce the risk of irregular heartbeats to healty - * nodes due to broken connections to other nodes. - */ -private[cluster] final class ClusterHeartbeatSender(cluster: Cluster) extends Actor with ActorLogging { - - import InternalClusterAction._ - - /** - * Looks up and returns the remote cluster heartbeat connection for the specific address. - */ - def clusterHeartbeatConnectionFor(address: Address): ActorRef = - context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") - - val digester = MessageDigest.getInstance("MD5") - - /** - * Child name is MD5 hash of the address. - * FIXME Change to URLEncode when ticket #2123 has been fixed - */ - def encodeChildName(name: String): String = { - digester update name.getBytes("UTF-8") - digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString - } - - def receive = { - case msg @ SendHeartbeat(from, to, deadline) ⇒ - val workerName = encodeChildName(to.toString) - val worker = context.actorFor(workerName) match { - case notFound if notFound.isTerminated ⇒ - context.actorOf(Props(new ClusterHeartbeatSenderWorker( - cluster.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName) - case child ⇒ child - } - worker ! msg - } - -} - -/** - * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address. - * - * Netty blocks when sending to broken connections, and this actor uses - * a configurable circuit breaker to reduce connect attempts to broken - * connections. - * - * @see ClusterHeartbeatSender - */ -private[cluster] final class ClusterHeartbeatSenderWorker( - cbSettings: CircuitBreakerSettings, toRef: ActorRef) - extends Actor with ActorLogging { - - import InternalClusterAction._ - - val breaker = CircuitBreaker(context.system.scheduler, - cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). - onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). - onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). 
- onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) - - context.setReceiveTimeout(30 seconds) - - def receive = { - case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ - if (!deadline.isOverdue) { - // the CircuitBreaker will measure elapsed time and open if too many long calls - try breaker.withSyncCircuitBreaker { - log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) - toRef ! heartbeatMsg - if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef) - } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } - - // make sure it will cleanup when not used any more - context.setReceiveTimeout(30 seconds) - } - - case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used - - } -} - -/** - * INTERNAL API. - */ -private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging { - import InternalClusterAction._ - - /** - * Looks up and returns the remote cluster command connection for the specific address. - */ - private def clusterCoreConnectionFor(address: Address): ActorRef = - context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "core") - - def receive = { - case SendClusterMessage(to, msg) ⇒ - log.debug("Cluster Node [{}] - Trying to send [{}] to [{}]", selfAddress, msg.getClass.getSimpleName, to) - clusterCoreConnectionFor(to) ! msg - } -} - -/** - * INTERNAL API. - */ -private[cluster] final class ClusterCore(cluster: Cluster) extends Actor with ActorLogging { - // FIXME break up the cluster constructor parameter into something that is easier to test without Cluster - import ClusterLeaderAction._ - import InternalClusterAction._ - - import cluster.settings._ - import cluster.selfAddress - import cluster.clusterScheduler - - val vclockNode = VectorClock.Node(selfAddress.toString) - val selfHeartbeat = Heartbeat(selfAddress) - - // note that self is not initially member, - // and the Gossip is not versioned for this 'Node' yet - var latestGossip: Gossip = Gossip() - var joinInProgress: Map[Address, Deadline] = Map.empty - - var stats = ClusterStats() - - val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(cluster)). - withDispatcher(UseDispatcher), name = "heartbeatSender") - val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)). - withDispatcher(UseDispatcher), name = "coreSender") - - // start periodic gossip to random nodes in cluster - val gossipTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { - self ! GossipTick - } - - // start periodic heartbeat to all nodes in cluster - val heartbeatTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { - self ! HeartbeatTick - } - - // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - val failureDetectorReaperTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { - self ! ReapUnreachableTick - } - - // start periodic leader action management (only applies for the current leader) - private val leaderActionsTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { - self ! 
LeaderActionsTick - } - - // start periodic publish of current state - private val publishStateTask: Option[Cancellable] = - if (PublishStateInterval == Duration.Zero) None - else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStateInterval), PublishStateInterval) { - self ! PublishStateTick - }) - - override def preStart(): Unit = { - if (AutoJoin) self ! InternalClusterAction.JoinSeedNode - } - - override def postStop(): Unit = { - gossipTask.cancel() - heartbeatTask.cancel() - failureDetectorReaperTask.cancel() - leaderActionsTask.cancel() - publishStateTask foreach { _.cancel() } - } - - def receive = { - case JoinSeedNode ⇒ joinSeedNode() - case InitJoin ⇒ initJoin() - case InitJoinAck(address) ⇒ join(address) - case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() - case JoinTo(address) ⇒ join(address) - case ClusterUserAction.Join(address) ⇒ joining(address) - case ClusterUserAction.Down(address) ⇒ downing(address) - case ClusterUserAction.Leave(address) ⇒ leaving(address) - case Exit(address) ⇒ exiting(address) - case Remove(address) ⇒ removing(address) - case msg: GossipEnvelope ⇒ receiveGossip(msg) - case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) - case GossipTick ⇒ gossip() - case HeartbeatTick ⇒ heartbeat() - case ReapUnreachableTick ⇒ reapUnreachableMembers() - case LeaderActionsTick ⇒ leaderActions() - case SendGossipTo(address) ⇒ gossipTo(address) - case PublishStateTick ⇒ publishState() - case p: Ping ⇒ ping(p) - - } - - def joinSeedNode(): Unit = { - val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) - yield self.path.toStringWithAddress(address) - if (seedRoutees.isEmpty) { - cluster join cluster.selfAddress - } else { - implicit val within = Timeout(cluster.settings.SeedNodeTimeout) - val seedRouter = context.actorOf( - Props.empty.withRouter(ScatterGatherFirstCompletedRouter( - routees = seedRoutees, within = within.duration))) - seedRouter ? InitJoin pipeTo self - seedRouter ! PoisonPill - } - } - - def initJoin(): Unit = sender ! InitJoinAck(cluster.selfAddress) - - def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress - - /** - * Try to join this cluster node with the node specified by 'address'. - * A 'Join(thisNodeAddress)' command is sent to the node to join. - */ - def join(address: Address): Unit = { - val localGossip = latestGossip - // wipe our state since a node that joins a cluster must be empty - latestGossip = Gossip() - joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout)) - - // wipe the failure detector since we are starting fresh and shouldn't care about the past - cluster.failureDetector.reset() - - notifyListeners(localGossip) - - val command = ClusterUserAction.Join(selfAddress) - coreSender ! SendClusterMessage(address, command) - } - - /** - * State transition to JOINING - new node joining. 
- */ - def joining(node: Address): Unit = { - val localGossip = latestGossip - val localMembers = localGossip.members - val localUnreachable = localGossip.overview.unreachable - - val alreadyMember = localMembers.exists(_.address == node) - val isUnreachable = localGossip.overview.isNonDownUnreachable(node) - - if (!alreadyMember && !isUnreachable) { - - // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } - val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) - - // remove the node from the failure detector if it is a DOWN node that is rejoining cluster - if (rejoiningMember.nonEmpty) cluster.failureDetector.remove(node) - - // add joining node as Joining - // add self in case someone else joins before self has joined (Set discards duplicates) - val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) - val newGossip = localGossip copy (overview = newOverview, members = newMembers) - - val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress - - latestGossip = seenVersionedGossip - - log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) - // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens - if (node != selfAddress) { - cluster.failureDetector heartbeat node - gossipTo(node) - } - - notifyListeners(localGossip) - } - } - - /** - * State transition to LEAVING. - */ - def leaving(address: Address): Unit = { - val localGossip = latestGossip - if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) - val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING - val newGossip = localGossip copy (members = newMembers) - - val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress - - latestGossip = seenVersionedGossip - - log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) - notifyListeners(localGossip) - } - } - - /** - * State transition to EXITING. - */ - def exiting(address: Address): Unit = { - log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address) - // FIXME implement when we implement hand-off - } - - /** - * State transition to REMOVED. - * - * This method is for now only called after the LEADER have sent a Removed message - telling the node - * to shut down himself. - * - * In the future we might change this to allow the USER to send a Removed(address) message telling an - * arbitrary node to be moved direcly from UP -> REMOVED. - */ - def removing(address: Address): Unit = { - log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) - val localGossip = latestGossip - // just cleaning up the gossip state - latestGossip = Gossip() - // make sure the final (removed) state is always published - notifyListeners(localGossip) - cluster.shutdown() - } - - /** - * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there) - * and its status is set to DOWN. The node is also removed from the 'seen' table. 
- * - * The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly - * to this node and it will then go through the normal JOINING procedure. - */ - def downing(address: Address): Unit = { - val localGossip = latestGossip - val localMembers = localGossip.members - val localOverview = localGossip.overview - val localSeen = localOverview.seen - val localUnreachableMembers = localOverview.unreachable - - // 1. check if the node to DOWN is in the 'members' set - val downedMember: Option[Member] = localMembers.collectFirst { - case m if m.address == address ⇒ m.copy(status = Down) - } - val newMembers = downedMember match { - case Some(m) ⇒ - log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address) - localMembers - m - case None ⇒ localMembers - } - - // 2. check if the node to DOWN is in the 'unreachable' set - val newUnreachableMembers = - localUnreachableMembers.map { member ⇒ - // no need to DOWN members already DOWN - if (member.address == address && member.status != Down) { - log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) - member copy (status = Down) - } else member - } - - // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set. - val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember - - // 4. remove nodes marked as DOWN from the 'seen' table - val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { - case m if m.status == Down ⇒ m.address - } - - // update gossip overview - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) - val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip - val versionedGossip = newGossip :+ vclockNode - latestGossip = versionedGossip seen selfAddress - - notifyListeners(localGossip) - } - - /** - * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected - * it's forwarded to the leader for conflict resolution. Trying to simultaneously - * resolving conflicts at several nodes creates new conflicts. Therefore the leader resolves - * conflicts to limit divergence. To avoid overload there is also a configurable rate - * limit of how many conflicts that are handled by second. If the limit is - * exceeded the conflicting gossip messages are dropped and will reappear later. - */ - def receiveGossipMerge(merge: GossipMergeConflict): Unit = { - stats = stats.incrementMergeConflictCount - val rate = mergeRate(stats.mergeConflictCount) - if (rate <= MaxGossipMergeRate) { - receiveGossip(merge.a.copy(conversation = false)) - receiveGossip(merge.b.copy(conversation = false)) - - // use one-way gossip from leader to reduce load of leader - def sendBack(to: Address): Unit = { - if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to)) - oneWayGossipTo(to) - } - - sendBack(merge.a.from) - sendBack(merge.b.from) - - } else { - log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate) - } - } - - /** - * Receive new gossip. - */ - def receiveGossip(envelope: GossipEnvelope): Unit = { - val from = envelope.from - val remoteGossip = envelope.gossip - val localGossip = latestGossip - - if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) { - // FIXME how should we handle this situation? 
- log.debug("Received gossip with self as unreachable, from [{}]", from) - - } else if (!localGossip.overview.isNonDownUnreachable(from)) { - - // leader handles merge conflicts, or when they have different views of how is leader - val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader - val conflict = remoteGossip.version <> localGossip.version - - if (conflict && !handleMerge) { - // delegate merge resolution to leader to reduce number of simultaneous resolves, - // which will result in new conflicts - - stats = stats.incrementMergeDetectedCount - log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from) - - stats = stats.incrementMergeConflictCount - val rate = mergeRate(stats.mergeConflictCount) - if (rate <= MaxGossipMergeRate) { - coreSender ! SendClusterMessage( - to = localGossip.leader.get, - msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope)) - } else { - log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate) - } - - } else { - - val winningGossip = - - if (conflict) { - // conflicting versions, merge, and new version - val mergedGossip = remoteGossip merge localGossip - mergedGossip :+ vclockNode - - } else if (remoteGossip.version < localGossip.version) { - // local gossip is newer - localGossip - - } else if (!remoteGossip.members.exists(_.address == selfAddress)) { - // FIXME This is a very strange. It can happen when many nodes join at the same time. - // It's not detected as an ordinary version conflict <> - // If we don't handle this situation there will be IllegalArgumentException when marking this as seen - // merge, and new version - val mergedGossip = remoteGossip merge (localGossip :+ Member(selfAddress, Joining)) - mergedGossip :+ vclockNode - - } else { - // remote gossip is newer - remoteGossip - - } - - val newJoinInProgress = - if (joinInProgress.isEmpty) joinInProgress - else joinInProgress -- - winningGossip.members.map(_.address) -- - winningGossip.overview.unreachable.map(_.address) - - latestGossip = winningGossip seen selfAddress - joinInProgress = newJoinInProgress - - // for all new joining nodes we remove them from the failure detector - (latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { node ⇒ - cluster.failureDetector.remove(node.address) - } - - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - - if (conflict) { - stats = stats.incrementMergeCount - log.debug( - """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", - remoteGossip, localGossip, winningGossip) - } - - stats = stats.incrementReceivedGossipCount - notifyListeners(localGossip) - - if (envelope.conversation && - (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) { - // send back gossip to sender when sender had different view, i.e. merge, or sender had - // older or sender had newer - gossipTo(from) - } - } - } - } - - def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis - - /** - * Initiates a new round of gossip. 
- */ - def gossip(): Unit = { - stats = stats.copy(mergeConflictCount = 0) - - log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) - - if (!isSingletonCluster && isAvailable) { - val localGossip = latestGossip - // important to not accidentally use `map` of the SortedSet, since the original order is not preserved - val localMembers = localGossip.members.toIndexedSeq - val localMembersSize = localMembers.size - val localMemberAddresses = localMembers map { _.address } - - val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq - val localUnreachableSize = localUnreachableMembers.size - - // gossip to a random alive member with preference to a member - // with older or newer gossip version - val nodesWithdifferentView = { - val localMemberAddressesSet = localGossip.members map { _.address } - for { - (address, version) ← localGossip.overview.seen - if localMemberAddressesSet contains address - if version != localGossip.version - } yield address - } - val gossipedToAlive = - if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) - gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq) - else - gossipToRandomNodeOf(localMemberAddresses) - - } - } - - /** - * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. - */ - def leaderActions(): Unit = { - val localGossip = latestGossip - val localMembers = localGossip.members - - val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) - - if (isLeader && isAvailable) { - // only run the leader actions if we are the LEADER and available - - val localOverview = localGossip.overview - val localSeen = localOverview.seen - val localUnreachableMembers = localOverview.unreachable - val hasPartionHandoffCompletedSuccessfully: Boolean = { - // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully - true - } - - // Leader actions are as follows: - // 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table - // 2. Move JOINING => UP -- When a node joins the cluster - // 3. Move LEAVING => EXITING -- When all partition handoff has completed - // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader - // 5. Store away all stuff needed for the side-effecting processing in 10. - // 6. Updating the vclock version for the changes - // 7. Updating the 'seen' table - // 8. Try to update the state with the new gossip - // 9. If failure - retry - // 10. If success - run all the side-effecting processing - - val ( - newGossip: Gossip, - hasChangedState: Boolean, - upMembers, - exitingMembers, - removedMembers, - unreachableButNotDownedMembers) = - - if (localGossip.convergence) { - // we have convergence - so we can't have unreachable nodes - - // transform the node member ring - filterNot/map/map - val newMembers = - localMembers filterNot { member ⇒ - // ---------------------- - // 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table - // ---------------------- - member.status == MemberStatus.Exiting - - } map { member ⇒ - // ---------------------- - // 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. 
we have a convergence) - // ---------------------- - if (member.status == Joining) member copy (status = Up) - else member - - } map { member ⇒ - // ---------------------- - // 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) - // ---------------------- - if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting) - else member - } - - // ---------------------- - // 5. Store away all stuff needed for the side-effecting processing in 10. - // ---------------------- - - // Check for the need to do side-effecting on successful state change - // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED - // to check for state-changes and to store away removed and exiting members for later notification - // 1. check for state-changes to update - // 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending - val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting) - - val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining) - - val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) - - val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty - - // removing REMOVED nodes from the 'seen' table - val newSeen = localSeen -- removedMembers.map(_.address) - - // removing REMOVED nodes from the 'unreachable' set - val newUnreachableMembers = localUnreachableMembers -- removedMembers - - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview - val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip - - (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member]) - - } else if (AutoDown) { - // we don't have convergence - so we might have unreachable nodes - - // if 'auto-down' is turned on, then try to auto-down any unreachable nodes - val newUnreachableMembers = localUnreachableMembers.map { member ⇒ - // ---------------------- - // 5. Move UNREACHABLE => DOWN (auto-downing by leader) - // ---------------------- - if (member.status == Down) member // no need to DOWN members already DOWN - else member copy (status = Down) - } - - // Check for the need to do side-effecting on successful state change - val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down) - - // removing nodes marked as DOWN from the 'seen' table - val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address } - - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview - val newGossip = localGossip copy (overview = newOverview) // update gossip - - (newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers) - - } else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member]) - - if (hasChangedState) { // we have a change of state - version it and try to update - // ---------------------- - // 6. Updating the vclock version for the changes - // ---------------------- - val versionedGossip = newGossip :+ vclockNode - - // ---------------------- - // 7. 
Updating the 'seen' table - // Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED - // ---------------------- - val seenVersionedGossip = - if (removedMembers.exists(_.address == selfAddress)) versionedGossip - else versionedGossip seen selfAddress - - // ---------------------- - // 8. Update the state with the new gossip - // ---------------------- - latestGossip = seenVersionedGossip - - // ---------------------- - // 9. Run all the side-effecting processing - // ---------------------- - - // log the move of members from joining to up - upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) } - - // tell all removed members to remove and shut down themselves - removedMembers foreach { member ⇒ - val address = member.address - log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address) - coreSender ! SendClusterMessage( - to = address, - msg = ClusterLeaderAction.Remove(address)) - } - - // tell all exiting members to exit - exitingMembers foreach { member ⇒ - val address = member.address - log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address) - coreSender ! SendClusterMessage( - to = address, - msg = ClusterLeaderAction.Exit(address)) // FIXME should use ? to await completion of handoff? - } - - // log the auto-downing of the unreachable nodes - unreachableButNotDownedMembers foreach { member ⇒ - log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) - } - - notifyListeners(localGossip) - } - } - } - - def heartbeat(): Unit = { - removeOverdueJoinInProgress() - - val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys - - val deadline = Deadline.now + HeartbeatInterval - for (address ← beatTo; if address != selfAddress) - heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) - } - - /** - * Removes overdue joinInProgress from State. - */ - def removeOverdueJoinInProgress(): Unit = { - val overdueJoins = joinInProgress collect { - case (address, deadline) if deadline.isOverdue ⇒ address - } - if (overdueJoins.nonEmpty) { - joinInProgress = joinInProgress -- overdueJoins - } - } - - /** - * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. 
- */ - def reapUnreachableMembers(): Unit = { - - if (!isSingletonCluster && isAvailable) { - // only scrutinize if we are a non-singleton cluster and available - - val localGossip = latestGossip - val localOverview = localGossip.overview - val localMembers = localGossip.members - val localUnreachableMembers = localGossip.overview.unreachable - - val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ - member.address == selfAddress || cluster.failureDetector.isAvailable(member.address) - } - - if (newlyDetectedUnreachableMembers.nonEmpty) { - - val newMembers = localMembers -- newlyDetectedUnreachableMembers - val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers - - val newOverview = localOverview copy (unreachable = newUnreachableMembers) - val newGossip = localGossip copy (overview = newOverview, members = newMembers) - - // updating vclock and 'seen' table - val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress - - latestGossip = seenVersionedGossip - - log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) - - notifyListeners(localGossip) - } - } - } - - def seedNodes: IndexedSeq[Address] = cluster.seedNodes - - def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = - if (addresses.isEmpty) None - else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) - - def isSingletonCluster: Boolean = latestGossip.isSingletonCluster - - def isAvailable: Boolean = latestGossip.isAvailable(selfAddress) - - /** - * Gossips latest gossip to a random member in the set of members passed in as argument. - * - * @return the used [[akka.actor.Address] if any - */ - private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = { - log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", ")) - val peers = addresses filterNot (_ == selfAddress) // filter out myself - val peer = selectRandomNode(peers) - peer foreach gossipTo - peer - } - - /** - * Gossips latest gossip to an address. - */ - def gossipTo(address: Address): Unit = - gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true)) - - def oneWayGossipTo(address: Address): Unit = - gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false)) - - def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) { - coreSender ! SendClusterMessage(address, gossipMsg) - } - - def notifyListeners(oldGossip: Gossip): Unit = { - if (PublishStateInterval == Duration.Zero) publishState() - - val oldMembersStatus = oldGossip.members.map(m ⇒ (m.address, m.status)) - val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status)) - if (newMembersStatus != oldMembersStatus) - cluster notifyMembershipChangeListeners latestGossip.members - } - - def publishState(): Unit = { - cluster._latestGossip = latestGossip - cluster._latestStats = stats - } - - def ping(p: Ping): Unit = sender ! Pong(p) -} - -/** - * INTERNAL API. - * - * Supervisor managing the different Cluster daemons. - */ -private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor with ActorLogging { - - val configuredDispatcher = cluster.settings.UseDispatcher - val core = context.actorOf(Props(new ClusterCore(cluster)). 
- withDispatcher(configuredDispatcher), name = "core") - val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)). - withDispatcher(configuredDispatcher), name = "heartbeat") - - def receive = { - case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core - } - -} /** * Cluster Extension Id and factory for creating Cluster extension. @@ -1387,7 +73,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode ⇒ +class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { /** * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. @@ -1400,7 +86,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider] - val remoteSettings = new RemoteSettings(system.settings.config, system.name) val settings = new ClusterSettings(system.settings.config, system.name) import settings._ @@ -1415,7 +100,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) /** * Read only view of cluster state, updated periodically by - * ClusterCore. Access with `latestGossip`. + * ClusterCoreDaemon. Access with `latestGossip`. */ @volatile private[cluster] var _latestGossip: Gossip = Gossip() @@ -1423,7 +108,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) /** * INTERNAL API * Read only view of internal cluster stats, updated periodically by - * ClusterCore. Access with `latestStats`. + * ClusterCoreDaemon. Access with `latestStats`. */ @volatile private[cluster] var _latestStats = ClusterStats() @@ -1435,8 +120,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) /** * INTERNAL API */ - private[cluster] val clusterScheduler: Scheduler with Closeable = { - // FIXME consider moving clusterScheduler to ClusterCore actor + private[cluster] val scheduler: Scheduler with Closeable = { if (system.settings.SchedulerTickDuration > SchedulerTickDuration) { log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", @@ -1473,7 +157,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { - system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemonSupervisor(this)). + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)). 
withDispatcher(UseDispatcher), name = "cluster") } @@ -1618,7 +302,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!clusterDaemons.isTerminated) system.stop(clusterDaemons) - clusterScheduler.close() + scheduler.close() clusterJmx.unregisterMBean() @@ -1639,4 +323,28 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private[cluster] def latestStats: ClusterStats = _latestStats + /** + * INTERNAL API + */ + private[cluster] def publishLatestGossip(gossip: Gossip): Unit = _latestGossip = gossip + + /** + * INTERNAL API + */ + private[cluster] def publishLatestStats(stats: ClusterStats): Unit = _latestStats = stats + } + +/** + * Interface for membership change listener. + */ +trait MembershipChangeListener { + def notify(members: SortedSet[Member]): Unit +} + +/** + * Interface for meta data change listener. + */ +trait MetaDataChangeListener { + def notify(meta: Map[String, Array[Byte]]): Unit +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala new file mode 100644 index 0000000000..95fab750ca --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -0,0 +1,926 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.Cancellable +import akka.actor.Props +import akka.actor.RootActorPath +import akka.actor.Status.Failure +import akka.actor.PoisonPill +import akka.actor.Scheduler +import akka.routing.ScatterGatherFirstCompletedRouter +import akka.util.Deadline +import akka.util.Duration +import akka.util.Timeout +import akka.jsr166y.ThreadLocalRandom +import akka.pattern.AskTimeoutException +import akka.pattern.ask +import akka.pattern.pipe +import MemberStatus._ + +/** + * Base trait for all cluster messages. All ClusterMessage's are serializable. + * + * FIXME Protobuf all ClusterMessages + */ +trait ClusterMessage extends Serializable + +/** + * Cluster commands sent by the USER. + */ +object ClusterUserAction { + + /** + * Command to join the cluster. Sent when a node (represented by 'address') + * wants to join another node (the receiver). + */ + case class Join(address: Address) extends ClusterMessage + + /** + * Command to leave the cluster. + */ + case class Leave(address: Address) extends ClusterMessage + + /** + * Command to mark node as temporary down. + */ + case class Down(address: Address) extends ClusterMessage + +} + +/** + * INTERNAL API + */ +private[cluster] object InternalClusterAction { + + /** + * Command to initiate join another node (represented by 'address'). + * Join will be sent to the other node. + */ + case class JoinTo(address: Address) extends ClusterMessage + + /** + * Start message of the process to join one of the seed nodes. + * The node sends `InitJoin` to all seed nodes, which replies + * with `InitJoinAck`. The first reply is used others are discarded. + * The node sends `Join` command to the seed node that replied first. 
+ */ + case object JoinSeedNode extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case object InitJoin extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case class InitJoinAck(address: Address) extends ClusterMessage + + case object GossipTick + + case object HeartbeatTick + + case object ReapUnreachableTick + + case object LeaderActionsTick + + case object PublishStateTick + + case class SendClusterMessage(to: Address, msg: ClusterMessage) + + case class SendGossipTo(address: Address) + + case object GetClusterCoreRef + + case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage + case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage + +} + +/** + * INTERNAL API. + * + * Cluster commands sent by the LEADER. + */ +private[cluster] object ClusterLeaderAction { + + /** + * Command to mark a node to be removed from the cluster immediately. + * Can only be sent by the leader. + */ + case class Exit(address: Address) extends ClusterMessage + + /** + * Command to remove a node from the cluster immediately. + */ + case class Remove(address: Address) extends ClusterMessage +} + +/** + * INTERNAL API + * + * The contextual pieces that ClusterDaemon actors need. + * Makes it easier to test the actors without using the Cluster extension. + */ +private[cluster] trait ClusterEnvironment { + private[cluster] def settings: ClusterSettings + private[cluster] def failureDetector: FailureDetector + private[cluster] def selfAddress: Address + private[cluster] def scheduler: Scheduler + private[cluster] def seedNodes: IndexedSeq[Address] + private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit + private[cluster] def publishLatestGossip(gossip: Gossip): Unit + private[cluster] def publishLatestStats(stats: ClusterStats): Unit + private[cluster] def shutdown(): Unit +} + +/** + * INTERNAL API. + * + * Supervisor managing the different Cluster daemons. + */ +private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { + + val configuredDispatcher = environment.settings.UseDispatcher + val core = context.actorOf(Props(new ClusterCoreDaemon(environment)). + withDispatcher(configuredDispatcher), name = "core") + val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(environment)). + withDispatcher(configuredDispatcher), name = "heartbeat") + + def receive = { + case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core + } + +} + +/** + * INTERNAL API. + */ +private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { + // FIXME break up the cluster constructor parameter into something that is easier to test without Cluster + import ClusterLeaderAction._ + import InternalClusterAction._ + import ClusterHeartbeatSender._ + + def selfAddress = environment.selfAddress + def clusterScheduler = environment.scheduler + def failureDetector = environment.failureDetector + val settings = environment.settings + import settings._ + + val vclockNode = VectorClock.Node(selfAddress.toString) + val selfHeartbeat = Heartbeat(selfAddress) + + // note that self is not initially member, + // and the Gossip is not versioned for this 'Node' yet + var latestGossip: Gossip = Gossip() + var joinInProgress: Map[Address, Deadline] = Map.empty + + var stats = ClusterStats() + + val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(environment)). 
+ withDispatcher(UseDispatcher), name = "heartbeatSender") + val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)). + withDispatcher(UseDispatcher), name = "coreSender") + + // start periodic gossip to random nodes in cluster + val gossipTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { + self ! GossipTick + } + + // start periodic heartbeat to all nodes in cluster + val heartbeatTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { + self ! HeartbeatTick + } + + // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) + val failureDetectorReaperTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { + self ! ReapUnreachableTick + } + + // start periodic leader action management (only applies for the current leader) + private val leaderActionsTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { + self ! LeaderActionsTick + } + + // start periodic publish of current state + private val publishStateTask: Option[Cancellable] = + if (PublishStateInterval == Duration.Zero) None + else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStateInterval), PublishStateInterval) { + self ! PublishStateTick + }) + + override def preStart(): Unit = { + if (AutoJoin) self ! InternalClusterAction.JoinSeedNode + } + + override def postStop(): Unit = { + gossipTask.cancel() + heartbeatTask.cancel() + failureDetectorReaperTask.cancel() + leaderActionsTask.cancel() + publishStateTask foreach { _.cancel() } + } + + def receive = { + case msg: GossipEnvelope ⇒ receiveGossip(msg) + case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) + case GossipTick ⇒ gossip() + case HeartbeatTick ⇒ heartbeat() + case ReapUnreachableTick ⇒ reapUnreachableMembers() + case LeaderActionsTick ⇒ leaderActions() + case PublishStateTick ⇒ publishState() + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ initJoin() + case InitJoinAck(address) ⇒ join(address) + case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() + case JoinTo(address) ⇒ join(address) + case ClusterUserAction.Join(address) ⇒ joining(address) + case ClusterUserAction.Down(address) ⇒ downing(address) + case ClusterUserAction.Leave(address) ⇒ leaving(address) + case Exit(address) ⇒ exiting(address) + case Remove(address) ⇒ removing(address) + case SendGossipTo(address) ⇒ gossipTo(address) + case p: Ping ⇒ ping(p) + + } + + def joinSeedNode(): Unit = { + val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) } + if (seedRoutees.isEmpty) { + join(selfAddress) + } else { + implicit val within = Timeout(SeedNodeTimeout) + val seedRouter = context.actorOf( + Props.empty.withRouter(ScatterGatherFirstCompletedRouter( + routees = seedRoutees, within = within.duration))) + seedRouter ? InitJoin pipeTo self + seedRouter ! PoisonPill + } + } + + def initJoin(): Unit = sender ! InitJoinAck(selfAddress) + + def joinSeedNodeTimeout(): Unit = join(selfAddress) + + /** + * Try to join this cluster node with the node specified by 'address'. + * A 'Join(thisNodeAddress)' command is sent to the node to join. 
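+   *
+   * A minimal usage sketch (the address is hypothetical and `clusterCore` is assumed to be
+   * the ref obtained via `InternalClusterAction.GetClusterCoreRef`):
+   * {{{
+   * clusterCore ! InternalClusterAction.JoinTo(Address("akka", "MySystem", "host1", 2552))
+   * }}}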
+ */ + def join(address: Address): Unit = { + val localGossip = latestGossip + // wipe our state since a node that joins a cluster must be empty + latestGossip = Gossip() + joinInProgress = Map(address -> (Deadline.now + JoinTimeout)) + + // wipe the failure detector since we are starting fresh and shouldn't care about the past + failureDetector.reset() + + notifyListeners(localGossip) + + val command = ClusterUserAction.Join(selfAddress) + coreSender ! SendClusterMessage(address, command) + } + + /** + * State transition to JOINING - new node joining. + */ + def joining(node: Address): Unit = { + val localGossip = latestGossip + val localMembers = localGossip.members + val localUnreachable = localGossip.overview.unreachable + + val alreadyMember = localMembers.exists(_.address == node) + val isUnreachable = localGossip.overview.isNonDownUnreachable(node) + + if (!alreadyMember && !isUnreachable) { + + // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster + val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } + val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) + + // remove the node from the failure detector if it is a DOWN node that is rejoining cluster + if (rejoiningMember.nonEmpty) failureDetector.remove(node) + + // add joining node as Joining + // add self in case someone else joins before self has joined (Set discards duplicates) + val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) + val newGossip = localGossip copy (overview = newOverview, members = newMembers) + + val versionedGossip = newGossip :+ vclockNode + val seenVersionedGossip = versionedGossip seen selfAddress + + latestGossip = seenVersionedGossip + + log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) + // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens + if (node != selfAddress) { + failureDetector heartbeat node + gossipTo(node) + } + + notifyListeners(localGossip) + } + } + + /** + * State transition to LEAVING. + */ + def leaving(address: Address): Unit = { + val localGossip = latestGossip + if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) + val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING + val newGossip = localGossip copy (members = newMembers) + + val versionedGossip = newGossip :+ vclockNode + val seenVersionedGossip = versionedGossip seen selfAddress + + latestGossip = seenVersionedGossip + + log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) + notifyListeners(localGossip) + } + } + + /** + * State transition to EXITING. + */ + def exiting(address: Address): Unit = { + log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address) + // FIXME implement when we implement hand-off + } + + /** + * State transition to REMOVED. + * + * This method is for now only called after the LEADER have sent a Removed message - telling the node + * to shut down himself. + * + * In the future we might change this to allow the USER to send a Removed(address) message telling an + * arbitrary node to be moved direcly from UP -> REMOVED. 
+   */
+  def removing(address: Address): Unit = {
+    log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
+    val localGossip = latestGossip
+    // just cleaning up the gossip state
+    latestGossip = Gossip()
+    // make sure the final (removed) state is always published
+    notifyListeners(localGossip)
+    environment.shutdown()
+  }
+
+  /**
+   * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
+   * and its status is set to DOWN. The node is also removed from the 'seen' table.
+   *
+   * The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
+   * to this node, and it will then go through the normal JOINING procedure.
+   */
+  def downing(address: Address): Unit = {
+    val localGossip = latestGossip
+    val localMembers = localGossip.members
+    val localOverview = localGossip.overview
+    val localSeen = localOverview.seen
+    val localUnreachableMembers = localOverview.unreachable
+
+    // 1. check if the node to DOWN is in the 'members' set
+    val downedMember: Option[Member] = localMembers.collectFirst {
+      case m if m.address == address ⇒ m.copy(status = Down)
+    }
+    val newMembers = downedMember match {
+      case Some(m) ⇒
+        log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
+        localMembers - m
+      case None ⇒ localMembers
+    }
+
+    // 2. check if the node to DOWN is in the 'unreachable' set
+    val newUnreachableMembers =
+      localUnreachableMembers.map { member ⇒
+        // no need to DOWN members already DOWN
+        if (member.address == address && member.status != Down) {
+          log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
+          member copy (status = Down)
+        } else member
+      }
+
+    // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
+    val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
+
+    // 4. remove nodes marked as DOWN from the 'seen' table
+    val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
+      case m if m.status == Down ⇒ m.address
+    }
+
+    // update gossip overview
+    val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
+    val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
+    val versionedGossip = newGossip :+ vclockNode
+    latestGossip = versionedGossip seen selfAddress
+
+    notifyListeners(localGossip)
+  }
+
+  /**
+   * When conflicting versions of the received and local [[akka.cluster.Gossip]] are detected
+   * the gossip is forwarded to the leader for conflict resolution. Trying to resolve conflicts
+   * simultaneously at several nodes creates new conflicts. Therefore the leader resolves
+   * conflicts to limit divergence. To avoid overload there is also a configurable rate
+   * limit on how many conflicts are handled per second. If the limit is
+   * exceeded, the conflicting gossip messages are dropped and will reappear later.
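+   *
+   * Worked example with illustrative numbers (not configuration defaults): with a
+   * `GossipInterval` of 1 second and 5 conflicts counted in the current interval,
+   * the rate is `5 * 1000.0 / 1000 = 5.0` conflicts per second; the conflict is
+   * dropped if that exceeds `MaxGossipMergeRate`.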
+   */
+  def receiveGossipMerge(merge: GossipMergeConflict): Unit = {
+    stats = stats.incrementMergeConflictCount
+    val rate = mergeRate(stats.mergeConflictCount)
+    if (rate <= MaxGossipMergeRate) {
+      receiveGossip(merge.a.copy(conversation = false))
+      receiveGossip(merge.b.copy(conversation = false))
+
+      // use one-way gossip from the leader to reduce the load on the leader
+      def sendBack(to: Address): Unit = {
+        if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to))
+          oneWayGossipTo(to)
+      }
+
+      sendBack(merge.a.from)
+      sendBack(merge.b.from)
+
+    } else {
+      log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate)
+    }
+  }
+
+  /**
+   * Receive new gossip.
+   */
+  def receiveGossip(envelope: GossipEnvelope): Unit = {
+    val from = envelope.from
+    val remoteGossip = envelope.gossip
+    val localGossip = latestGossip
+
+    if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
+      // FIXME how should we handle this situation?
+      log.debug("Received gossip with self as unreachable, from [{}]", from)
+
+    } else if (!localGossip.overview.isNonDownUnreachable(from)) {
+
+      // the leader handles merge conflicts, or this node does if the two sides disagree on who the leader is
+      val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
+      val conflict = remoteGossip.version <> localGossip.version
+
+      if (conflict && !handleMerge) {
+        // delegate merge resolution to the leader to reduce the number of simultaneous resolves,
+        // which would result in new conflicts
+
+        stats = stats.incrementMergeDetectedCount
+        log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from)
+
+        stats = stats.incrementMergeConflictCount
+        val rate = mergeRate(stats.mergeConflictCount)
+        if (rate <= MaxGossipMergeRate) {
+          coreSender ! SendClusterMessage(
+            to = localGossip.leader.get,
+            msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
+        } else {
+          log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
+        }
+
+      } else {
+
+        val winningGossip =
+
+          if (conflict) {
+            // conflicting versions, merge, and new version
+            val mergedGossip = remoteGossip merge localGossip
+            mergedGossip :+ vclockNode
+
+          } else if (remoteGossip.version < localGossip.version) {
+            // local gossip is newer
+            localGossip
+
+          } else if (!remoteGossip.members.exists(_.address == selfAddress)) {
+            // FIXME This is very strange. It can happen when many nodes join at the same time.
+ // It's not detected as an ordinary version conflict <> + // If we don't handle this situation there will be IllegalArgumentException when marking this as seen + // merge, and new version + val mergedGossip = remoteGossip merge (localGossip :+ Member(selfAddress, Joining)) + mergedGossip :+ vclockNode + + } else { + // remote gossip is newer + remoteGossip + + } + + val newJoinInProgress = + if (joinInProgress.isEmpty) joinInProgress + else joinInProgress -- + winningGossip.members.map(_.address) -- + winningGossip.overview.unreachable.map(_.address) + + latestGossip = winningGossip seen selfAddress + joinInProgress = newJoinInProgress + + // for all new joining nodes we remove them from the failure detector + (latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { node ⇒ + failureDetector.remove(node.address) + } + + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + + if (conflict) { + stats = stats.incrementMergeCount + log.debug( + """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", + remoteGossip, localGossip, winningGossip) + } + + stats = stats.incrementReceivedGossipCount + notifyListeners(localGossip) + + if (envelope.conversation && + (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) { + // send back gossip to sender when sender had different view, i.e. merge, or sender had + // older or sender had newer + gossipTo(from) + } + } + } + } + + def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis + + /** + * Initiates a new round of gossip. + */ + def gossip(): Unit = { + stats = stats.copy(mergeConflictCount = 0) + + log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) + + if (!isSingletonCluster && isAvailable) { + val localGossip = latestGossip + // important to not accidentally use `map` of the SortedSet, since the original order is not preserved + val localMembers = localGossip.members.toIndexedSeq + val localMembersSize = localMembers.size + val localMemberAddresses = localMembers map { _.address } + + val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq + val localUnreachableSize = localUnreachableMembers.size + + // gossip to a random alive member with preference to a member + // with older or newer gossip version + val nodesWithdifferentView = { + val localMemberAddressesSet = localGossip.members map { _.address } + for { + (address, version) ← localGossip.overview.seen + if localMemberAddressesSet contains address + if version != localGossip.version + } yield address + } + val gossipedToAlive = + if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) + gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq) + else + gossipToRandomNodeOf(localMemberAddresses) + + } + } + + /** + * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. 
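+   *
+   * Note (sketch): the leader is simply the head of the sorted member ring, and since
+   * `Member.ordering` sorts `Exiting` members last, a non-exiting member is preferred:
+   * {{{
+   * val isLeader = members.nonEmpty && selfAddress == members.head.address
+   * }}}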
+ */ + def leaderActions(): Unit = { + val localGossip = latestGossip + val localMembers = localGossip.members + + val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) + + if (isLeader && isAvailable) { + // only run the leader actions if we are the LEADER and available + + val localOverview = localGossip.overview + val localSeen = localOverview.seen + val localUnreachableMembers = localOverview.unreachable + val hasPartionHandoffCompletedSuccessfully: Boolean = { + // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully + true + } + + // Leader actions are as follows: + // 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table + // 2. Move JOINING => UP -- When a node joins the cluster + // 3. Move LEAVING => EXITING -- When all partition handoff has completed + // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader + // 5. Store away all stuff needed for the side-effecting processing in 10. + // 6. Updating the vclock version for the changes + // 7. Updating the 'seen' table + // 8. Try to update the state with the new gossip + // 9. If failure - retry + // 10. If success - run all the side-effecting processing + + val ( + newGossip: Gossip, + hasChangedState: Boolean, + upMembers, + exitingMembers, + removedMembers, + unreachableButNotDownedMembers) = + + if (localGossip.convergence) { + // we have convergence - so we can't have unreachable nodes + + // transform the node member ring - filterNot/map/map + val newMembers = + localMembers filterNot { member ⇒ + // ---------------------- + // 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table + // ---------------------- + member.status == MemberStatus.Exiting + + } map { member ⇒ + // ---------------------- + // 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) + // ---------------------- + if (member.status == Joining) member copy (status = Up) + else member + + } map { member ⇒ + // ---------------------- + // 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) + // ---------------------- + if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting) + else member + } + + // ---------------------- + // 5. Store away all stuff needed for the side-effecting processing in 10. + // ---------------------- + + // Check for the need to do side-effecting on successful state change + // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED + // to check for state-changes and to store away removed and exiting members for later notification + // 1. check for state-changes to update + // 2. 
store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending + val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting) + + val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining) + + val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) + + val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty + + // removing REMOVED nodes from the 'seen' table + val newSeen = localSeen -- removedMembers.map(_.address) + + // removing REMOVED nodes from the 'unreachable' set + val newUnreachableMembers = localUnreachableMembers -- removedMembers + + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview + val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip + + (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member]) + + } else if (AutoDown) { + // we don't have convergence - so we might have unreachable nodes + + // if 'auto-down' is turned on, then try to auto-down any unreachable nodes + val newUnreachableMembers = localUnreachableMembers.map { member ⇒ + // ---------------------- + // 5. Move UNREACHABLE => DOWN (auto-downing by leader) + // ---------------------- + if (member.status == Down) member // no need to DOWN members already DOWN + else member copy (status = Down) + } + + // Check for the need to do side-effecting on successful state change + val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down) + + // removing nodes marked as DOWN from the 'seen' table + val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address } + + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview + val newGossip = localGossip copy (overview = newOverview) // update gossip + + (newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers) + + } else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member]) + + if (hasChangedState) { // we have a change of state - version it and try to update + // ---------------------- + // 6. Updating the vclock version for the changes + // ---------------------- + val versionedGossip = newGossip :+ vclockNode + + // ---------------------- + // 7. Updating the 'seen' table + // Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED + // ---------------------- + val seenVersionedGossip = + if (removedMembers.exists(_.address == selfAddress)) versionedGossip + else versionedGossip seen selfAddress + + // ---------------------- + // 8. Update the state with the new gossip + // ---------------------- + latestGossip = seenVersionedGossip + + // ---------------------- + // 9. 
Run all the side-effecting processing + // ---------------------- + + // log the move of members from joining to up + upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) } + + // tell all removed members to remove and shut down themselves + removedMembers foreach { member ⇒ + val address = member.address + log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address) + coreSender ! SendClusterMessage( + to = address, + msg = ClusterLeaderAction.Remove(address)) + } + + // tell all exiting members to exit + exitingMembers foreach { member ⇒ + val address = member.address + log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address) + coreSender ! SendClusterMessage( + to = address, + msg = ClusterLeaderAction.Exit(address)) // FIXME should use ? to await completion of handoff? + } + + // log the auto-downing of the unreachable nodes + unreachableButNotDownedMembers foreach { member ⇒ + log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) + } + + notifyListeners(localGossip) + } + } + } + + def heartbeat(): Unit = { + removeOverdueJoinInProgress() + + val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys + + val deadline = Deadline.now + HeartbeatInterval + for (address ← beatTo; if address != selfAddress) + heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) + } + + /** + * Removes overdue joinInProgress from State. + */ + def removeOverdueJoinInProgress(): Unit = { + val overdueJoins = joinInProgress collect { + case (address, deadline) if deadline.isOverdue ⇒ address + } + if (overdueJoins.nonEmpty) { + joinInProgress = joinInProgress -- overdueJoins + } + } + + /** + * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. 
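+   *
+   * In short, a member `m` is moved to 'unreachable' when (sketch of the check below):
+   * {{{
+   * m.address != selfAddress && !failureDetector.isAvailable(m.address)
+   * }}}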
+ */ + def reapUnreachableMembers(): Unit = { + + if (!isSingletonCluster && isAvailable) { + // only scrutinize if we are a non-singleton cluster and available + + val localGossip = latestGossip + val localOverview = localGossip.overview + val localMembers = localGossip.members + val localUnreachableMembers = localGossip.overview.unreachable + + val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ + member.address == selfAddress || failureDetector.isAvailable(member.address) + } + + if (newlyDetectedUnreachableMembers.nonEmpty) { + + val newMembers = localMembers -- newlyDetectedUnreachableMembers + val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers + + val newOverview = localOverview copy (unreachable = newUnreachableMembers) + val newGossip = localGossip copy (overview = newOverview, members = newMembers) + + // updating vclock and 'seen' table + val versionedGossip = newGossip :+ vclockNode + val seenVersionedGossip = versionedGossip seen selfAddress + + latestGossip = seenVersionedGossip + + log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) + + notifyListeners(localGossip) + } + } + } + + def seedNodes: IndexedSeq[Address] = environment.seedNodes + + def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = + if (addresses.isEmpty) None + else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) + + def isSingletonCluster: Boolean = latestGossip.isSingletonCluster + + def isAvailable: Boolean = latestGossip.isAvailable(selfAddress) + + /** + * Gossips latest gossip to a random member in the set of members passed in as argument. + * + * @return the used [[akka.actor.Address] if any + */ + private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = { + log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", ")) + // filter out myself + val peer = selectRandomNode(addresses filterNot (_ == selfAddress)) + peer foreach gossipTo + peer + } + + /** + * Gossips latest gossip to an address. + */ + def gossipTo(address: Address): Unit = + gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true)) + + def oneWayGossipTo(address: Address): Unit = + gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false)) + + def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) + coreSender ! SendClusterMessage(address, gossipMsg) + + def notifyListeners(oldGossip: Gossip): Unit = { + if (PublishStateInterval == Duration.Zero) publishState() + + val oldMembersStatus = oldGossip.members.map(m ⇒ (m.address, m.status)) + val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status)) + if (newMembersStatus != oldMembersStatus) + environment notifyMembershipChangeListeners latestGossip.members + } + + def publishState(): Unit = { + environment.publishLatestGossip(latestGossip) + environment.publishLatestStats(stats) + } + + def ping(p: Ping): Unit = sender ! Pong(p) +} + +/** + * INTERNAL API. + */ +private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging { + import InternalClusterAction._ + + /** + * Looks up and returns the remote cluster command connection for the specific address. 
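+   *
+   * For an illustrative address such as `akka://sys@host1:2552` this is equivalent to:
+   * {{{
+   * context.system.actorFor("akka://sys@host1:2552/system/cluster/core")
+   * }}}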
+ */ + private def clusterCoreConnectionFor(address: Address): ActorRef = + context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "core") + + def receive = { + case SendClusterMessage(to, msg) ⇒ + log.debug("Cluster Node [{}] - Trying to send [{}] to [{}]", selfAddress, msg.getClass.getSimpleName, to) + clusterCoreConnectionFor(to) ! msg + } +} + +/** + * INTERNAL API + */ +private[cluster] case class ClusterStats( + receivedGossipCount: Long = 0L, + mergeConflictCount: Long = 0L, + mergeCount: Long = 0L, + mergeDetectedCount: Long = 0L) { + + def incrementReceivedGossipCount(): ClusterStats = + copy(receivedGossipCount = receivedGossipCount + 1) + + def incrementMergeConflictCount(): ClusterStats = + copy(mergeConflictCount = mergeConflictCount + 1) + + def incrementMergeCount(): ClusterStats = + copy(mergeCount = mergeCount + 1) + + def incrementMergeDetectedCount(): ClusterStats = + copy(mergeDetectedCount = mergeDetectedCount + 1) +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala new file mode 100644 index 0000000000..29c4a8f562 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import akka.actor.ReceiveTimeout +import akka.actor.ActorLogging +import java.security.MessageDigest +import akka.pattern.CircuitBreaker +import akka.actor.ActorRef +import akka.pattern.CircuitBreakerOpenException +import akka.actor.Address +import akka.actor.Actor +import akka.actor.RootActorPath +import akka.actor.Props +import akka.util.duration._ +import akka.util.Deadline + +/** + * Sent at regular intervals for failure detection. + */ +case class Heartbeat(from: Address) extends ClusterMessage + +/** + * INTERNAL API. + * + * Receives Heartbeat messages and delegates to Cluster. + * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized + * to Cluster message after message, but concurrent with other types of messages. + */ +private[cluster] final class ClusterHeartbeatDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { + + def receive = { + case Heartbeat(from) ⇒ environment.failureDetector heartbeat from + } + +} + +/** + * INTERNAL API + */ +private[cluster] object ClusterHeartbeatSender { + /** + * + * Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]] + * to the other node. + * Local only, no need to serialize. + */ + case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) +} + +/* + * INTERNAL API + * + * This actor is responsible for sending the heartbeat messages to + * other nodes. Netty blocks when sending to broken connections. This actor + * isolates sending to different nodes by using child workers for each target + * address and thereby reduce the risk of irregular heartbeats to healty + * nodes due to broken connections to other nodes. + */ +private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironment) extends Actor with ActorLogging { + import ClusterHeartbeatSender._ + + /** + * Looks up and returns the remote cluster heartbeat connection for the specific address. 
+ */ + def clusterHeartbeatConnectionFor(address: Address): ActorRef = + context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") + + val digester = MessageDigest.getInstance("MD5") + + /** + * Child name is MD5 hash of the address. + * FIXME Change to URLEncode when ticket #2123 has been fixed + */ + def encodeChildName(name: String): String = { + digester update name.getBytes("UTF-8") + digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString + } + + def receive = { + case msg @ SendHeartbeat(from, to, deadline) ⇒ + val workerName = encodeChildName(to.toString) + val worker = context.actorFor(workerName) match { + case notFound if notFound.isTerminated ⇒ + context.actorOf(Props(new ClusterHeartbeatSenderWorker( + environment.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName) + case child ⇒ child + } + worker ! msg + } + +} + +/** + * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address. + * + * Netty blocks when sending to broken connections, and this actor uses + * a configurable circuit breaker to reduce connect attempts to broken + * connections. + * + * @see ClusterHeartbeatSender + */ +private[cluster] final class ClusterHeartbeatSenderWorker( + cbSettings: CircuitBreakerSettings, toRef: ActorRef) + extends Actor with ActorLogging { + + import ClusterHeartbeatSender._ + + val breaker = CircuitBreaker(context.system.scheduler, + cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). + onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). + onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). + onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + + context.setReceiveTimeout(30 seconds) + + def receive = { + case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ + if (!deadline.isOverdue) { + // the CircuitBreaker will measure elapsed time and open if too many long calls + try breaker.withSyncCircuitBreaker { + log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) + toRef ! heartbeatMsg + if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef) + } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } + + // make sure it will cleanup when not used any more + context.setReceiveTimeout(30 seconds) + } + + case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used + + } +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala new file mode 100644 index 0000000000..aa28cc2c03 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -0,0 +1,212 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.Address +import scala.collection.immutable.SortedSet +import MemberStatus._ + +object Gossip { + val emptyMembers: SortedSet[Member] = SortedSet.empty +} + +/** + * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - + * all versioned by a vector clock. + * + * When a node is joining the `Member`, with status `Joining`, is added to `members`. + * If the joining node was downed it is moved from `overview.unreachable` (status `Down`) + * to `members` (status `Joining`). It cannot rejoin if not first downed. + * + * When convergence is reached the leader change status of `members` from `Joining` + * to `Up`. 
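+ *
+ * Illustrative life cycle of a well-behaved member (a sketch of the states described here):
+ * {{{
+ * Joining -> Up -> Leaving -> Exiting -> Removed
+ * }}}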
+ * + * When failure detector consider a node as unavailable it will be moved from + * `members` to `overview.unreachable`. + * + * When a node is downed, either manually or automatically, its status is changed to `Down`. + * It is also removed from `overview.seen` table. The node will reside as `Down` in the + * `overview.unreachable` set until joining again and it will then go through the normal + * joining procedure. + * + * When a `Gossip` is received the version (vector clock) is used to determine if the + * received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip` + * and local `Gossip` is merged in case of conflicting version, i.e. vector clocks without + * same history. When merged the seen table is cleared. + * + * When a node is told by the user to leave the cluster the leader will move it to `Leaving` + * and then rebalance and repartition the cluster and start hand-off by migrating the actors + * from the leaving node to the new partitions. Once this process is complete the leader will + * move the node to the `Exiting` state and once a convergence is complete move the node to + * `Removed` by removing it from the `members` set and sending a `Removed` command to the + * removed node telling it to shut itself down. + */ +case class Gossip( + overview: GossipOverview = GossipOverview(), + members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address + meta: Map[String, Array[Byte]] = Map.empty, + version: VectorClock = VectorClock()) // vector clock version + extends ClusterMessage // is a serializable cluster message + with Versioned[Gossip] { + + // FIXME can be disabled as optimization + assertInvariants + + private def assertInvariants: Unit = { + val unreachableAndLive = members.intersect(overview.unreachable) + if (unreachableAndLive.nonEmpty) + throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" + format unreachableAndLive.mkString(", ")) + + val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting) + def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status) + if (members exists hasNotAllowedLiveMemberStatus) + throw new IllegalArgumentException("Live members must have status [%s], got [%s]" + format (allowedLiveMemberStatuses.mkString(", "), + (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) + + val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address) + if (seenButNotMember.nonEmpty) + throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" + format seenButNotMember.mkString(", ")) + + } + + /** + * Increments the version for this 'Node'. + */ + def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node) + + /** + * Adds a member to the member node ring. + */ + def :+(member: Member): Gossip = { + if (members contains member) this + else this copy (members = members :+ member) + } + + /** + * Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen' + * Map with the VectorClock (version) for the new gossip. 
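+   *
+   * Sketch of the resulting state (`g` and `selfAddress` are placeholders):
+   * {{{
+   * val g2 = g seen selfAddress
+   * g2.overview.seen(selfAddress) == g2.version   // true
+   * }}}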
+   */
+  def seen(address: Address): Gossip = {
+    if (overview.seen.contains(address) && overview.seen(address) == version) this
+    else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
+  }
+
+  /**
+   * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
+   */
+  def merge(that: Gossip): Gossip = {
+    import Member.ordering
+
+    // 1. merge vector clocks
+    val mergedVClock = this.version merge that.version
+
+    // 2. merge meta-data
+    val mergedMeta = this.meta ++ that.meta
+
+    // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
+    val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
+
+    // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
+    //    and exclude unreachable
+    val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
+
+    // 5. fresh seen table
+    val mergedSeen = Map.empty[Address, VectorClock]
+
+    Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock)
+  }
+
+  /**
+   * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have convergence -
+   * we are waiting for the user to act (issuing DOWN) or for the leader to act (issuing DOWN through auto-down).
+   *
+   * @return true if convergence has been reached, false if not
+   */
+  def convergence: Boolean = {
+    val unreachable = overview.unreachable
+    val seen = overview.seen
+
+    // First check that:
+    //   1. we don't have any members that are unreachable, or
+    //   2. all unreachable members in the set have status DOWN
+    // Else we can't continue to check for convergence.
+    // When that is done we check that all the entries in the 'seen' table have the same vector clock version
+    // and that all members exist in the seen table.
+    val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
+    def allMembersInSeen = members.forall(m ⇒ seen.contains(m.address))
+
+    def seenSame: Boolean =
+      if (seen.isEmpty) false
+      else {
+        val values = seen.values
+        val seenHead = values.head
+        values.forall(_ == seenHead)
+      }
+
+    !hasUnreachable && allMembersInSeen && seenSame
+  }
+
+  def isLeader(address: Address): Boolean =
+    members.nonEmpty && (address == members.head.address)
+
+  def leader: Option[Address] = members.headOption.map(_.address)
+
+  def isSingletonCluster: Boolean = members.size == 1
+
+  /**
+   * Returns true if the node is neither unreachable nor `Down`.
+   */
+  def isAvailable(address: Address): Boolean = !isUnavailable(address)
+
+  def isUnavailable(address: Address): Boolean = {
+    val isUnreachable = overview.unreachable exists { _.address == address }
+    val hasUnavailableMemberStatus = members exists { m ⇒ m.status.isUnavailable && m.address == address }
+    isUnreachable || hasUnavailableMemberStatus
+  }
+
+  def member(address: Address): Member = {
+    members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)).
+      getOrElse(Member(address, Removed))
+  }
+
+  override def toString =
+    "Gossip(" +
+      "overview = " + overview +
+      ", members = [" + members.mkString(", ") +
+      "], meta = [" + meta.mkString(", ") +
+      "], version = " + version +
+      ")"
+}
+
+/**
+ * Represents the overview of the cluster, holds the cluster convergence table and the set of unreachable nodes.
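+ *
+ * For example (sketch, with hypothetical addresses a and b):
+ * {{{
+ * // unreachable = Set(Member(a, Exiting), Member(b, Down))
+ * isNonDownUnreachable(a)   // true, not yet downed
+ * isNonDownUnreachable(b)   // false, already Down
+ * }}}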
+ */ +case class GossipOverview( + seen: Map[Address, VectorClock] = Map.empty, + unreachable: Set[Member] = Set.empty) { + + def isNonDownUnreachable(address: Address): Boolean = + unreachable.exists { m ⇒ m.address == address && m.status != Down } + + override def toString = + "GossipOverview(seen = [" + seen.mkString(", ") + + "], unreachable = [" + unreachable.mkString(", ") + + "])" +} + +/** + * Envelope adding a sender address to the gossip. + */ +case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage + +/** + * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected + * it's forwarded to the leader for conflict resolution. + */ +case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage + diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala new file mode 100644 index 0000000000..e4fa9f379e --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import scala.collection.immutable.SortedSet +import scala.collection.GenTraversableOnce +import akka.actor.Address +import MemberStatus._ + +/** + * Represents the address and the current status of a cluster member node. + * + * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`. + */ +class Member(val address: Address, val status: MemberStatus) extends ClusterMessage { + override def hashCode = address.## + override def equals(other: Any) = Member.unapply(this) == Member.unapply(other) + override def toString = "Member(address = %s, status = %s)" format (address, status) + def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status) +} + +/** + * Module with factory and ordering methods for Member instances. + */ +object Member { + + /** + * `Address` ordering type class, sorts addresses by host and port. + */ + implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ + if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 + else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0) + else false + } + + /** + * `Member` ordering type class, sorts members by host and port with the exception that + * it puts all members that are in MemberStatus.EXITING last. + */ + implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒ + if (a.status == Exiting && b.status != Exiting) false + else if (a.status != Exiting && b.status == Exiting) true + else addressOrdering.compare(a.address, b.address) < 0 + } + + def apply(address: Address, status: MemberStatus): Member = new Member(address, status) + + def unapply(other: Any) = other match { + case m: Member ⇒ Some(m.address) + case _ ⇒ None + } + + def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { + // group all members by Address => Seq[Member] + val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) + // pick highest MemberStatus + (Set.empty[Member] /: groupedByAddress) { + case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) + } + } + + /** + * Picks the Member with the highest "priority" MemberStatus. 
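+   *
+   * For example (sketch, `a` being a hypothetical address):
+   * {{{
+   * // Down wins over Up for the same address
+   * highestPriorityOf(Member(a, Up), Member(a, Down))   // == Member(a, Down)
+   * }}}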
+ */ + def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match { + case (Removed, _) ⇒ m1 + case (_, Removed) ⇒ m2 + case (Down, _) ⇒ m1 + case (_, Down) ⇒ m2 + case (Exiting, _) ⇒ m1 + case (_, Exiting) ⇒ m2 + case (Leaving, _) ⇒ m1 + case (_, Leaving) ⇒ m2 + case (Up, Joining) ⇒ m2 + case (Joining, Up) ⇒ m1 + case (Joining, Joining) ⇒ m1 + case (Up, Up) ⇒ m1 + } + + // FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986 + // SortedSet + and ++ operators replaces existing element + // Use these :+ and :++ operators for the Gossip members + implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet) + class SortedSetWorkaround(sortedSet: SortedSet[Member]) { + implicit def :+(elem: Member): SortedSet[Member] = { + if (sortedSet.contains(elem)) sortedSet + else sortedSet + elem + } + + implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] = + sortedSet ++ (elems.toSet diff sortedSet) + } +} + +/** + * Defines the current status of a cluster member node + * + * Can be one of: Joining, Up, Leaving, Exiting and Down. + */ +sealed trait MemberStatus extends ClusterMessage { + + /** + * Using the same notion for 'unavailable' as 'non-convergence': DOWN + */ + def isUnavailable: Boolean = this == Down +} + +object MemberStatus { + case object Joining extends MemberStatus + case object Up extends MemberStatus + case object Leaving extends MemberStatus + case object Exiting extends MemberStatus + case object Down extends MemberStatus + case object Removed extends MemberStatus +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/RemoteConnectionManager.scala b/akka-cluster/src/main/scala/akka/cluster/RemoteConnectionManager.scala deleted file mode 100644 index 63020367a5..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/RemoteConnectionManager.scala +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.actor._ -import akka.remote._ -import akka.routing._ -import akka.event.Logging - -import scala.collection.immutable.Map -import scala.annotation.tailrec - -import java.util.concurrent.atomic.AtomicReference - -/** - * Remote connection manager, manages remote connections, e.g. RemoteActorRef's. - */ -class RemoteConnectionManager( - system: ActorSystemImpl, - remote: RemoteActorRefProvider, - failureDetector: AccrualFailureDetector, - initialConnections: Map[Address, ActorRef] = Map.empty[Address, ActorRef]) - extends ConnectionManager { - - val log = Logging(system, "RemoteConnectionManager") - - // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc. - case class State(version: Long, connections: Map[Address, ActorRef]) - extends VersionedIterable[ActorRef] { - def iterable: Iterable[ActorRef] = connections.values - } - - private val state: AtomicReference[State] = new AtomicReference[State](newState()) - - /** - * This method is using the FailureDetector to filter out connections that are considered not available. 
- */ - private def filterAvailableConnections(current: State): State = { - val availableConnections = current.connections filter { entry ⇒ failureDetector.isAvailable(entry._1) } - current copy (version = current.version, connections = availableConnections) - } - - private def newState() = State(Long.MinValue, initialConnections) - - def version: Long = state.get.version - - // FIXME should not return State value but a Seq with connections - def connections = filterAvailableConnections(state.get) - - def size: Int = connections.connections.size - - def connectionFor(address: Address): Option[ActorRef] = connections.connections.get(address) - - def isEmpty: Boolean = connections.connections.isEmpty - - def shutdown() { - state.get.iterable foreach (system.stop(_)) // shut down all remote connections - } - - @tailrec - final def failOver(from: Address, to: Address) { - log.debug("Failing over connection from [{}] to [{}]", from, to) - - val oldState = state.get - var changed = false - - val newMap = oldState.connections map { - case (`from`, actorRef) ⇒ - changed = true - //actorRef.stop() - (to, newConnection(to, actorRef.path)) - case other ⇒ other - } - - if (changed) { - //there was a state change, so we are now going to update the state. - val newState = oldState copy (version = oldState.version + 1, connections = newMap) - - //if we are not able to update, the state, we are going to try again. - if (!state.compareAndSet(oldState, newState)) { - failOver(from, to) // recur - } - } - } - - @tailrec - final def remove(faultyConnection: ActorRef) { - - val oldState = state.get() - var changed = false - - var faultyAddress: Address = null - var newConnections = Map.empty[Address, ActorRef] - - oldState.connections.keys foreach { address ⇒ - val actorRef: ActorRef = oldState.connections.get(address).get - if (actorRef ne faultyConnection) { - newConnections = newConnections + ((address, actorRef)) - } else { - faultyAddress = address - changed = true - } - } - - if (changed) { - //one or more occurrances of the actorRef were removed, so we need to update the state. - val newState = oldState copy (version = oldState.version + 1, connections = newConnections) - - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) { - remove(faultyConnection) // recur - } else { - log.debug("Removing connection [{}]", faultyAddress) - } - } - } - - @tailrec - final def putIfAbsent(address: Address, newConnectionFactory: () ⇒ ActorRef): ActorRef = { - - val oldState = state.get() - val oldConnections = oldState.connections - - oldConnections.get(address) match { - case Some(connection) ⇒ connection // we already had the connection, return it - case None ⇒ // we need to create it - val newConnection = newConnectionFactory() - val newConnections = oldConnections + (address -> newConnection) - - //one or more occurrances of the actorRef were removed, so we need to update the state. - val newState = oldState copy (version = oldState.version + 1, connections = newConnections) - - //if we are not able to update the state, we just try again. 
- if (!state.compareAndSet(oldState, newState)) { - // we failed, need compensating action - system.stop(newConnection) // stop the new connection actor and try again - putIfAbsent(address, newConnectionFactory) // recur - } else { - // we succeeded - log.debug("Adding connection [{}]", address) - newConnection // return new connection actor - } - } - } - - private[cluster] def newConnection(remoteAddress: Address, actorPath: ActorPath) = - new RemoteActorRef(remote, remote.transport, actorPath, Nobody) -}