Disallow re-joining, see #2873

* Disallow join requests when already part of a cluster
* Remove wipe state when joining, since join can only be
  performed from empty state
* When trying to join, only accept gossip from that member
* Ignore gossips from unknown (and unreachable) members
* Make sure received gossip contains selfAddress
* Test join of fresh node with same host:port
* Remove JoinTwoClustersSpec
* Welcome message as reply to Join
* Retry unsuccessful join request
* AddressUidExtension
* Uid in cluster Member identifier
  To be able to distinguish nodes with same host:port
  after restart.
* Ignore gossip with wrong uid
* Renamed Remove command to Shutdown
* Use uid in vclock identifier
* Update sample, Member apply is private
* Disabled config duration syntax and cleanup of io settings
* Update documentation
This commit is contained in:
Patrik Nordwall 2013-04-11 09:18:12 +02:00
parent cdf717e855
commit 9e56ab6fe5
35 changed files with 795 additions and 546 deletions

View file

@ -23,16 +23,19 @@ import akka.actor.ActorSelection
trait ClusterMessage extends Serializable
/**
* Cluster commands sent by the USER.
* INTERNAL API
* Cluster commands sent by the USER via
* [[akka.cluster.Cluster]] extension
* or JMX.
*/
object ClusterUserAction {
private[cluster] object ClusterUserAction {
/**
* Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver).
* Command to initiate joining another node (represented by `address`).
* Join will be sent to the other node.
*/
@SerialVersionUID(1L)
case class Join(address: Address, roles: Set[String]) extends ClusterMessage
case class JoinTo(address: Address)
/**
* Command to leave the cluster.
@ -54,10 +57,18 @@ object ClusterUserAction {
private[cluster] object InternalClusterAction {
/**
* Command to initiate join another node (represented by 'address').
* Join will be sent to the other node.
* Command to join the cluster. Sent when a node wants to join another node (the receiver).
* @param node the node that wants to join the cluster
*/
case class JoinTo(address: Address)
@SerialVersionUID(1L)
case class Join(node: UniqueAddress, roles: Set[String]) extends ClusterMessage
/**
* Reply to Join, sent back to the node that asked to join.
* @param from the sender node in the cluster, i.e. the node that received the Join command
* @param gossip the sender's gossip state at the time of the reply; the joining node
*   adopts it as its own initial gossip
*/
@SerialVersionUID(1L)
case class Welcome(from: UniqueAddress, gossip: Gossip) extends ClusterMessage
/**
* Command to initiate the process to join the specified
@ -134,7 +145,6 @@ private[cluster] object InternalClusterAction {
sealed trait PublishMessage
case class PublishChanges(newGossip: Gossip) extends PublishMessage
case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
case object PublishStart extends PublishMessage
}
/**
@ -147,15 +157,17 @@ private[cluster] object ClusterLeaderAction {
/**
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
* @param node the node to exit, i.e. destination of the message
*/
@SerialVersionUID(1L)
case class Exit(address: Address) extends ClusterMessage
case class Exit(node: UniqueAddress) extends ClusterMessage
/**
* Command to remove a node from the cluster immediately.
* @param node the node to shutdown, i.e. destination of the message
*/
@SerialVersionUID(1L)
case class Remove(address: Address) extends ClusterMessage
case class Shutdown(node: UniqueAddress) extends ClusterMessage
}
/**
@ -227,11 +239,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
import InternalClusterAction._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector }
import cluster.settings._
// FIXME the UUID should not be needed when Address contains uid, ticket #2788
val vclockNode = VectorClock.Node(selfAddress.toString + "-" + UUID.randomUUID())
// Vector clock node name: the node's address and its uid, separated by a dash.
def vclockName(node: UniqueAddress): String = s"${node.address}-${node.uid}"
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
@ -241,8 +253,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
var seedNodeProcess: Option[ActorRef] = None
var tryingToJoinWith: Option[Address] = None
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
@ -267,10 +277,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
LeaderActionsInterval, self, LeaderActionsTick)
// start periodic publish of current stats
val publishStatsTask: Option[Cancellable] =
if (PublishStatsInterval == Duration.Zero) None
else Some(scheduler.schedule(PeriodicTasksInitialDelay.max(PublishStatsInterval),
PublishStatsInterval, self, PublishStatsTick))
// Periodic publishing of the current stats; no task is scheduled when the interval is
// zero or not finite.
// Non-finite intervals are matched by type rather than by value: Duration.Undefined is
// not equal to anything (not even itself), so a `case Duration.Undefined` pattern can
// never match and an undefined interval would end in a MatchError. Matching on
// Duration.Infinite also covers Duration.MinusInf.
val publishStatsTask: Option[Cancellable] = PublishStatsInterval match {
  case Duration.Zero | _: Duration.Infinite => None
  case d: FiniteDuration =>
    Some(scheduler.schedule(PeriodicTasksInitialDelay.max(d), d, self, PublishStatsTick))
}
override def preStart(): Unit = {
if (AutoJoin) self ! JoinSeedNodes(SeedNodes)
@ -284,28 +295,47 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
def uninitialized: Actor.Receive = {
case InitJoin sender ! InitJoinNack(selfAddress)
case JoinTo(address) join(address)
case JoinSeedNodes(seedNodes) joinSeedNodes(seedNodes)
case InitJoin sender ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address) join(address)
case JoinSeedNodes(seedNodes) joinSeedNodes(seedNodes)
case msg: SubscriptionMessage publisher forward msg
case _: Tick // ignore periodic tasks until initialized
}
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = {
case Welcome(from, gossip) welcome(joinWith, from, gossip)
case InitJoin sender ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address)
context.become(uninitialized)
join(address)
case JoinSeedNodes(seedNodes)
context.become(uninitialized)
joinSeedNodes(seedNodes)
case msg: SubscriptionMessage publisher forward msg
case _: Tick // ignore periodic tasks until initialized
case _: Tick
if (deadline.exists(_.isOverdue)) {
context.become(uninitialized)
if (AutoJoin) joinSeedNodes(SeedNodes)
else join(joinWith)
}
}
def initialized: Actor.Receive = {
case msg: GossipEnvelope receiveGossip(msg)
case GossipTick gossip()
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats()
case InitJoin initJoin()
case JoinTo(address) join(address)
case ClusterUserAction.Join(address, roles) joining(address, roles)
case ClusterUserAction.Down(address) downing(address)
case ClusterUserAction.Leave(address) leaving(address)
case Exit(address) exiting(address)
case Remove(address) removing(address)
case SendGossipTo(address) gossipTo(address)
case msg: SubscriptionMessage publisher forward msg
case msg: GossipEnvelope receiveGossip(msg)
case GossipTick gossip()
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats()
case InitJoin initJoin()
case Join(node, roles) joining(node, roles)
case ClusterUserAction.Down(address) downing(address)
case ClusterUserAction.Leave(address) leaving(address)
case Exit(node) exiting(node)
case Shutdown(node) shutdown(node)
case SendGossipTo(address) sendGossipTo(address)
case msg: SubscriptionMessage publisher forward msg
case ClusterUserAction.JoinTo(address)
log.info("Trying to join [{}] when already part of a cluster, ignoring", address)
}
@ -322,7 +352,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress")
seedNodeProcess =
if (seedNodes.isEmpty || seedNodes == immutable.IndexedSeq(selfAddress)) {
self ! JoinTo(selfAddress)
self ! ClusterUserAction.JoinTo(selfAddress)
None
} else if (seedNodes.head == selfAddress) {
Some(context.actorOf(Props(new FirstSeedNodeProcess(seedNodes)).
@ -334,8 +364,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
* Try to join this cluster node with the node specified by `address`.
* It's only allowed to join from an empty state, i.e. when not already a member.
* A `Join(selfUniqueAddress)` command is sent to the node to join,
* which will reply with a `Welcome` message.
*/
def join(address: Address): Unit = {
if (address.protocol != selfAddress.protocol)
@ -344,7 +376,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (address.system != selfAddress.system)
log.warning("Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
selfAddress.system, address.system)
else if (!latestGossip.members.exists(_.address == address)) {
else {
require(latestGossip.members.isEmpty, "Join can only be done from empty state")
// to support manual join when joining to seed nodes is stuck (no seed nodes available)
val snd = sender
seedNodeProcess match {
@ -358,61 +392,63 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case None // no seedNodeProcess in progress
}
// only wipe the state if we're not in the process of joining this address
if (tryingToJoinWith.forall(_ != address)) {
tryingToJoinWith = Some(address)
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip.empty
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
// wipe the publisher since we are starting fresh
publisher ! PublishStart
publish(latestGossip)
if (address == selfAddress) {
context.become(initialized)
joining(selfUniqueAddress, cluster.selfRoles)
} else {
val joinDeadline = RetryUnsuccessfulJoinAfter match {
case Duration.Undefined | Duration.Inf None
case d: FiniteDuration Some(Deadline.now + d)
}
context.become(tryingToJoin(address, joinDeadline))
clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles)
}
context.become(initialized)
if (address == selfAddress)
joining(address, cluster.selfRoles)
else
clusterCore(address) ! ClusterUserAction.Join(selfAddress, cluster.selfRoles)
}
}
/**
* State transition to JOINING - new node joining.
* Received `Join` message and replies with `Welcome` message, containing
* current gossip state, including the new joining member.
*/
def joining(node: Address, roles: Set[String]): Unit = {
if (node.protocol != selfAddress.protocol)
def joining(node: UniqueAddress, roles: Set[String]): Unit = {
if (node.address.protocol != selfAddress.protocol)
log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol, node.protocol)
else if (node.system != selfAddress.system)
selfAddress.protocol, node.address.protocol)
else if (node.address.system != selfAddress.system)
log.warning("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.system, node.system)
selfAddress.system, node.address.system)
else {
val localMembers = latestGossip.members
val localUnreachable = latestGossip.overview.unreachable
val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localUnreachable.exists(_.address == node)
// check by address without uid to make sure that node with same host:port is not allowed
// to join until previous node with that host:port has been removed from the cluster
val alreadyMember = localMembers.exists(_.address == node.address)
val isUnreachable = localUnreachable.exists(_.address == node.address)
if (!alreadyMember && !isUnreachable) {
if (alreadyMember)
log.info("Existing member [{}] is trying to join, ignoring", node)
else if (isUnreachable)
log.info("Unreachable member [{}] is trying to join, ignoring", node)
else {
// remove the node from the failure detector
failureDetector.remove(node)
failureDetector.remove(node.address)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers + Member(node, Joining, roles) + Member(selfAddress, Joining, cluster.selfRoles)
val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
latestGossip = seenVersionedGossip
log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node, roles.mkString(", "))
if (node != selfAddress) {
gossipTo(node)
log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", "))
if (node != selfUniqueAddress) {
clusterCore(node.address) ! Welcome(selfUniqueAddress, latestGossip)
}
publish(latestGossip)
@ -420,8 +456,27 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
}
/**
 * Handles the `Welcome` reply to a previously sent `Join` request.
 *
 * Only a welcome coming from the node this node is trying to join with is accepted;
 * any other welcome is logged and dropped. On acceptance the received gossip becomes
 * the latest gossip (marked as seen by this node), it is published, gossiped back
 * one-way to the sender and the actor switches to the `initialized` behavior.
 *
 * @param joinWith the address this node is currently trying to join
 * @param from the node that sent the welcome
 * @param gossip the gossip state carried by the welcome
 */
def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = {
  require(latestGossip.members.isEmpty, "Join can only be done from empty state")
  val welcomedBy = from.address
  if (welcomedBy == joinWith) {
    log.info("Cluster Node [{}] - Welcome from [{}]", selfAddress, welcomedBy)
    latestGossip = gossip.seen(selfUniqueAddress)
    publish(latestGossip)
    val repliedBySelf = from == selfUniqueAddress
    if (!repliedBySelf) oneWayGossipTo(from)
    context.become(initialized)
  } else {
    log.info("Ignoring welcome from [{}] when trying to join with [{}]", welcomedBy, joinWith)
  }
}
/**
* State transition to LEAVING.
* The node will eventually be removed by the leader, after hand-off in EXITING, and only after
* removal a new node with same address can join the cluster through the normal joining procedure.
*/
def leaving(address: Address): Unit = {
// only try to update if the node is available (in the member ring)
@ -430,7 +485,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
latestGossip = seenVersionedGossip
@ -442,31 +497,29 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
/**
* State transition to EXITING.
*/
def exiting(address: Address): Unit = {
log.info("Cluster Node [{}] - Marked node [{}] as [{}]", selfAddress, address, Exiting)
// FIXME implement when we implement hand-off
}
def exiting(node: UniqueAddress): Unit = {
  // the command names its destination; only react when that destination is this node
  val addressedToSelf = node == selfUniqueAddress
  if (addressedToSelf)
    log.info("Cluster Node [{}] - Marked as [{}]", selfAddress, Exiting)
  // TODO implement when we need hand-off
}
/**
* State transition to REMOVED.
*
* This method is for now only called after the LEADER have sent a Removed message - telling the node
* This method is only called after the LEADER has sent a Shutdown message - telling the node
* to shut itself down.
*
* In the future we might change this to allow the USER to send a Removed(address) message telling an
* arbitrary node to be moved directly from UP -> REMOVED.
*/
def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
cluster.shutdown()
}
def shutdown(node: UniqueAddress): Unit = {
  // the command names its destination; a Shutdown meant for another node is ignored
  val addressedToSelf = node == selfUniqueAddress
  if (addressedToSelf) {
    log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
    cluster.shutdown()
  }
}
/**
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
* State transition to DOWN.
* The node to DOWN is removed from the `members` set and put in the `unreachable` set (if not already there)
* and its status is set to DOWN. The node is also removed from the `seen` table.
*
* The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly
* to this node and it will then go through the normal JOINING procedure.
* The node will eventually be removed by the leader, and only after removal a new node with same address can
* join the cluster through the normal joining procedure.
*/
def downing(address: Address): Unit = {
val localGossip = latestGossip
@ -475,7 +528,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
// 1. check if the node to DOWN is in the 'members' set
// 1. check if the node to DOWN is in the `members` set
val downedMember: Option[Member] =
localMembers.collectFirst { case m if m.address == address m.copy(status = Down) }
@ -486,7 +539,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case None localMembers
}
// 2. check if the node to DOWN is in the 'unreachable' set
// 2. check if the node to DOWN is in the `unreachable` set
val newUnreachableMembers =
localUnreachableMembers.map { member
// no need to DOWN members already DOWN
@ -496,17 +549,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} else member
}
// 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
// 3. add the newly DOWNED members from the `members` (in step 1.) to the `newUnreachableMembers` set.
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down m.address }
// 4. remove nodes marked as DOWN from the `seen` table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down m.uniqueAddress }
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip :+ vclockNode
latestGossip = versionedGossip seen selfAddress
latestGossip = versionedGossip seen selfUniqueAddress
publish(latestGossip)
}
@ -519,14 +572,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val remoteGossip = envelope.gossip
val localGossip = latestGossip
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
log.debug("Ignoring received gossip with self [{}] as unreachable, from [{}]", selfAddress, from)
} else if (localGossip.overview.isNonDownUnreachable(from)) {
log.debug("Ignoring received gossip from unreachable [{}] ", from)
} else {
// if we're in the remote gossip and not Removed, then we're not joining
if (tryingToJoinWith.nonEmpty && remoteGossip.member(selfAddress).status != Removed)
tryingToJoinWith = None
if (envelope.to != selfUniqueAddress)
log.info("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress))
log.info("Ignoring received gossip with myself as unreachable, from [{}]", selfAddress, from.address)
else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from))
log.info("Ignoring received gossip from unreachable [{}] ", from)
else if (localGossip.members.forall(_.uniqueAddress != from))
log.info("Ignoring received gossip from unknown [{}]", from)
else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress))
log.info("Ignoring received gossip that does not contain myself, from [{}]", from)
else {
val comparison = remoteGossip.version tryCompareTo localGossip.version
val conflict = comparison.isEmpty
@ -537,17 +593,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
(remoteGossip merge localGossip, true, stats.incrementMergeCount)
case Some(0)
// same version
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementSameCount)
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), stats.incrementSameCount)
case Some(x) if x < 0
// local is newer
(localGossip, true, stats.incrementNewerCount)
case _
// remote is newer
(remoteGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementOlderCount)
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), stats.incrementOlderCount)
}
stats = newStats
latestGossip = winningGossip seen selfAddress
latestGossip = winningGossip seen selfUniqueAddress
// for all new joining nodes we remove them from the failure detector
latestGossip.members foreach {
@ -587,31 +643,32 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val preferredGossipTargets =
if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view
// gossip to a random alive member with preference to a member with older or newer gossip version
val localMemberAddressesSet = localGossip.members map { _.address }
val localMemberAddressesSet = localGossip.members map { _.uniqueAddress }
val nodesWithDifferentView = for {
(address, version) localGossip.overview.seen
if localMemberAddressesSet contains address
(node, version) localGossip.overview.seen
if localMemberAddressesSet contains node
if version != localGossip.version
} yield address
} yield node
nodesWithDifferentView.toIndexedSeq
} else Vector.empty[Address]
} else Vector.empty[UniqueAddress]
gossipToRandomNodeOf(
if (preferredGossipTargets.nonEmpty) preferredGossipTargets
else localGossip.members.toIndexedSeq.map(_.address) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
else localGossip.members.toIndexedSeq.map(_.uniqueAddress) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
)
}
}
/**
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
* Runs periodic leader actions, such as member status transitions, auto-downing unreachable nodes,
* assigning partitions etc.
*/
def leaderActions(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val isLeader = localGossip.isLeader(selfAddress)
val isLeader = localGossip.isLeader(selfUniqueAddress)
if (isLeader && isAvailable) {
// only run the leader actions if we are the LEADER and available
@ -632,7 +689,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 6. Move DOWN => REMOVED -- When all nodes have seen that the node is DOWN (convergence) - remove the nodes from the node ring and seen table
// 7. Updating the vclock version for the changes
// 8. Updating the 'seen' table
// 8. Updating the `seen` table
// 9. Try to update the state with the new gossip
// 10. If success - run all the side-effecting processing
@ -666,7 +723,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
member copy (status = Exiting)
// Everyone else that is not Exiting stays as they are
case member if member.status != Exiting && member.status != Down member
// Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the 'members' set/node ring and seen table
// Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the `members` set/node ring and seen table
}
// ----------------------
@ -687,8 +744,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the 'seen' table
val newSeen = localSeen -- removedMembers.map(_.address) -- removedUnreachable.map(_.address)
// removing REMOVED nodes from the `seen` table
val newSeen = localSeen -- removedMembers.map(_.uniqueAddress) -- removedUnreachable.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
@ -710,8 +767,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// Check for the need to do side-effecting on successful state change
val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down)
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down m.address }
// removing nodes marked as DOWN from the `seen` table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down m.uniqueAddress }
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
@ -727,12 +784,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// Updating the 'seen' table
// Updating the `seen` table
// Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED
// ----------------------
val seenVersionedGossip =
if (removedMembers.exists(_.address == selfAddress)) versionedGossip
else versionedGossip seen selfAddress
if (removedMembers.exists(_.uniqueAddress == selfUniqueAddress)) versionedGossip
else versionedGossip seen selfUniqueAddress
// ----------------------
// Update the state with the new gossip
@ -754,7 +811,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}] - and removing node from node ring",
selfAddress, address, member.status, Removed)
clusterCore(address) ! ClusterLeaderAction.Remove(address)
clusterCore(address) ! ClusterLeaderAction.Shutdown(member.uniqueAddress)
}
// tell all exiting members to exit
@ -762,7 +819,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]",
selfAddress, address, member.status, Exiting)
clusterCore(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff?
clusterCore(address) ! ClusterLeaderAction.Exit(member.uniqueAddress) // FIXME should wait for completion of handoff?
}
// log the auto-downing of the unreachable nodes
@ -781,7 +838,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
/**
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
* Reaps the unreachable members (moves them to the `unreachable` list in the cluster overview) according to the failure detector's verdict.
*/
def reapUnreachableMembers(): Unit = {
if (!isSingletonCluster && isAvailable) {
@ -793,7 +850,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localUnreachableMembers = localGossip.overview.unreachable
val newlyDetectedUnreachableMembers = localMembers filterNot { member
member.address == selfAddress || failureDetector.isAvailable(member.address)
member.uniqueAddress == selfUniqueAddress || failureDetector.isAvailable(member.address)
}
if (newlyDetectedUnreachableMembers.nonEmpty) {
@ -804,9 +861,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// updating vclock and 'seen' table
// updating vclock and `seen` table
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
latestGossip = seenVersionedGossip
@ -817,39 +874,45 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
}
def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
// Picks one of the given nodes at random; None when there is nothing to pick from.
def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] =
  if (nodes.nonEmpty) {
    val index = ThreadLocalRandom.current.nextInt(nodes.size)
    Some(nodes(index))
  } else None
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
def isAvailable: Boolean = !latestGossip.isUnreachable(selfAddress)
def isAvailable: Boolean = !latestGossip.isUnreachable(selfUniqueAddress)
/**
* Gossips latest gossip to a random member in the set of members passed in as argument.
*
* @return the used [[akka.actor.Address] if any
* @return the used [[UniqueAddress]] if any
*/
private def gossipToRandomNodeOf(addresses: immutable.IndexedSeq[Address]): Option[Address] = {
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
private def gossipToRandomNodeOf(nodes: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] = {
  // this node is never a gossip target
  val candidates = nodes.filter(_ != selfUniqueAddress)
  val chosen = selectRandomNode(candidates)
  chosen.foreach(gossipTo(_))
  chosen
}
// needed for tests
// Gossips to every current member whose address (ignoring the uid) equals `address`.
def sendGossipTo(address: Address): Unit =
  for (member <- latestGossip.members if member.address == address)
    gossipTo(member.uniqueAddress)
/**
* Gossips latest gossip to an address.
* Gossips latest gossip to a node.
*/
def gossipTo(address: Address): Unit =
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true))
def gossipTo(node: UniqueAddress): Unit = {
  // envelope from this node to `node`, carrying the latest gossip and flagged as a conversation
  val envelope = GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = true)
  gossipTo(node, envelope)
}
def oneWayGossipTo(address: Address): Unit =
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
def oneWayGossipTo(node: UniqueAddress): Unit = {
  // same envelope as the one-argument gossipTo, except that it is not flagged as a conversation
  val envelope = GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = false)
  gossipTo(node, envelope)
}
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit =
if (address != selfAddress && gossipMsg.gossip.members.exists(_.address == address))
clusterCore(address) ! gossipMsg
// Sends the envelope to `node`, unless `node` is this node or is not a member
// of the gossip carried by the envelope.
def gossipTo(node: UniqueAddress, gossipMsg: GossipEnvelope): Unit = {
  def isMemberOfGossip: Boolean = gossipMsg.gossip.members.exists(_.uniqueAddress == node)
  val isOtherNode = node != selfUniqueAddress
  if (isOtherNode && isMemberOfGossip)
    clusterCore(node.address) ! gossipMsg
}
def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(newGossip)
@ -874,6 +937,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
*/
private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address]) extends Actor with ActorLogging {
import InternalClusterAction._
import ClusterUserAction.JoinTo
val cluster = Cluster(context.system)
def selfAddress = cluster.selfAddress
@ -943,6 +1007,7 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
*/
private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address]) extends Actor with ActorLogging {
import InternalClusterAction._
import ClusterUserAction.JoinTo
def selfAddress = Cluster(context.system).selfAddress
@ -1007,7 +1072,7 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with
}
def isSelfUp(m: Member): Boolean =
m.address == cluster.selfAddress && m.status == MemberStatus.Up
m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up
}