Merge pull request #1432 from akka/wip-3309-exiting-patriknw

Hardening of cluster member leaving path, see #3309
Patrik Nordwall 2013-05-17 04:55:32 -07:00
commit c1cbbea1f3
20 changed files with 338 additions and 340 deletions

@ -54,20 +54,6 @@ message Welcome {
* Sends an Address
*/
/****************************************
* Cluster Leader Action Messages
****************************************/
/**
* Exit
* Sends a UniqueAddress
*/
/**
* Shutdown
* Sends a UniqueAddress
*/
/****************************************
* Cluster Heartbeat Messages

@ -148,29 +148,6 @@ private[cluster] object InternalClusterAction {
case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
}
/**
* INTERNAL API.
*
* Cluster commands sent by the LEADER.
*/
private[cluster] object ClusterLeaderAction {
/**
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
* @param node the node to exit, i.e. destination of the message
*/
@SerialVersionUID(1L)
case class Exit(node: UniqueAddress) extends ClusterMessage
/**
* Command to remove a node from the cluster immediately.
* @param node the node to shutdown, i.e. destination of the message
*/
@SerialVersionUID(1L)
case class Shutdown(node: UniqueAddress) extends ClusterMessage
}
/**
* INTERNAL API.
*
@ -239,13 +216,14 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
*/
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import ClusterLeaderAction._
import InternalClusterAction._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector }
import cluster.settings._
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
@ -263,7 +241,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
private def clusterCore(address: Address): ActorSelection =
context.actorSelection(RootActorPath(address) / "system" / "cluster" / "core" / "daemon")
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
import context.dispatcher
@ -334,8 +312,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case Join(node, roles) ⇒ joining(node, roles)
case ClusterUserAction.Down(address) ⇒ downing(address)
case ClusterUserAction.Leave(address) ⇒ leaving(address)
case Exit(node) ⇒ exiting(node)
case Shutdown(node) ⇒ shutdown(node)
case SendGossipTo(address) ⇒ sendGossipTo(address)
case msg: SubscriptionMessage ⇒ publisher forward msg
case ClusterUserAction.JoinTo(address) ⇒
@ -451,10 +427,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
latestGossip = seenVersionedGossip
updateLatestGossip(newGossip)
log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", "))
if (node != selfUniqueAddress) {
@ -494,10 +467,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newMembers = latestGossip.members map { m ⇒ if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
latestGossip = seenVersionedGossip
updateLatestGossip(newGossip)
log.info("Cluster Node [{}] - Marked address [{}] as [{}]", selfAddress, address, Leaving)
publish(latestGossip)
@ -505,23 +475,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
/**
* State transition to EXITING.
* This method is called when a member sees itself as Exiting.
*/
def exiting(node: UniqueAddress): Unit =
if (node == selfUniqueAddress) {
log.info("Cluster Node [{}] - Marked as [{}]", selfAddress, Exiting)
// TODO implement when we need hand-off
}
/**
* This method is only called after the LEADER has sent a Shutdown message - telling the node
to shut down itself.
*/
def shutdown(node: UniqueAddress): Unit =
if (node == selfUniqueAddress) {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
cluster.shutdown()
}
def shutdown(): Unit = {
log.info("Cluster Node [{}] - Node shutting down...", latestGossip.member(selfUniqueAddress))
cluster.shutdown()
}
/**
* State transition to DOWN.
@ -568,8 +527,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip :+ vclockNode
latestGossip = versionedGossip seen selfUniqueAddress
updateLatestGossip(newGossip)
publish(latestGossip)
}
@ -645,7 +603,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
stats = stats.incrementReceivedGossipCount
publish(latestGossip)
if (talkback) {
if (latestGossip.member(selfUniqueAddress).status == Exiting)
shutdown()
else if (talkback) {
// send back gossip to sender when sender had different view, i.e. merge, or sender had
// older or sender had newer
gossipTo(from, sender)
@ -698,189 +658,154 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* Runs periodic leader actions, such as member status transitions, auto-downing unreachable nodes,
* assigning partitions etc.
*/
def leaderActions(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val isLeader = localGossip.isLeader(selfUniqueAddress)
if (isLeader && isAvailable) {
def leaderActions(): Unit =
if (latestGossip.isLeader(selfUniqueAddress) && isAvailable) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
true
if (AutoDown)
leaderAutoDownActions()
if (latestGossip.convergence)
leaderActionsOnConvergence()
}
/**
* Leader actions are as follows:
* 1. Move JOINING => UP -- When a node joins the cluster
* 2. Move LEAVING => EXITING -- When all partition handoff has completed
* 3. Non-exiting remain -- When all partition handoff has completed
* 4. Move unreachable EXITING => REMOVED -- When all nodes have seen the EXITING node as unreachable (convergence) -
* remove the node from the node ring and seen table
* 5. Move unreachable DOWN/EXITING => REMOVED -- When all nodes have seen that the node is DOWN/EXITING (convergence) -
* remove the node from the node ring and seen table
* 7. Updating the vclock version for the changes
* 8. Updating the `seen` table
* 9. Update the state with the new gossip
*/
def leaderActionsOnConvergence(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
true
}
def enoughMembers: Boolean = {
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
}
}
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
val (removedUnreachable, newUnreachable) = localUnreachableMembers partition { m ⇒
Gossip.removeUnreachableWithMemberStatus(m.status)
}
val changedMembers = localMembers collect {
var upNumber = 0
{
case m if isJoiningToUp(m) ⇒
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = localGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
m.copyUp(upNumber)
case m if m.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒
// Move LEAVING => EXITING (once we have a convergence on LEAVING
// *and* if we have a successful partition handoff)
m copy (status = Exiting)
}
}
if (removedUnreachable.nonEmpty || changedMembers.nonEmpty) {
// handle changes
// replace changed members
val newMembers = localMembers -- changedMembers ++ changedMembers
// removing REMOVED nodes from the `seen` table
val newSeen = localSeen -- removedUnreachable.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
updateLatestGossip(newGossip)
// log status changes
changedMembers foreach { m ⇒
log.info("Cluster Node [{}] - Leader is moving node [{}] to [{}]",
selfAddress, m.address, m.status)
}
// Leader actions are as follows:
// 1. Move JOINING => UP -- When a node joins the cluster
// 2. Move LEAVING => EXITING -- When all partition handoff has completed
// 3. Non-exiting remain -- When all partition handoff has completed
// 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 6. Move DOWN => REMOVED -- When all nodes have seen that the node is DOWN (convergence) - remove the nodes from the node ring and seen table
// 7. Updating the vclock version for the changes
// 8. Updating the `seen` table
// 9. Try to update the state with the new gossip
// 10. If success - run all the side-effecting processing
val (
newGossip: Gossip,
hasChangedState: Boolean,
upMembers,
exitingMembers,
removedMembers,
removedUnreachableMembers,
unreachableButNotDownedMembers) =
if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes
def enoughMembers: Boolean = {
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
}
}
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
// transform the node member ring
val newMembers = localMembers collect {
var upNumber = 0
{
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
case member if isJoiningToUp(member) ⇒
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = localGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
member.copyUp(upNumber)
// Move LEAVING => EXITING (once we have a convergence on LEAVING
// *and* if we have a successful partition handoff)
case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒
member copy (status = Exiting)
// Everyone else that is not Exiting stays as they are
case member if member.status != Exiting && member.status != Down ⇒ member
// Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the `members` set/node ring and seen table
}
}
// ----------------------
// Store away all stuff needed for the side-effecting processing
// ----------------------
// Check for the need to do side-effecting on successful state change
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED, DOWN -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes
val (removedMembers, newMembers1) = localMembers partition (m ⇒ m.status == Exiting || m.status == Down)
val (removedUnreachable, newUnreachable) = localUnreachableMembers partition (_.status == Down)
val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_))
val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the `seen` table
val newSeen = localSeen -- removedMembers.map(_.uniqueAddress) -- removedUnreachable.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, removedUnreachable, Member.none)
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers collect {
// ----------------------
// Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
case member if member.status != Down ⇒ member copy (status = Down)
case downMember ⇒ downMember // no need to DOWN members already DOWN
}
// Check for the need to do side-effecting on successful state change
val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down)
// removing nodes marked as DOWN from the `seen` table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress }
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)
} else (localGossip, false, Member.none, Member.none, Member.none, Member.none, Member.none)
if (hasChangedState) { // we have a change of state - version it and try to update
// ----------------------
// Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// Updating the `seen` table
// Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED
// ----------------------
val seenVersionedGossip =
if (removedMembers.exists(_.uniqueAddress == selfUniqueAddress)) versionedGossip
else versionedGossip seen selfUniqueAddress
// ----------------------
// Update the state with the new gossip
// ----------------------
latestGossip = seenVersionedGossip
// ----------------------
// Run all the side-effecting processing
// ----------------------
// log the move of members from joining to up
upMembers foreach { member ⇒
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]",
selfAddress, member.address, member.status, Up)
}
// tell all removed members to remove and shut down themselves
removedMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}] - and removing node from node ring",
selfAddress, address, member.status, Removed)
clusterCore(address) ! ClusterLeaderAction.Shutdown(member.uniqueAddress)
}
// tell all exiting members to exit
exitingMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]",
selfAddress, address, member.status, Exiting)
clusterCore(address) ! ClusterLeaderAction.Exit(member.uniqueAddress) // FIXME should wait for completion of handoff?
}
// log the auto-downing of the unreachable nodes
unreachableButNotDownedMembers foreach { member ⇒
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, member.address, Down)
}
// log the auto-downing of the unreachable nodes
removedUnreachableMembers foreach { member ⇒
log.info("Cluster Node [{}] - Leader is removing unreachable node [{}]", selfAddress, member.address)
}
publish(latestGossip)
// log the removal of the unreachable nodes
removedUnreachable foreach { m ⇒
val status = if (m.status == Exiting) "exiting" else "unreachable"
log.info("Cluster Node [{}] - Leader is removing {} node [{}]", selfAddress, status, m.address)
}
publish(latestGossip)
if (latestGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting. Let others know (best effort)
// before shutdown. Otherwise they will not see the Exiting state change
// and there will not be convergence until they have detected this node as
// unreachable and the required downing has finished. They will still need to detect
// unreachable, but Exiting unreachable will be removed without downing, i.e.
// normally the leaving of a leader will be graceful without the need
// for downing. However, if those final gossip messages never arrive it is
// alright to require the downing, because that is probably caused by a
// network failure anyway.
for (_ ← 1 to NumberOfGossipsBeforeShutdownWhenLeaderExits) gossip()
shutdown()
}
}
}
/**
* When the node is in the UNREACHABLE set it can be auto-down by leader
*/
def leaderAutoDownActions(): Unit = {
val localGossip = latestGossip
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val changedUnreachableMembers = localUnreachableMembers collect {
case m if !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status) ⇒ m copy (status = Down)
}
if (changedUnreachableMembers.nonEmpty) {
// handle changes
// replace changed unreachable
val newUnreachableMembers = localUnreachableMembers -- changedUnreachableMembers ++ changedUnreachableMembers
// removing nodes marked as Down/Exiting from the `seen` table
val newSeen = localSeen -- changedUnreachableMembers.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
updateLatestGossip(newGossip)
// log the auto-downing of the unreachable nodes
changedUnreachableMembers foreach { m ⇒
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, m.address, m.status)
}
publish(latestGossip)
}
}
@ -897,7 +822,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localUnreachableMembers = localGossip.overview.unreachable
val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒
member.uniqueAddress == selfUniqueAddress || member.status == Exiting || failureDetector.isAvailable(member.address)
member.uniqueAddress == selfUniqueAddress || failureDetector.isAvailable(member.address)
}
if (newlyDetectedUnreachableMembers.nonEmpty) {
@ -908,13 +833,14 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// updating vclock and `seen` table
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
updateLatestGossip(newGossip)
latestGossip = seenVersionedGossip
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting)
if (nonExiting.nonEmpty)
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", "))
if (exiting.nonEmpty)
log.info("Cluster Node [{}] - Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
selfAddress, exiting.mkString(", "))
publish(latestGossip)
}
@ -958,6 +884,15 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
def validNodeForGossip(node: UniqueAddress): Boolean =
(node != selfUniqueAddress && latestGossip.members.exists(_.uniqueAddress == node))
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes
val versionedGossip = newGossip :+ vclockNode
// Updating the `seen` table
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
// Update the state with the new gossip
latestGossip = seenVersionedGossip
}
def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(newGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()

@ -146,8 +146,6 @@ object ClusterEvent {
case class UnreachableMember(member: Member) extends ClusterDomainEvent
/**
* INTERNAL API
*
* Current snapshot of cluster node metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodeMetrics: Set[NodeMetrics]) extends ClusterDomainEvent {
@ -200,13 +198,6 @@ object ClusterEvent {
val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable
val unreachableGroupedByAddress =
List(newGossip.overview.unreachable, oldGossip.overview.unreachable).flatten.groupBy(_.uniqueAddress)
val unreachableDownMembers = unreachableGroupedByAddress collect {
case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status ⇒
newMember
}
val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++
(oldGossip.overview.unreachable -- newGossip.overview.unreachable)
val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed)))

@ -128,17 +128,33 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
case UnreachableMember(m) ⇒ removeMember(m)
case MemberRemoved(m) ⇒ removeMember(m)
case s: CurrentClusterState ⇒ reset(s)
case MemberExited(m) ⇒ memberExited(m)
case _: MemberEvent ⇒ // not interested in other types of MemberEvent
case HeartbeatRequest(from) ⇒ addHeartbeatRequest(from)
case SendHeartbeatRequest(to) ⇒ sendHeartbeatRequest(to)
case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
}
def reset(snapshot: CurrentClusterState): Unit = state = state.reset(snapshot.members.map(_.address))
def reset(snapshot: CurrentClusterState): Unit =
state = state.reset(snapshot.members.map(_.address)(collection.breakOut))
def addMember(m: Member): Unit = if (m.address != selfAddress) state = state addMember m.address
def removeMember(m: Member): Unit = if (m.address != selfAddress) state = state removeMember m.address
def removeMember(m: Member): Unit = {
if (m.uniqueAddress == cluster.selfUniqueAddress)
// This cluster node will be shutdown, but stop this actor immediately
// to prevent it from sending out anything more and avoid sending EndHeartbeat.
context stop self
else
state = state removeMember m.address
}
def memberExited(m: Member): Unit =
if (m.uniqueAddress == cluster.selfUniqueAddress) {
// This cluster node will be shutdown, but stop this actor immediately
// to prevent it from sending out anything more and avoid sending EndHeartbeat.
context stop self
}
def addHeartbeatRequest(address: Address): Unit =
if (address != selfAddress) state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive)
@ -253,7 +269,7 @@ private[cluster] case class ClusterHeartbeatSenderState private (
val active: Set[Address] = current ++ heartbeatRequest.keySet
def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = {
def reset(nodes: immutable.HashSet[Address]): ClusterHeartbeatSenderState = {
ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeHeartbeatRequest _ }, ring.copy(nodes = nodes + ring.selfAddress))
}

@ -88,6 +88,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
case state: CurrentClusterState ⇒ receiveState(state)
case MemberUp(m) ⇒ addMember(m)
case MemberRemoved(m) ⇒ removeMember(m)
case MemberExited(m) ⇒ removeMember(m)
case UnreachableMember(m) ⇒ removeMember(m)
case _: MemberEvent ⇒ // not interested in other types of MemberEvent

@ -18,7 +18,10 @@ private[cluster] object Gossip {
if (members.isEmpty) empty else empty.copy(members = members)
private val leaderMemberStatus = Set[MemberStatus](Up, Leaving)
private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving, Exiting)
private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving)
val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
}
/**
@ -177,12 +180,12 @@ private[cluster] case class Gossip(
def convergence: Boolean = {
// First check that:
// 1. we don't have any members that are unreachable, or
// 2. all unreachable members in the set have status DOWN
// 2. all unreachable members in the set have status DOWN or EXITING
// Else we can't continue to check for convergence
// When that is done we check that all members with a convergence
// status is in the seen table and has the latest vector clock
// version
overview.unreachable.forall(_.status == Down) &&
overview.unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) &&
!members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress))
}

@ -33,7 +33,7 @@ class Member private[cluster] (
case m: Member ⇒ uniqueAddress == m.uniqueAddress
case _ ⇒ false
}
override def toString = s"{Member(address = ${address}, status = ${status})"
override def toString = s"Member(address = ${address}, status = ${status})"
def hasRole(role: String): Boolean = roles.contains(role)

@ -38,8 +38,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
InternalClusterAction.InitJoin.getClass -> (_ ⇒ InternalClusterAction.InitJoin),
classOf[InternalClusterAction.InitJoinAck] -> (bytes ⇒ InternalClusterAction.InitJoinAck(addressFromBinary(bytes))),
classOf[InternalClusterAction.InitJoinNack] -> (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))),
classOf[ClusterLeaderAction.Exit] -> (bytes ⇒ ClusterLeaderAction.Exit(uniqueAddressFromBinary(bytes))),
classOf[ClusterLeaderAction.Shutdown] -> (bytes ⇒ ClusterLeaderAction.Shutdown(uniqueAddressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))),
@ -75,10 +73,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
addressToProto(address).toByteArray
case InternalClusterAction.InitJoinNack(address) ⇒
addressToProto(address).toByteArray
case ClusterLeaderAction.Exit(node) ⇒
uniqueAddressToProto(node).toByteArray
case ClusterLeaderAction.Shutdown(node) ⇒
uniqueAddressToProto(node).toByteArray
case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒
addressToProto(from).toByteArray
case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒

@ -50,6 +50,7 @@ abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSp
}
enterBarrier("first-left")
runOn(second, third, fourth, fifth) {
markNodeAsUnavailable(first)
awaitAssert(clusterView.clusterMetrics.size must be(roles.size - 1))
}
enterBarrier("finished")

@ -18,12 +18,8 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
val second = role("second")
val third = role("third")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
# turn off unreachable reaper
akka.cluster.unreachable-nodes-reaper-interval = 300 s""")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)))
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
"akka.cluster.auto-down=on")).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec
@ -37,8 +33,6 @@ abstract class LeaderLeavingSpec
import LeaderLeavingMultiJvmSpec._
import ClusterEvent._
val leaderHandoffWaitingTime = 30.seconds
"A LEADER that is LEAVING" must {
"be moved to LEAVING, then to EXITING, then to REMOVED, then be shut down and then a new LEADER should be elected" taggedAs LongRunningTest in {
@ -47,7 +41,7 @@ abstract class LeaderLeavingSpec
val oldLeaderAddress = clusterView.leader.get
within(leaderHandoffWaitingTime) {
within(30.seconds) {
if (clusterView.isLeader) {
@ -58,6 +52,7 @@ abstract class LeaderLeavingSpec
// verify that the LEADER is shut down
awaitCond(cluster.isTerminated)
enterBarrier("leader-shutdown")
} else {
@ -76,12 +71,12 @@ abstract class LeaderLeavingSpec
enterBarrier("leader-left")
val expectedAddresses = roles.toSet map address
awaitAssert(clusterView.members.map(_.address) must be(expectedAddresses))
// verify that the LEADER is EXITING
exitingLatch.await
enterBarrier("leader-shutdown")
markNodeAsUnavailable(oldLeaderAddress)
// verify that the LEADER is no longer part of the 'members' set
awaitAssert(clusterView.members.map(_.address) must not contain (oldLeaderAddress))

@ -19,14 +19,7 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
val second = role("second")
val third = role("third")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)))
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec

@ -15,7 +15,8 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
"akka.cluster.auto-down=on")).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec
@ -28,30 +29,35 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
import NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec._
val reaperWaitingTime = 30.seconds.dilated
"A node that is LEAVING a non-singleton cluster" must {
"eventually set to REMOVED by the reaper, and removed from membership ring and seen table" taggedAs LongRunningTest in {
"eventually set to REMOVED and removed from membership ring and seen table" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
runOn(first) {
cluster.leave(second)
}
enterBarrier("second-left")
within(30.seconds) {
runOn(first) {
cluster.leave(second)
}
enterBarrier("second-left")
runOn(first, third) {
// verify that the 'second' node is no longer part of the 'members' set
awaitAssert(clusterView.members.map(_.address) must not contain (address(second)), reaperWaitingTime)
runOn(first, third) {
enterBarrier("second-shutdown")
markNodeAsUnavailable(second)
// verify that the 'second' node is no longer part of the 'members'/'unreachable' set
awaitAssert {
clusterView.members.map(_.address) must not contain (address(second))
}
awaitAssert {
clusterView.unreachableMembers.map(_.address) must not contain (address(second))
}
}
// verify that the 'second' node is not part of the 'unreachable' set
awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(second)), reaperWaitingTime)
}
runOn(second) {
// verify that the second node is shut down
awaitCond(cluster.isTerminated, reaperWaitingTime)
runOn(second) {
// verify that the second node is shut down
awaitCond(cluster.isTerminated)
enterBarrier("second-shutdown")
}
}
enterBarrier("finished")

@ -18,12 +18,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
val second = role("second")
val third = role("third")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
# turn off unreachable reaper
akka.cluster.unreachable-nodes-reaper-interval = 300 s""")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)))
commonConfig(debugConfig(on = false).
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec
@ -63,9 +59,6 @@ abstract class NodeLeavingAndExitingSpec
}
enterBarrier("second-left")
val expectedAddresses = roles.toSet map address
awaitAssert(clusterView.members.map(_.address) must be(expectedAddresses))
// Verify that 'second' node is set to EXITING
exitingLatch.await

@ -76,5 +76,13 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo
enterBarrier("after-3")
}
"leave and shutdown itself when singleton cluster" taggedAs LongRunningTest in {
runOn(first) {
cluster.leave(first)
awaitCond(cluster.isTerminated, 5.seconds)
}
enterBarrier("after-4")
}
}
}

@ -858,7 +858,7 @@ abstract class StressSpec
}
}
def removeOne(shutdown: Boolean): Unit = within(10.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) {
def removeOne(shutdown: Boolean): Unit = within(25.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) {
val currentRoles = roles.take(nbrUsedRoles - 1)
val title = s"${if (shutdown) "shutdown" else "remove"} one from ${nbrUsedRoles} nodes cluster"
createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true)
@ -903,7 +903,7 @@ abstract class StressSpec
}
def removeSeveral(numberOfNodes: Int, shutdown: Boolean): Unit =
within(10.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)) {
within(25.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)) {
val currentRoles = roles.take(nbrUsedRoles - numberOfNodes)
val removeRoles = roles.slice(currentRoles.size, nbrUsedRoles)
val title = s"${if (shutdown) "shutdown" else "leave"} ${numberOfNodes} in ${nbrUsedRoles} nodes cluster"

@ -10,6 +10,7 @@ import akka.actor.Address
import akka.routing.ConsistentHash
import scala.concurrent.duration._
import scala.collection.immutable
import scala.collection.immutable.HashSet
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
@ -43,7 +44,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
}
"remove heartbeatRequest after reset" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb))
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb))
s.heartbeatRequest must be(Map.empty)
}
@ -53,13 +54,13 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
}
"remove heartbeatRequest after removeMember" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa)
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)).removeMember(aa)
s.heartbeatRequest must be(Map.empty)
s.ending must be(Map(aa -> 0))
}
"remove from ending after addHeartbeatRequest" in {
val s = emptyState.reset(Set(aa, bb)).removeMember(aa)
val s = emptyState.reset(HashSet(aa, bb)).removeMember(aa)
s.ending must be(Map(aa -> 0))
val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds)
s2.heartbeatRequest.keySet must be(Set(aa))
@ -67,7 +68,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
}
"include nodes from reset in active set" in {
val nodes = Set(aa, bb, cc)
val nodes = HashSet(aa, bb, cc)
val s = emptyState.reset(nodes)
s.current must be(nodes)
s.ending must be(Map.empty)
@ -81,8 +82,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
s.addMember(ee).current.size must be(3)
}
"move meber to ending set when removing member" in {
val nodes = Set(aa, bb, cc, dd, ee)
"move member to ending set when removing member" in {
val nodes = HashSet(aa, bb, cc, dd, ee)
val s = emptyState.reset(nodes)
s.ending must be(Map.empty)
val included = s.current.head
@ -95,7 +96,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
}
"increase ending count correctly" in {
val s = emptyState.reset(Set(aa)).removeMember(aa)
val s = emptyState.reset(HashSet(aa)).removeMember(aa)
s.ending must be(Map(aa -> 0))
val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa)
s2.ending must be(Map(aa -> 2))

@ -42,8 +42,6 @@ class ClusterMessageSerializerSpec extends AkkaSpec {
checkSerialization(InternalClusterAction.InitJoin)
checkSerialization(InternalClusterAction.InitJoinAck(address))
checkSerialization(InternalClusterAction.InitJoinNack(address))
checkSerialization(ClusterLeaderAction.Exit(uniqueAddress))
checkSerialization(ClusterLeaderAction.Shutdown(uniqueAddress))
checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address))
checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address))
checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address))

@ -122,6 +122,7 @@ object ClusterSingletonManager {
case object WasOldest extends State
case object HandingOver extends State
case object TakeOver extends State
case object End extends State
case object Uninitialized extends Data
case class YoungerData(oldestOption: Option[Address]) extends Data
@ -131,6 +132,7 @@ object ClusterSingletonManager {
case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any],
newOldestOption: Option[Address]) extends Data
case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data
case object EndData extends Data
val HandOverRetryTimer = "hand-over-retry"
val TakeOverRetryTimer = "take-over-retry"
@ -399,6 +401,8 @@ class ClusterSingletonManager(
// Previous GetNext request delivered event and new GetNext is to be sent
var oldestChangedReceived = true
var selfExited = false
// keep track of previously removed members
var removed = Map.empty[Address, Deadline]
@ -423,6 +427,7 @@ class ClusterSingletonManager(
require(!cluster.isTerminated, "Cluster node must not be terminated")
// subscribe to cluster changes, re-subscribe when restart
cluster.subscribe(self, classOf[MemberExited])
cluster.subscribe(self, classOf[MemberRemoved])
setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)
@ -595,7 +600,7 @@ class ClusterSingletonManager(
case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, handOverData, _)) ⇒
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))
case Event(MemberRemoved(m), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if m.address == newOldest ⇒
case Event(MemberRemoved(m), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if !selfExited && m.address == newOldest ⇒
addRemoved(m.address)
gotoHandingOver(singleton, singletonTerminated, handOverData, None)
@ -635,13 +640,28 @@ class ClusterSingletonManager(
val newOldest = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest)
handOverTo foreach { _ ! HandOverDone(handOverData) }
goto(Younger) using YoungerData(newOldest)
if (selfExited || removed.contains(cluster.selfAddress))
goto(End) using EndData
else
goto(Younger) using YoungerData(newOldest)
}
when(End) {
case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
}
whenUnhandled {
case Event(_: CurrentClusterState, _) ⇒ stay
case Event(MemberExited(m), _) ⇒
if (m.address == cluster.selfAddress) {
selfExited = true
logInfo("Exited [{}]", m.address)
}
stay
case Event(MemberRemoved(m), _) ⇒
logInfo("Member removed [{}]", m.address)
if (!selfExited) logInfo("Member removed [{}]", m.address)
addRemoved(m.address)
stay
case Event(TakeOverFromMe, _) ⇒
@ -670,9 +690,10 @@ class ClusterSingletonManager(
}
onTransition {
case _ -> Younger if removed.contains(cluster.selfAddress) ⇒
case _ -> (Younger | End) if removed.contains(cluster.selfAddress) ⇒
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
// note that FSM.stop() can't be used in onTransition
context.stop(self)
}
}

@ -158,6 +158,26 @@ Be aware of that using auto-down implies that two separate clusters will
automatically be formed in case of network partition. That might be
desired by some applications but not by others.
Leaving
^^^^^^^
There are two ways to remove a member from the cluster.
You can just stop the actor system (or the JVM process). It will be detected
as unreachable and removed after the automatic or manual downing as described
above.
A more graceful exit can be performed if you tell the cluster that a node shall leave.
This can be performed using :ref:`cluster_jmx_java` or :ref:`cluster_command_line_java`.
It can also be performed programmatically with ``Cluster.get(system).leave(address)``.
Note that this command can be issued to any member in the cluster, not necessarily the
one that is leaving. The cluster extension of the leaving member, but not its actor
system or JVM, will be shut down after the leader has changed the status of the member to
``Exiting``. Thereafter the member will be removed from the cluster. Normally this is handled
automatically, but in case of network failures during this process it might still be necessary
to set the node's status to ``Down`` in order to complete the removal.
.. _cluster_subscriber_java:
Subscribe to Cluster Events
@ -168,7 +188,15 @@ You can subscribe to change notifications of the cluster membership by using
``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
as the first event, followed by events for incremental updates.
There are several types of change events, consult the API documentation
The events to track the life-cycle of members are:
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``.
Note that the node might already have been shut down when this event is published on another node.
* ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster.
* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector.
There are more types of change events; consult the API documentation
of classes that extend ``akka.cluster.ClusterEvent.ClusterDomainEvent``
for details about the events.

@ -151,6 +151,26 @@ Be aware of that using auto-down implies that two separate clusters will
automatically be formed in case of network partition. That might be
desired by some applications but not by others.
Leaving
^^^^^^^
There are two ways to remove a member from the cluster.
You can just stop the actor system (or the JVM process). It will be detected
as unreachable and removed after the automatic or manual downing as described
above.
A more graceful exit can be performed if you tell the cluster that a node shall leave.
This can be performed using :ref:`cluster_jmx_scala` or :ref:`cluster_command_line_scala`.
It can also be performed programmatically with ``Cluster(system).leave(address)``.
Note that this command can be issued to any member in the cluster, not necessarily the
one that is leaving. The cluster extension of the leaving member, but not its actor
system or JVM, will be shut down after the leader has changed the status of the member to
``Exiting``. Thereafter the member will be removed from the cluster. Normally this is handled
automatically, but in case of network failures during this process it might still be necessary
to set the node's status to ``Down`` in order to complete the removal.
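
For example, a node can ask to leave the cluster with a few lines of code. This is a minimal
sketch; ``system`` is assumed to be an ``ActorSystem`` that has already joined the cluster::

  import akka.cluster.Cluster

  val cluster = Cluster(system)
  // request graceful removal of this node; the address of any other
  // member could be passed to leave instead
  cluster.leave(cluster.selfAddress)
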
.. _cluster_subscriber_scala:
Subscribe to Cluster Events
@ -161,7 +181,15 @@ You can subscribe to change notifications of the cluster membership by using
``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
as the first event, followed by events for incremental updates.
There are several types of change events, consult the API documentation
The events to track the life-cycle of members are:
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``.
Note that the node might already have been shut down when this event is published on another node.
* ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster.
* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector.
There are more types of change events; consult the API documentation
of classes that extend ``akka.cluster.ClusterEvent.ClusterDomainEvent``
for details about the events.
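
A subscriber can be implemented as a plain actor, for example as in the following minimal
sketch; the actor name and the choice to subscribe to all ``ClusterDomainEvent`` subtypes
are illustrative, not prescribed by the API::

  import akka.actor.Actor
  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent._

  class MemberLifecycleListener extends Actor {
    val cluster = Cluster(context.system)

    // subscribe when the actor starts, re-subscribe on restart
    override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
    override def postStop(): Unit = cluster.unsubscribe(self)

    def receive = {
      case state: CurrentClusterState ⇒ // full snapshot, sent as the first message
      case MemberUp(member)           ⇒ // member status changed to Up
      case MemberExited(member)       ⇒ // member status changed to Exiting
      case MemberRemoved(member)      ⇒ // member removed from the cluster
      case UnreachableMember(member)  ⇒ // member flagged by the failure detector
      case _: ClusterDomainEvent      ⇒ // other event types are ignored here
    }
  }
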