Separate Cluster into several files, see #2311

* Introduced ClusterEnvironment trait to make it easier to test
  the actors without using the extension
* Incorporated more feedback from review
Patrik Nordwall 2012-07-05 13:55:08 +02:00
parent 20a1e67575
commit 01ee8e8fbf
6 changed files with 1442 additions and 1494 deletions

File diff suppressed because it is too large


@@ -0,0 +1,926 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Cancellable
import akka.actor.Props
import akka.actor.RootActorPath
import akka.actor.Status.Failure
import akka.actor.PoisonPill
import akka.actor.Scheduler
import akka.routing.ScatterGatherFirstCompletedRouter
import akka.util.Deadline
import akka.util.Duration
import akka.util.Timeout
import akka.jsr166y.ThreadLocalRandom
import akka.pattern.AskTimeoutException
import akka.pattern.ask
import akka.pattern.pipe
import MemberStatus._
/**
* Base trait for all cluster messages. All ClusterMessages are serializable.
*
* FIXME Protobuf all ClusterMessages
*/
trait ClusterMessage extends Serializable
/**
* Cluster commands sent by the USER.
*/
object ClusterUserAction {
/**
* Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver).
*/
case class Join(address: Address) extends ClusterMessage
/**
* Command to leave the cluster.
*/
case class Leave(address: Address) extends ClusterMessage
/**
* Command to mark node as temporary down.
*/
case class Down(address: Address) extends ClusterMessage
}
/**
* INTERNAL API
*/
private[cluster] object InternalClusterAction {
/**
* Command to initiate joining another node (represented by 'address').
* A Join command will be sent to the other node.
*/
case class JoinTo(address: Address) extends ClusterMessage
/**
* Start message of the process to join one of the seed nodes.
* The node sends `InitJoin` to all seed nodes, which reply
* with `InitJoinAck`. The first reply is used; the others are discarded.
* The node then sends a `Join` command to the seed node that replied first.
*/
case object JoinSeedNode extends ClusterMessage
/**
* @see JoinSeedNode
*/
case object InitJoin extends ClusterMessage
/**
* @see JoinSeedNode
*/
case class InitJoinAck(address: Address) extends ClusterMessage
case object GossipTick
case object HeartbeatTick
case object ReapUnreachableTick
case object LeaderActionsTick
case object PublishStateTick
case class SendClusterMessage(to: Address, msg: ClusterMessage)
case class SendGossipTo(address: Address)
case object GetClusterCoreRef
case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage
case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage
}
/**
* INTERNAL API.
*
* Cluster commands sent by the LEADER.
*/
private[cluster] object ClusterLeaderAction {
/**
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
*/
case class Exit(address: Address) extends ClusterMessage
/**
* Command to remove a node from the cluster immediately.
*/
case class Remove(address: Address) extends ClusterMessage
}
/**
* INTERNAL API
*
* The contextual pieces that ClusterDaemon actors need.
* Makes it easier to test the actors without using the Cluster extension.
*/
private[cluster] trait ClusterEnvironment {
private[cluster] def settings: ClusterSettings
private[cluster] def failureDetector: FailureDetector
private[cluster] def selfAddress: Address
private[cluster] def scheduler: Scheduler
private[cluster] def seedNodes: IndexedSeq[Address]
private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit
private[cluster] def publishLatestGossip(gossip: Gossip): Unit
private[cluster] def publishLatestStats(stats: ClusterStats): Unit
private[cluster] def shutdown(): Unit
}
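// A minimal sketch of how a test could supply these contextual pieces without starting the
// Cluster extension. The class name and the no-op implementations below are illustrative
// assumptions only, not part of this commit:
private[cluster] class StubClusterEnvironment(
private[cluster] val settings: ClusterSettings,
private[cluster] val failureDetector: FailureDetector,
private[cluster] val selfAddress: Address,
private[cluster] val scheduler: Scheduler,
private[cluster] val seedNodes: IndexedSeq[Address]) extends ClusterEnvironment {
// the notification/publish hooks and shutdown are irrelevant for most unit tests
private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit = ()
private[cluster] def publishLatestGossip(gossip: Gossip): Unit = ()
private[cluster] def publishLatestStats(stats: ClusterStats): Unit = ()
private[cluster] def shutdown(): Unit = ()
}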
/**
* INTERNAL API.
*
* Supervisor managing the different Cluster daemons.
*/
private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
val configuredDispatcher = environment.settings.UseDispatcher
val core = context.actorOf(Props(new ClusterCoreDaemon(environment)).
withDispatcher(configuredDispatcher), name = "core")
val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(environment)).
withDispatcher(configuredDispatcher), name = "heartbeat")
def receive = {
case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core
}
}
/**
* INTERNAL API.
*/
private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
// FIXME break up the cluster constructor parameter into something that is easier to test without Cluster
import ClusterLeaderAction._
import InternalClusterAction._
import ClusterHeartbeatSender._
def selfAddress = environment.selfAddress
def clusterScheduler = environment.scheduler
def failureDetector = environment.failureDetector
val settings = environment.settings
import settings._
val vclockNode = VectorClock.Node(selfAddress.toString)
val selfHeartbeat = Heartbeat(selfAddress)
// note that self is not initially a member,
// and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip()
var joinInProgress: Map[Address, Deadline] = Map.empty
var stats = ClusterStats()
val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(environment)).
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)).
withDispatcher(UseDispatcher), name = "coreSender")
// start periodic gossip to random nodes in cluster
val gossipTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
self ! GossipTick
}
// start periodic heartbeat to all nodes in cluster
val heartbeatTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
self ! HeartbeatTick
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
self ! ReapUnreachableTick
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
self ! LeaderActionsTick
}
// start periodic publish of current state
private val publishStateTask: Option[Cancellable] =
if (PublishStateInterval == Duration.Zero) None
else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStateInterval), PublishStateInterval) {
self ! PublishStateTick
})
override def preStart(): Unit = {
if (AutoJoin) self ! InternalClusterAction.JoinSeedNode
}
override def postStop(): Unit = {
gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
publishStateTask foreach { _.cancel() }
}
def receive = {
case msg: GossipEnvelope ⇒ receiveGossip(msg)
case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg)
case GossipTick ⇒ gossip()
case HeartbeatTick ⇒ heartbeat()
case ReapUnreachableTick ⇒ reapUnreachableMembers()
case LeaderActionsTick ⇒ leaderActions()
case PublishStateTick ⇒ publishState()
case JoinSeedNode ⇒ joinSeedNode()
case InitJoin ⇒ initJoin()
case InitJoinAck(address) ⇒ join(address)
case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout()
case JoinTo(address) ⇒ join(address)
case ClusterUserAction.Join(address) ⇒ joining(address)
case ClusterUserAction.Down(address) ⇒ downing(address)
case ClusterUserAction.Leave(address) ⇒ leaving(address)
case Exit(address) ⇒ exiting(address)
case Remove(address) ⇒ removing(address)
case SendGossipTo(address) ⇒ gossipTo(address)
case p: Ping ⇒ ping(p)
}
def joinSeedNode(): Unit = {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) }
if (seedRoutees.isEmpty) {
join(selfAddress)
} else {
implicit val within = Timeout(SeedNodeTimeout)
val seedRouter = context.actorOf(
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
routees = seedRoutees, within = within.duration)))
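// The first InitJoinAck reply from the ScatterGatherFirstCompletedRouter wins and is piped
// back to self; if no seed node answers within SeedNodeTimeout the ask fails and a
// Failure(AskTimeoutException) is piped instead, which is handled by joinSeedNodeTimeout()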
seedRouter ? InitJoin pipeTo self
seedRouter ! PoisonPill
}
}
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodeTimeout(): Unit = join(selfAddress)
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address): Unit = {
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip()
joinInProgress = Map(address -> (Deadline.now + JoinTimeout))
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
notifyListeners(localGossip)
val command = ClusterUserAction.Join(selfAddress)
coreSender ! SendClusterMessage(address, command)
}
/**
* State transition to JOINING - new node joining.
*/
def joining(node: Address): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localUnreachable = localGossip.overview.unreachable
val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localGossip.overview.isNonDownUnreachable(node)
if (!alreadyMember && !isUnreachable) {
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
// remove the node from the failure detector if it is a DOWN node that is rejoining cluster
if (rejoiningMember.nonEmpty) failureDetector.remove(node)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
latestGossip = seenVersionedGossip
log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) {
failureDetector heartbeat node
gossipTo(node)
}
notifyListeners(localGossip)
}
}
/**
* State transition to LEAVING.
*/
def leaving(address: Address): Unit = {
val localGossip = latestGossip
if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
latestGossip = seenVersionedGossip
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
notifyListeners(localGossip)
}
}
/**
* State transition to EXITING.
*/
def exiting(address: Address): Unit = {
log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address)
// FIXME implement when we implement hand-off
}
/**
* State transition to REMOVED.
*
* This method is for now only called after the LEADER has sent a Remove message - telling the node
* to shut itself down.
*
* In the future we might change this to allow the USER to send a Remove(address) message telling an
* arbitrary node to be moved directly from UP -> REMOVED.
*/
def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
val localGossip = latestGossip
// just cleaning up the gossip state
latestGossip = Gossip()
// make sure the final (removed) state is always published
notifyListeners(localGossip)
environment.shutdown()
}
/**
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
*
* The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
* to this node, and it will then go through the normal JOINING procedure.
*/
def downing(address: Address): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
// 1. check if the node to DOWN is in the 'members' set
val downedMember: Option[Member] = localMembers.collectFirst {
case m if m.address == address ⇒ m.copy(status = Down)
}
val newMembers = downedMember match {
case Some(m) ⇒
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
localMembers - m
case None ⇒ localMembers
}
// 2. check if the node to DOWN is in the 'unreachable' set
val newUnreachableMembers =
localUnreachableMembers.map { member ⇒
// no need to DOWN members already DOWN
if (member.address == address && member.status != Down) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = Down)
} else member
}
// 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
case m if m.status == Down ⇒ m.address
}
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip :+ vclockNode
latestGossip = versionedGossip seen selfAddress
notifyListeners(localGossip)
}
/**
* When conflicting versions of received and local [[akka.cluster.Gossip]] are detected
* the conflict is forwarded to the leader for resolution. Trying to resolve conflicts
* simultaneously at several nodes creates new conflicts. Therefore the leader resolves
* conflicts to limit divergence. To avoid overload there is also a configurable rate
* limit of how many conflicts are handled per second. If the limit is
* exceeded the conflicting gossip messages are dropped and will reappear later.
*/
def receiveGossipMerge(merge: GossipMergeConflict): Unit = {
stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate) {
receiveGossip(merge.a.copy(conversation = false))
receiveGossip(merge.b.copy(conversation = false))
// use one-way gossip from leader to reduce load of leader
def sendBack(to: Address): Unit = {
if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to))
oneWayGossipTo(to)
}
sendBack(merge.a.from)
sendBack(merge.b.from)
} else {
log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate)
}
}
/**
* Receive new gossip.
*/
def receiveGossip(envelope: GossipEnvelope): Unit = {
val from = envelope.from
val remoteGossip = envelope.gossip
val localGossip = latestGossip
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
// FIXME how should we handle this situation?
log.debug("Received gossip with self as unreachable, from [{}]", from)
} else if (!localGossip.overview.isNonDownUnreachable(from)) {
// leader handles merge conflicts, or when they have different views of who is leader
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
val conflict = remoteGossip.version <> localGossip.version
if (conflict && !handleMerge) {
// delegate merge resolution to leader to reduce number of simultaneous resolves,
// which will result in new conflicts
stats = stats.incrementMergeDetectedCount
log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from)
stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate) {
coreSender ! SendClusterMessage(
to = localGossip.leader.get,
msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
} else {
log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
}
} else {
val winningGossip =
if (conflict) {
// conflicting versions, merge, and new version
val mergedGossip = remoteGossip merge localGossip
mergedGossip :+ vclockNode
} else if (remoteGossip.version < localGossip.version) {
// local gossip is newer
localGossip
} else if (!remoteGossip.members.exists(_.address == selfAddress)) {
// FIXME This is very strange. It can happen when many nodes join at the same time.
// It's not detected as an ordinary version conflict <>
// If we don't handle this situation there will be IllegalArgumentException when marking this as seen
// merge, and new version
val mergedGossip = remoteGossip merge (localGossip :+ Member(selfAddress, Joining))
mergedGossip :+ vclockNode
} else {
// remote gossip is newer
remoteGossip
}
val newJoinInProgress =
if (joinInProgress.isEmpty) joinInProgress
else joinInProgress --
winningGossip.members.map(_.address) --
winningGossip.overview.unreachable.map(_.address)
latestGossip = winningGossip seen selfAddress
joinInProgress = newJoinInProgress
// for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { node ⇒
failureDetector.remove(node.address)
}
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if (conflict) {
stats = stats.incrementMergeCount
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
}
stats = stats.incrementReceivedGossipCount
notifyListeners(localGossip)
if (envelope.conversation &&
(conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
// send back gossip to sender when sender had a different view, i.e. merge,
// or sender had older or newer gossip
gossipTo(from)
}
}
}
}
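// Rough conflicts-per-second estimate: `count` is the number of merge conflicts seen since the
// counter was reset at the start of the current gossip round, scaled by the gossip interval.
// For example, with GossipInterval = 1 second, 5 conflicts give a rate of 5.0 / s; with
// GossipInterval = 500 ms the same 5 conflicts give 10.0 / s.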
def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis
/**
* Initiates a new round of gossip.
*/
def gossip(): Unit = {
stats = stats.copy(mergeConflictCount = 0)
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (!isSingletonCluster && isAvailable) {
val localGossip = latestGossip
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
val localMembersSize = localMembers.size
val localMemberAddresses = localMembers map { _.address }
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
val localUnreachableSize = localUnreachableMembers.size
// gossip to a random alive member with preference to a member
// with older or newer gossip version
val nodesWithdifferentView = {
val localMemberAddressesSet = localGossip.members map { _.address }
for {
(address, version) ← localGossip.overview.seen
if localMemberAddressesSet contains address
if version != localGossip.version
} yield address
}
val gossipedToAlive =
if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability)
gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq)
else
gossipToRandomNodeOf(localMemberAddresses)
}
}
/**
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
*/
def leaderActions(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
if (isLeader && isAvailable) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partition handoff and a check if it is completed - for now it just returns TRUE - i.e. has completed successfully
true
}
// Leader actions are as follows:
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 2. Move JOINING => UP -- When a node joins the cluster
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-downed by the leader
// 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table
// 8. Try to update the state with the new gossip
// 9. If failure - retry
// 10. If success - run all the side-effecting processing
val (
newGossip: Gossip,
hasChangedState: Boolean,
upMembers,
exitingMembers,
removedMembers,
unreachableButNotDownedMembers) =
if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes
// transform the node member ring - filterNot/map/map
val newMembers =
localMembers filterNot { member ⇒
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
member.status == MemberStatus.Exiting
} map { member ⇒
// ----------------------
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == Joining) member copy (status = Up)
else member
} map { member ⇒
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting)
else member
}
// ----------------------
// 5. Store away all stuff needed for the side-effecting processing in 10.
// ----------------------
// Check for the need to do side-effecting on successful state change
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the 'seen' table
val newSeen = localSeen -- removedMembers.map(_.address)
// removing REMOVED nodes from the 'unreachable' set
val newUnreachableMembers = localUnreachableMembers -- removedMembers
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member])
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers.map { member ⇒
// ----------------------
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
if (member.status == Down) member // no need to DOWN members already DOWN
else member copy (status = Down)
}
// Check for the need to do side-effecting on successful state change
val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down)
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address }
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers)
} else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member])
if (hasChangedState) { // we have a change of state - version it and try to update
// ----------------------
// 6. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// 7. Updating the 'seen' table
// Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED
// ----------------------
val seenVersionedGossip =
if (removedMembers.exists(_.address == selfAddress)) versionedGossip
else versionedGossip seen selfAddress
// ----------------------
// 8. Update the state with the new gossip
// ----------------------
latestGossip = seenVersionedGossip
// ----------------------
// 9. Run all the side-effecting processing
// ----------------------
// log the move of members from joining to up
upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) }
// tell all removed members to remove and shut down themselves
removedMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
coreSender ! SendClusterMessage(
to = address,
msg = ClusterLeaderAction.Remove(address))
}
// tell all exiting members to exit
exitingMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
coreSender ! SendClusterMessage(
to = address,
msg = ClusterLeaderAction.Exit(address)) // FIXME should use ? to await completion of handoff?
}
// log the auto-downing of the unreachable nodes
unreachableButNotDownedMembers foreach { member ⇒
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
}
notifyListeners(localGossip)
}
}
}
def heartbeat(): Unit = {
removeOverdueJoinInProgress()
val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys
val deadline = Deadline.now + HeartbeatInterval
for (address ← beatTo; if address != selfAddress)
heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline)
}
/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
val overdueJoins = joinInProgress collect {
case (address, deadline) if deadline.isOverdue ⇒ address
}
if (overdueJoins.nonEmpty) {
joinInProgress = joinInProgress -- overdueJoins
}
}
/**
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
def reapUnreachableMembers(): Unit = {
if (!isSingletonCluster && isAvailable) {
// only scrutinize if we are a non-singleton cluster and available
val localGossip = latestGossip
val localOverview = localGossip.overview
val localMembers = localGossip.members
val localUnreachableMembers = localGossip.overview.unreachable
val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒
member.address == selfAddress || failureDetector.isAvailable(member.address)
}
if (newlyDetectedUnreachableMembers.nonEmpty) {
val newMembers = localMembers -- newlyDetectedUnreachableMembers
val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// updating vclock and 'seen' table
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
latestGossip = seenVersionedGossip
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
notifyListeners(localGossip)
}
}
}
def seedNodes: IndexedSeq[Address] = environment.seedNodes
def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
def isAvailable: Boolean = latestGossip.isAvailable(selfAddress)
/**
* Gossips latest gossip to a random member in the set of members passed in as argument.
*
* @return the used [[akka.actor.Address]] if any
*/
private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = {
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
// filter out myself
val peer = selectRandomNode(addresses filterNot (_ == selfAddress))
peer foreach gossipTo
peer
}
/**
* Gossips latest gossip to an address.
*/
def gossipTo(address: Address): Unit =
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true))
def oneWayGossipTo(address: Address): Unit =
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress)
coreSender ! SendClusterMessage(address, gossipMsg)
def notifyListeners(oldGossip: Gossip): Unit = {
if (PublishStateInterval == Duration.Zero) publishState()
val oldMembersStatus = oldGossip.members.map(m ⇒ (m.address, m.status))
val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status))
if (newMembersStatus != oldMembersStatus)
environment notifyMembershipChangeListeners latestGossip.members
}
def publishState(): Unit = {
environment.publishLatestGossip(latestGossip)
environment.publishLatestStats(stats)
}
def ping(p: Ping): Unit = sender ! Pong(p)
}
/**
* INTERNAL API.
*/
private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging {
import InternalClusterAction._
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
private def clusterCoreConnectionFor(address: Address): ActorRef =
context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
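// e.g. for the address akka://MySystem@host1:2552 this resolves
// akka://MySystem@host1:2552/system/cluster/core - the "core" child created by ClusterDaemon,
// assuming the cluster daemon itself lives at /system/cluster as this lookup implies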
def receive = {
case SendClusterMessage(to, msg) ⇒
log.debug("Cluster Node [{}] - Trying to send [{}] to [{}]", selfAddress, msg.getClass.getSimpleName, to)
clusterCoreConnectionFor(to) ! msg
}
}
/**
* INTERNAL API
*/
private[cluster] case class ClusterStats(
receivedGossipCount: Long = 0L,
mergeConflictCount: Long = 0L,
mergeCount: Long = 0L,
mergeDetectedCount: Long = 0L) {
def incrementReceivedGossipCount(): ClusterStats =
copy(receivedGossipCount = receivedGossipCount + 1)
def incrementMergeConflictCount(): ClusterStats =
copy(mergeConflictCount = mergeConflictCount + 1)
def incrementMergeCount(): ClusterStats =
copy(mergeCount = mergeCount + 1)
def incrementMergeDetectedCount(): ClusterStats =
copy(mergeDetectedCount = mergeDetectedCount + 1)
}


@@ -0,0 +1,135 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.ReceiveTimeout
import akka.actor.ActorLogging
import java.security.MessageDigest
import akka.pattern.CircuitBreaker
import akka.actor.ActorRef
import akka.pattern.CircuitBreakerOpenException
import akka.actor.Address
import akka.actor.Actor
import akka.actor.RootActorPath
import akka.actor.Props
import akka.util.duration._
import akka.util.Deadline
/**
* Sent at regular intervals for failure detection.
*/
case class Heartbeat(from: Address) extends ClusterMessage
/**
* INTERNAL API.
*
* Receives Heartbeat messages and delegates to Cluster.
* Instantiated as a single instance for each Cluster - i.e. heartbeats are serialized
* to the Cluster, message after message, but concurrent with other types of messages.
*/
private[cluster] final class ClusterHeartbeatDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
def receive = {
case Heartbeat(from) ⇒ environment.failureDetector heartbeat from
}
}
/**
* INTERNAL API
*/
private[cluster] object ClusterHeartbeatSender {
/**
*
* Command to [[akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]]
* to the other node.
* Local only, no need to serialize.
*/
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
}
/*
* INTERNAL API
*
* This actor is responsible for sending the heartbeat messages to
* other nodes. Netty blocks when sending to broken connections. This actor
* isolates sending to different nodes by using child workers for each target
* address, thereby reducing the risk of irregular heartbeats to healthy
* nodes due to broken connections to other nodes.
*/
private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironment) extends Actor with ActorLogging {
import ClusterHeartbeatSender._
/**
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
val digester = MessageDigest.getInstance("MD5")
/**
* Child name is MD5 hash of the address.
* FIXME Change to URLEncode when ticket #2123 has been fixed
*/
def encodeChildName(name: String): String = {
digester update name.getBytes("UTF-8")
digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString
}
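// yields 32 lowercase hex characters - a stable, URL-safe child name for any address; the
// MessageDigest instance is not thread-safe, but it is only touched from within this actor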
def receive = {
case msg @ SendHeartbeat(from, to, deadline) ⇒
val workerName = encodeChildName(to.toString)
val worker = context.actorFor(workerName) match {
case notFound if notFound.isTerminated ⇒
context.actorOf(Props(new ClusterHeartbeatSenderWorker(
environment.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName)
case child ⇒ child
}
worker ! msg
}
}
/**
* Responsible for sending [[akka.cluster.Heartbeat]] to one specific address.
*
* Netty blocks when sending to broken connections, and this actor uses
* a configurable circuit breaker to reduce connect attempts to broken
* connections.
*
* @see ClusterHeartbeatSender
*/
private[cluster] final class ClusterHeartbeatSenderWorker(
cbSettings: CircuitBreakerSettings, toRef: ActorRef)
extends Actor with ActorLogging {
import ClusterHeartbeatSender._
val breaker = CircuitBreaker(context.system.scheduler,
cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)).
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
context.setReceiveTimeout(30 seconds)
def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
if (!deadline.isOverdue) {
// the CircuitBreaker will measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
toRef ! heartbeatMsg
if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
} catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
// make sure it will cleanup when not used any more
context.setReceiveTimeout(30 seconds)
}
case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used
}
}


@@ -0,0 +1,212 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.Address
import scala.collection.immutable.SortedSet
import MemberStatus._
object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
}
/**
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data -
* all versioned by a vector clock.
*
* When a node is joining, a `Member` with status `Joining` is added to `members`.
* If the joining node was downed it is moved from `overview.unreachable` (status `Down`)
* to `members` (status `Joining`). It cannot rejoin unless it has first been downed.
*
* When convergence is reached the leader changes the status of `members` from `Joining`
* to `Up`.
*
* When the failure detector considers a node unavailable it will be moved from
* `members` to `overview.unreachable`.
*
* When a node is downed, either manually or automatically, its status is changed to `Down`.
* It is also removed from the `overview.seen` table. The node will reside as `Down` in the
* `overview.unreachable` set until joining again, and it will then go through the normal
* joining procedure.
*
* When a `Gossip` is received the version (vector clock) is used to determine if the
* received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip`
* and local `Gossip` are merged in case of conflicting versions, i.e. vector clocks without
* a common history. When merged the seen table is cleared.
*
* When a node is told by the user to leave the cluster the leader will move it to `Leaving`
* and then rebalance and repartition the cluster and start hand-off by migrating the actors
* from the leaving node to the new partitions. Once this process is complete the leader will
* move the node to the `Exiting` state and once a convergence is complete move the node to
* `Removed` by removing it from the `members` set and sending a `Remove` command to the
* removed node telling it to shut itself down.
*/
case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address
meta: Map[String, Array[Byte]] = Map.empty,
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
with Versioned[Gossip] {
// FIXME can be disabled as optimization
assertInvariants
private def assertInvariants: Unit = {
val unreachableAndLive = members.intersect(overview.unreachable)
if (unreachableAndLive.nonEmpty)
throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]"
format unreachableAndLive.mkString(", "))
val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting)
def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status)
if (members exists hasNotAllowedLiveMemberStatus)
throw new IllegalArgumentException("Live members must have status [%s], got [%s]"
format (allowedLiveMemberStatuses.mkString(", "),
(members filter hasNotAllowedLiveMemberStatus).mkString(", ")))
val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address)
if (seenButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]"
format seenButNotMember.mkString(", "))
}
/**
* Increments the version for this 'Node'.
*/
def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)
/**
* Adds a member to the member node ring.
*/
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members :+ member)
}
/**
* Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock (version) for the new gossip.
*/
def seen(address: Address): Gossip = {
if (overview.seen.contains(address) && overview.seen(address) == version) this
else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
}
/**
* Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
*/
def merge(that: Gossip): Gossip = {
import Member.ordering
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. merge meta-data
val mergedMeta = this.meta ++ that.meta
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock)
}
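// For example, merging a gossip containing Member(a, Up) with one containing Member(a, Leaving)
// keeps the single Member(a, Leaving) entry, since Leaving wins in Member.pickHighestPriority,
// and the fresh (empty) seen table forces every node to mark the merged gossip as seen again.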
/**
* Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence -
* waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down).
*
* @return true if convergence has been reached and false if not
*/
def convergence: Boolean = {
val unreachable = overview.unreachable
val seen = overview.seen
// First check that:
// 1. we don't have any members that are unreachable, or
// 2. all unreachable members in the set have status DOWN
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
// and that all members exists in seen table
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
def allMembersInSeen = members.forall(m ⇒ seen.contains(m.address))
def seenSame: Boolean =
if (seen.isEmpty) false
else {
val values = seen.values
val seenHead = values.head
values.forall(_ == seenHead)
}
!hasUnreachable && allMembersInSeen && seenSame
}
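// For example, with members a and b there is convergence when seen == Map(a -> v, b -> v) for
// one and the same version v and no non-Down member is unreachable; if b is missing from the
// seen table, or has seen a different version, there is no convergence yet.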
def isLeader(address: Address): Boolean =
members.nonEmpty && (address == members.head.address)
def leader: Option[Address] = members.headOption.map(_.address)
def isSingletonCluster: Boolean = members.size == 1
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable(address: Address): Boolean = !isUnavailable(address)
def isUnavailable(address: Address): Boolean = {
val isUnreachable = overview.unreachable exists { _.address == address }
val hasUnavailableMemberStatus = members exists { m ⇒ m.status.isUnavailable && m.address == address }
isUnreachable || hasUnavailableMemberStatus
}
def member(address: Address): Member = {
members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)).
getOrElse(Member(address, Removed))
}
override def toString =
"Gossip(" +
"overview = " + overview +
", members = [" + members.mkString(", ") +
"], meta = [" + meta.mkString(", ") +
"], version = " + version +
")"
}
/**
* Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
*/
case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty,
unreachable: Set[Member] = Set.empty) {
def isNonDownUnreachable(address: Address): Boolean =
unreachable.exists { m ⇒ m.address == address && m.status != Down }
override def toString =
"GossipOverview(seen = [" + seen.mkString(", ") +
"], unreachable = [" + unreachable.mkString(", ") +
"])"
}
/**
* Envelope adding a sender address to the gossip.
*/
case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage
/**
* When conflicting versions of received and local [[akka.cluster.Gossip]] are detected
* the conflict is forwarded to the leader for resolution.
*/
case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage


@@ -0,0 +1,117 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import scala.collection.GenTraversableOnce
import akka.actor.Address
import MemberStatus._
/**
* Represents the address and the current status of a cluster member node.
*
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
*/
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
override def hashCode = address.##
override def equals(other: Any) = Member.unapply(this) == Member.unapply(other)
override def toString = "Member(address = %s, status = %s)" format (address, status)
def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status)
}
/**
* Module with factory and ordering methods for Member instances.
*/
object Member {
/**
* `Address` ordering type class, sorts addresses by host and port.
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
else false
}
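// e.g. akka://sys@alpha:2552 sorts before akka://sys@beta:2551 because the host is compared
// before the port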
/**
* `Member` ordering type class, sorts members by host and port with the exception that
* it puts all members that are in MemberStatus.EXITING last.
*/
implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
if (a.status == Exiting && b.status != Exiting) false
else if (a.status != Exiting && b.status == Exiting) true
else addressOrdering.compare(a.address, b.address) < 0
}
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
def unapply(other: Any) = other match {
case m: Member ⇒ Some(m.address)
case _ ⇒ None
}
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf)
}
}
/**
* Picks the Member with the highest "priority" MemberStatus.
*/
def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
case (Removed, _) ⇒ m1
case (_, Removed) ⇒ m2
case (Down, _) ⇒ m1
case (_, Down) ⇒ m2
case (Exiting, _) ⇒ m1
case (_, Exiting) ⇒ m2
case (Leaving, _) ⇒ m1
case (_, Leaving) ⇒ m2
case (Up, Joining) ⇒ m2
case (Joining, Up) ⇒ m1
case (Joining, Joining) ⇒ m1
case (Up, Up) ⇒ m1
}
// FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
// SortedSet + and ++ operators replace the existing element
// Use these :+ and :++ operators for the Gossip members
implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)
class SortedSetWorkaround(sortedSet: SortedSet[Member]) {
implicit def :+(elem: Member): SortedSet[Member] = {
if (sortedSet.contains(elem)) sortedSet
else sortedSet + elem
}
implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
sortedSet ++ (elems.toSet diff sortedSet)
}
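// In other words, `members :+ member` leaves the set untouched when an equal member is already
// present, and `members :++ newMembers` only adds the members not already in the set, instead
// of replacing existing entries as the plain + and ++ operators would (see the FIXME above)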
}
/**
* Defines the current status of a cluster member node.
*
* Can be one of: Joining, Up, Leaving, Exiting, Down and Removed.
*/
sealed trait MemberStatus extends ClusterMessage {
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
*/
def isUnavailable: Boolean = this == Down
}
object MemberStatus {
case object Joining extends MemberStatus
case object Up extends MemberStatus
case object Leaving extends MemberStatus
case object Exiting extends MemberStatus
case object Down extends MemberStatus
case object Removed extends MemberStatus
}


@@ -1,150 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor._
import akka.remote._
import akka.routing._
import akka.event.Logging
import scala.collection.immutable.Map
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
/**
* Remote connection manager, which manages remote connections, e.g. RemoteActorRefs.
*/
class RemoteConnectionManager(
system: ActorSystemImpl,
remote: RemoteActorRefProvider,
failureDetector: AccrualFailureDetector,
initialConnections: Map[Address, ActorRef] = Map.empty[Address, ActorRef])
extends ConnectionManager {
val log = Logging(system, "RemoteConnectionManager")
// FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc.
case class State(version: Long, connections: Map[Address, ActorRef])
extends VersionedIterable[ActorRef] {
def iterable: Iterable[ActorRef] = connections.values
}
private val state: AtomicReference[State] = new AtomicReference[State](newState())
/**
* This method uses the FailureDetector to filter out connections that are considered unavailable.
*/
private def filterAvailableConnections(current: State): State = {
val availableConnections = current.connections filter { entry ⇒ failureDetector.isAvailable(entry._1) }
current copy (version = current.version, connections = availableConnections)
}
private def newState() = State(Long.MinValue, initialConnections)
def version: Long = state.get.version
// FIXME should not return State value but a Seq with connections
def connections = filterAvailableConnections(state.get)
def size: Int = connections.connections.size
def connectionFor(address: Address): Option[ActorRef] = connections.connections.get(address)
def isEmpty: Boolean = connections.connections.isEmpty
def shutdown() {
state.get.iterable foreach (system.stop(_)) // shut down all remote connections
}
@tailrec
final def failOver(from: Address, to: Address) {
log.debug("Failing over connection from [{}] to [{}]", from, to)
val oldState = state.get
var changed = false
val newMap = oldState.connections map {
case (`from`, actorRef) ⇒
changed = true
//actorRef.stop()
(to, newConnection(to, actorRef.path))
case other ⇒ other
}
if (changed) {
//there was a state change, so we are now going to update the state.
val newState = oldState copy (version = oldState.version + 1, connections = newMap)
//if we are not able to update the state, we are going to try again.
if (!state.compareAndSet(oldState, newState)) {
failOver(from, to) // recur
}
}
}
@tailrec
final def remove(faultyConnection: ActorRef) {
val oldState = state.get()
var changed = false
var faultyAddress: Address = null
var newConnections = Map.empty[Address, ActorRef]
oldState.connections.keys foreach { address ⇒
val actorRef: ActorRef = oldState.connections.get(address).get
if (actorRef ne faultyConnection) {
newConnections = newConnections + ((address, actorRef))
} else {
faultyAddress = address
changed = true
}
}
if (changed) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
val newState = oldState copy (version = oldState.version + 1, connections = newConnections)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) {
remove(faultyConnection) // recur
} else {
log.debug("Removing connection [{}]", faultyAddress)
}
}
}
@tailrec
final def putIfAbsent(address: Address, newConnectionFactory: () ⇒ ActorRef): ActorRef = {
val oldState = state.get()
val oldConnections = oldState.connections
oldConnections.get(address) match {
case Some(connection) ⇒ connection // we already had the connection, return it
case None ⇒ // we need to create it
val newConnection = newConnectionFactory()
val newConnections = oldConnections + (address -> newConnection)
//a new connection was created, so we need to update the state.
val newState = oldState copy (version = oldState.version + 1, connections = newConnections)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) {
// we failed, need compensating action
system.stop(newConnection) // stop the new connection actor and try again
putIfAbsent(address, newConnectionFactory) // recur
} else {
// we succeeded
log.debug("Adding connection [{}]", address)
newConnection // return new connection actor
}
}
}
private[cluster] def newConnection(remoteAddress: Address, actorPath: ActorPath) =
new RemoteActorRef(remote, remote.transport, actorPath, Nobody)
}