diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 1b9340c493..b0a1ffe6e9 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -54,20 +54,6 @@ message Welcome { * Sends an Address */ -/**************************************** - * Cluster Leader Action Messages - ****************************************/ - -/** - * Exit - * Sends a UniqueAddress - */ - -/** - * Shutdown - * Sends a UniqueAddress - */ - /**************************************** * Cluster Heartbeat Messages diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index f1932ec2e4..8b7dd5db66 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -148,29 +148,6 @@ private[cluster] object InternalClusterAction { case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage } -/** - * INTERNAL API. - * - * Cluster commands sent by the LEADER. - */ -private[cluster] object ClusterLeaderAction { - - /** - * Command to mark a node to be removed from the cluster immediately. - * Can only be sent by the leader. - * @param node the node to exit, i.e. destination of the message - */ - @SerialVersionUID(1L) - case class Exit(node: UniqueAddress) extends ClusterMessage - - /** - * Command to remove a node from the cluster immediately. - * @param node the node to shutdown, i.e. destination of the message - */ - @SerialVersionUID(1L) - case class Shutdown(node: UniqueAddress) extends ClusterMessage -} - /** * INTERNAL API. * @@ -239,13 +216,14 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi */ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { - import ClusterLeaderAction._ import InternalClusterAction._ val cluster = Cluster(context.system) import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector } import cluster.settings._ + val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3 + def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress)) @@ -263,7 +241,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto private def clusterCore(address: Address): ActorSelection = context.actorSelection(RootActorPath(address) / "system" / "cluster" / "core" / "daemon") - val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. + context.actorOf(Props[ClusterHeartbeatSender]. 
withDispatcher(UseDispatcher), name = "heartbeatSender") import context.dispatcher @@ -334,8 +312,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case Join(node, roles) ⇒ joining(node, roles) case ClusterUserAction.Down(address) ⇒ downing(address) case ClusterUserAction.Leave(address) ⇒ leaving(address) - case Exit(node) ⇒ exiting(node) - case Shutdown(node) ⇒ shutdown(node) case SendGossipTo(address) ⇒ sendGossipTo(address) case msg: SubscriptionMessage ⇒ publisher forward msg case ClusterUserAction.JoinTo(address) ⇒ @@ -451,10 +427,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles) val newGossip = latestGossip copy (members = newMembers) - val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfUniqueAddress - - latestGossip = seenVersionedGossip + updateLatestGossip(newGossip) log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", ")) if (node != selfUniqueAddress) { @@ -494,10 +467,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val newMembers = latestGossip.members map { m ⇒ if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING val newGossip = latestGossip copy (members = newMembers) - val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfUniqueAddress - - latestGossip = seenVersionedGossip + updateLatestGossip(newGossip) log.info("Cluster Node [{}] - Marked address [{}] as [{}]", selfAddress, address, Leaving) publish(latestGossip) @@ -505,23 +475,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } /** - * State transition to EXITING. + * This method is called when a member sees itself as Exiting. */ - def exiting(node: UniqueAddress): Unit = - if (node == selfUniqueAddress) { - log.info("Cluster Node [{}] - Marked as [{}]", selfAddress, Exiting) - // TODO implement when we need hand-off - } - - /** - * This method is only called after the LEADER has sent a Shutdown message - telling the node - * to shut down himself. - */ - def shutdown(node: UniqueAddress): Unit = - if (node == selfUniqueAddress) { - log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) - cluster.shutdown() - } + def shutdown(): Unit = { + log.info("Cluster Node [{}] - Node shutting down...", latestGossip.member(selfUniqueAddress)) + cluster.shutdown() + } /** * State transition to DOW. @@ -568,8 +527,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // update gossip overview val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip - val versionedGossip = newGossip :+ vclockNode - latestGossip = versionedGossip seen selfUniqueAddress + updateLatestGossip(newGossip) publish(latestGossip) } @@ -645,7 +603,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto stats = stats.incrementReceivedGossipCount publish(latestGossip) - if (talkback) { + if (latestGossip.member(selfUniqueAddress).status == Exiting) + shutdown() + else if (talkback) { // send back gossip to sender when sender had different view, i.e. 
merge, or sender had // older or sender had newer gossipTo(from, sender) @@ -698,189 +658,154 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * Runs periodic leader actions, such as member status transitions, auto-downing unreachable nodes, * assigning partitions etc. */ - def leaderActions(): Unit = { - val localGossip = latestGossip - val localMembers = localGossip.members - - val isLeader = localGossip.isLeader(selfUniqueAddress) - - if (isLeader && isAvailable) { + def leaderActions(): Unit = + if (latestGossip.isLeader(selfUniqueAddress) && isAvailable) { // only run the leader actions if we are the LEADER and available - val localOverview = localGossip.overview - val localSeen = localOverview.seen - val localUnreachableMembers = localOverview.unreachable - val hasPartionHandoffCompletedSuccessfully: Boolean = { - // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully - true + if (AutoDown) + leaderAutoDownActions() + + if (latestGossip.convergence) + leaderActionsOnConvergence() + } + + /** + * Leader actions are as follows: + * 1. Move JOINING => UP -- When a node joins the cluster + * 2. Move LEAVING => EXITING -- When all partition handoff has completed + * 3. Non-exiting remain -- When all partition handoff has completed + * 4. Move unreachable EXITING => REMOVED -- When all nodes have seen the EXITING node as unreachable (convergence) - + * remove the node from the node ring and seen table + * 5. Move unreachable DOWN/EXITING => REMOVED -- When all nodes have seen that the node is DOWN/EXITING (convergence) - + * remove the node from the node ring and seen table + * 7. Updating the vclock version for the changes + * 8. Updating the `seen` table + * 9. Update the state with the new gossip + */ + def leaderActionsOnConvergence(): Unit = { + val localGossip = latestGossip + val localMembers = localGossip.members + val localOverview = localGossip.overview + val localSeen = localOverview.seen + val localUnreachableMembers = localOverview.unreachable + + val hasPartionHandoffCompletedSuccessfully: Boolean = { + // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully + true + } + + def enoughMembers: Boolean = { + localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall { + case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold + } + } + def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers + + val (removedUnreachable, newUnreachable) = localUnreachableMembers partition { m ⇒ + Gossip.removeUnreachableWithMemberStatus(m.status) + } + + val changedMembers = localMembers collect { + var upNumber = 0 + + { + case m if isJoiningToUp(m) ⇒ + // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. 
we have a convergence) + // and minimum number of nodes have joined the cluster + if (upNumber == 0) { + // It is alright to use same upNumber as already used by a removed member, since the upNumber + // is only used for comparing age of current cluster members (Member.isOlderThan) + val youngest = localGossip.youngestMember + upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber) + } else { + upNumber += 1 + } + m.copyUp(upNumber) + + case m if m.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ + // Move LEAVING => EXITING (once we have a convergence on LEAVING + // *and* if we have a successful partition handoff) + m copy (status = Exiting) + } + } + + if (removedUnreachable.nonEmpty || changedMembers.nonEmpty) { + // handle changes + + // replace changed members + val newMembers = localMembers -- changedMembers ++ changedMembers + + // removing REMOVED nodes from the `seen` table + val newSeen = localSeen -- removedUnreachable.map(_.uniqueAddress) + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview + val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip + + updateLatestGossip(newGossip) + + // log status changes + changedMembers foreach { m ⇒ + log.info("Cluster Node [{}] - Leader is moving node [{}] to [{}]", + selfAddress, m.address, m.status) } - // Leader actions are as follows: - // 1. Move JOINING => UP -- When a node joins the cluster - // 2. Move LEAVING => EXITING -- When all partition handoff has completed - // 3. Non-exiting remain -- When all partition handoff has completed - // 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table - // 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader - // 6. Move DOWN => REMOVED -- When all nodes have seen that the node is DOWN (convergence) - remove the nodes from the node ring and seen table - // 7. Updating the vclock version for the changes - // 8. Updating the `seen` table - // 9. Try to update the state with the new gossip - // 10. If success - run all the side-effecting processing - - val ( - newGossip: Gossip, - hasChangedState: Boolean, - upMembers, - exitingMembers, - removedMembers, - removedUnreachableMembers, - unreachableButNotDownedMembers) = - - if (localGossip.convergence) { - // we have convergence - so we can't have unreachable nodes - - def enoughMembers: Boolean = { - localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall { - case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold - } - } - def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers - - // transform the node member ring - val newMembers = localMembers collect { - var upNumber = 0 - - { - // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. 
we have a convergence) - // and minimum number of nodes have joined the cluster - case member if isJoiningToUp(member) ⇒ - if (upNumber == 0) { - // It is alright to use same upNumber as already used by a removed member, since the upNumber - // is only used for comparing age of current cluster members (Member.isOlderThan) - val youngest = localGossip.youngestMember - upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber) - } else { - upNumber += 1 - } - member.copyUp(upNumber) - // Move LEAVING => EXITING (once we have a convergence on LEAVING - // *and* if we have a successful partition handoff) - case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ - member copy (status = Exiting) - // Everyone else that is not Exiting stays as they are - case member if member.status != Exiting && member.status != Down ⇒ member - // Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the `members` set/node ring and seen table - } - } - - // ---------------------- - // Store away all stuff needed for the side-effecting processing - // ---------------------- - - // Check for the need to do side-effecting on successful state change - // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED, DOWN -> REMOVED - // to check for state-changes and to store away removed and exiting members for later notification - // 1. check for state-changes to update - // 2. store away removed and exiting members so we can separate the pure state changes - val (removedMembers, newMembers1) = localMembers partition (m ⇒ m.status == Exiting || m.status == Down) - val (removedUnreachable, newUnreachable) = localUnreachableMembers partition (_.status == Down) - - val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_)) - - val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) - - val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty - - // removing REMOVED nodes from the `seen` table - val newSeen = localSeen -- removedMembers.map(_.uniqueAddress) -- removedUnreachable.map(_.uniqueAddress) - - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview - val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip - - (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, removedUnreachable, Member.none) - - } else if (AutoDown) { - // we don't have convergence - so we might have unreachable nodes - - // if 'auto-down' is turned on, then try to auto-down any unreachable nodes - val newUnreachableMembers = localUnreachableMembers collect { - // ---------------------- - // Move UNREACHABLE => DOWN (auto-downing by leader) - // ---------------------- - case member if member.status != Down ⇒ member copy (status = Down) - case downMember ⇒ downMember // no need to DOWN members already DOWN - } - - // Check for the need to do side-effecting on successful state change - val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down) - - // removing nodes marked as DOWN from the `seen` table - val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress } - - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview - val newGossip = localGossip copy (overview = 
newOverview) // update gossip - - (newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, Member.none, unreachableButNotDownedMembers) - - } else (localGossip, false, Member.none, Member.none, Member.none, Member.none, Member.none) - - if (hasChangedState) { // we have a change of state - version it and try to update - // ---------------------- - // Updating the vclock version for the changes - // ---------------------- - val versionedGossip = newGossip :+ vclockNode - - // ---------------------- - // Updating the `seen` table - // Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED - // ---------------------- - val seenVersionedGossip = - if (removedMembers.exists(_.uniqueAddress == selfUniqueAddress)) versionedGossip - else versionedGossip seen selfUniqueAddress - - // ---------------------- - // Update the state with the new gossip - // ---------------------- - latestGossip = seenVersionedGossip - - // ---------------------- - // Run all the side-effecting processing - // ---------------------- - - // log the move of members from joining to up - upMembers foreach { member ⇒ - log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]", - selfAddress, member.address, member.status, Up) - } - - // tell all removed members to remove and shut down themselves - removedMembers foreach { member ⇒ - val address = member.address - log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}] - and removing node from node ring", - selfAddress, address, member.status, Removed) - clusterCore(address) ! ClusterLeaderAction.Shutdown(member.uniqueAddress) - } - - // tell all exiting members to exit - exitingMembers foreach { member ⇒ - val address = member.address - log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]", - selfAddress, address, member.status, Exiting) - clusterCore(address) ! ClusterLeaderAction.Exit(member.uniqueAddress) // FIXME should wait for completion of handoff? - } - - // log the auto-downing of the unreachable nodes - unreachableButNotDownedMembers foreach { member ⇒ - log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, member.address, Down) - } - - // log the auto-downing of the unreachable nodes - removedUnreachableMembers foreach { member ⇒ - log.info("Cluster Node [{}] - Leader is removing unreachable node [{}]", selfAddress, member.address) - } - - publish(latestGossip) + // log the removal of the unreachable nodes + removedUnreachable foreach { m ⇒ + val status = if (m.status == Exiting) "exiting" else "unreachable" + log.info("Cluster Node [{}] - Leader is removing {} node [{}]", selfAddress, status, m.address) } + + publish(latestGossip) + + if (latestGossip.member(selfUniqueAddress).status == Exiting) { + // Leader is moving itself from Leaving to Exiting. Let others know (best effort) + // before shutdown. Otherwise they will not see the Exiting state change + // and there will not be convergence until they have detected this node as + // unreachable and the required downing has finished. They will still need to detect + // unreachable, but Exiting unreachable will be removed without downing, i.e. + // normally the leaving of a leader will be graceful without the need + // for downing. However, if those final gossip messages never arrive it is + // alright to require the downing, because that is probably caused by a + // network failure anyway. 
+ for (_ ← 1 to NumberOfGossipsBeforeShutdownWhenLeaderExits) gossip() + shutdown() + } + + } + } + + /** + * When the node is in the UNREACHABLE set it can be auto-down by leader + */ + def leaderAutoDownActions(): Unit = { + val localGossip = latestGossip + val localOverview = localGossip.overview + val localSeen = localOverview.seen + val localUnreachableMembers = localOverview.unreachable + + val changedUnreachableMembers = localUnreachableMembers collect { + case m if !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status) ⇒ m copy (status = Down) + } + + if (changedUnreachableMembers.nonEmpty) { + // handle changes + + // replace changed unreachable + val newUnreachableMembers = localUnreachableMembers -- changedUnreachableMembers ++ changedUnreachableMembers + + // removing nodes marked as Down/Exiting from the `seen` table + val newSeen = localSeen -- changedUnreachableMembers.map(_.uniqueAddress) + + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview + val newGossip = localGossip copy (overview = newOverview) // update gossip + + updateLatestGossip(newGossip) + + // log the auto-downing of the unreachable nodes + changedUnreachableMembers foreach { m ⇒ + log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, m.address, m.status) + } + + publish(latestGossip) } } @@ -897,7 +822,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localUnreachableMembers = localGossip.overview.unreachable val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ - member.uniqueAddress == selfUniqueAddress || member.status == Exiting || failureDetector.isAvailable(member.address) + member.uniqueAddress == selfUniqueAddress || failureDetector.isAvailable(member.address) } if (newlyDetectedUnreachableMembers.nonEmpty) { @@ -908,13 +833,14 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val newOverview = localOverview copy (unreachable = newUnreachableMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) - // updating vclock and `seen` table - val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfUniqueAddress + updateLatestGossip(newGossip) - latestGossip = seenVersionedGossip - - log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) + val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting) + if (nonExiting.nonEmpty) + log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", ")) + if (exiting.nonEmpty) + log.info("Cluster Node [{}] - Marking exiting node(s) as UNREACHABLE [{}]. 
This is expected and they will be removed.", + selfAddress, exiting.mkString(", ")) publish(latestGossip) } @@ -958,6 +884,15 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def validNodeForGossip(node: UniqueAddress): Boolean = (node != selfUniqueAddress && latestGossip.members.exists(_.uniqueAddress == node)) + def updateLatestGossip(newGossip: Gossip): Unit = { + // Updating the vclock version for the changes + val versionedGossip = newGossip :+ vclockNode + // Updating the `seen` table + val seenVersionedGossip = versionedGossip seen selfUniqueAddress + // Update the state with the new gossip + latestGossip = seenVersionedGossip + } + def publish(newGossip: Gossip): Unit = { publisher ! PublishChanges(newGossip) if (PublishStatsInterval == Duration.Zero) publishInternalStats() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index a48c3a4121..89b0d5f224 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -146,8 +146,6 @@ object ClusterEvent { case class UnreachableMember(member: Member) extends ClusterDomainEvent /** - * INTERNAL API - * * Current snapshot of cluster node metrics. Published to subscribers. */ case class ClusterMetricsChanged(nodeMetrics: Set[NodeMetrics]) extends ClusterDomainEvent { @@ -200,13 +198,6 @@ object ClusterEvent { val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable - val unreachableGroupedByAddress = - List(newGossip.overview.unreachable, oldGossip.overview.unreachable).flatten.groupBy(_.uniqueAddress) - val unreachableDownMembers = unreachableGroupedByAddress collect { - case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status ⇒ - newMember - } - val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++ (oldGossip.overview.unreachable -- newGossip.overview.unreachable) val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed))) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 16bb7bd4b5..6b7d201065 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -128,17 +128,33 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg case UnreachableMember(m) ⇒ removeMember(m) case MemberRemoved(m) ⇒ removeMember(m) case s: CurrentClusterState ⇒ reset(s) + case MemberExited(m) ⇒ memberExited(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent case HeartbeatRequest(from) ⇒ addHeartbeatRequest(from) case SendHeartbeatRequest(to) ⇒ sendHeartbeatRequest(to) case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) } - def reset(snapshot: CurrentClusterState): Unit = state = state.reset(snapshot.members.map(_.address)) + def reset(snapshot: CurrentClusterState): Unit = + state = state.reset(snapshot.members.map(_.address)(collection.breakOut)) def addMember(m: Member): Unit = if (m.address != selfAddress) state = state addMember m.address - def removeMember(m: Member): Unit = if (m.address != selfAddress) state = state removeMember m.address + def removeMember(m: Member): Unit = { + if (m.uniqueAddress == cluster.selfUniqueAddress) + // This cluster node will be 
shutdown, but stop this actor immediately + // to prevent it from sending out anything more and avoid sending EndHeartbeat. + context stop self + else + state = state removeMember m.address + } + + def memberExited(m: Member): Unit = + if (m.uniqueAddress == cluster.selfUniqueAddress) { + // This cluster node will be shutdown, but stop this actor immediately + // to prevent it from sending out anything more and avoid sending EndHeartbeat. + context stop self + } def addHeartbeatRequest(address: Address): Unit = if (address != selfAddress) state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive) @@ -253,7 +269,7 @@ private[cluster] case class ClusterHeartbeatSenderState private ( val active: Set[Address] = current ++ heartbeatRequest.keySet - def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = { + def reset(nodes: immutable.HashSet[Address]): ClusterHeartbeatSenderState = { ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeHeartbeatRequest _ }, ring.copy(nodes = nodes + ring.selfAddress)) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index e45fa0d437..4a714bb42b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -88,6 +88,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto case state: CurrentClusterState ⇒ receiveState(state) case MemberUp(m) ⇒ addMember(m) case MemberRemoved(m) ⇒ removeMember(m) + case MemberExited(m) ⇒ removeMember(m) case UnreachableMember(m) ⇒ removeMember(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index c37b7fd1be..0212988b5d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -18,7 +18,10 @@ private[cluster] object Gossip { if (members.isEmpty) empty else empty.copy(members = members) private val leaderMemberStatus = Set[MemberStatus](Up, Leaving) - private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving, Exiting) + private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) + val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) + val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) + } /** @@ -177,12 +180,12 @@ private[cluster] case class Gossip( def convergence: Boolean = { // First check that: // 1. we don't have any members that are unreachable, or - // 2. all unreachable members in the set have status DOWN + // 2. 
all unreachable members in the set have status DOWN or EXITING // Else we can't continue to check for convergence // When that is done we check that all members with a convergence // status is in the seen table and has the latest vector clock // version - overview.unreachable.forall(_.status == Down) && + overview.unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) && !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress)) } diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 75ef83ba05..9d0ded1a34 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -33,7 +33,7 @@ class Member private[cluster] ( case m: Member ⇒ uniqueAddress == m.uniqueAddress case _ ⇒ false } - override def toString = s"{Member(address = ${address}, status = ${status})" + override def toString = s"Member(address = ${address}, status = ${status})" def hasRole(role: String): Boolean = roles.contains(role) diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 5248bb6b4b..2736ebd927 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -38,8 +38,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ InternalClusterAction.InitJoin.getClass -> (_ ⇒ InternalClusterAction.InitJoin), classOf[InternalClusterAction.InitJoinAck] -> (bytes ⇒ InternalClusterAction.InitJoinAck(addressFromBinary(bytes))), classOf[InternalClusterAction.InitJoinNack] -> (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))), - classOf[ClusterLeaderAction.Exit] -> (bytes ⇒ ClusterLeaderAction.Exit(uniqueAddressFromBinary(bytes))), - classOf[ClusterLeaderAction.Shutdown] -> (bytes ⇒ ClusterLeaderAction.Shutdown(uniqueAddressFromBinary(bytes))), classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))), @@ -75,10 +73,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ addressToProto(address).toByteArray case InternalClusterAction.InitJoinNack(address) ⇒ addressToProto(address).toByteArray - case ClusterLeaderAction.Exit(node) ⇒ - uniqueAddressToProto(node).toByteArray - case ClusterLeaderAction.Shutdown(node) ⇒ - uniqueAddressToProto(node).toByteArray case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ addressToProto(from).toByteArray case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala index 777165a80b..fa7b8fe89c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala @@ -50,6 +50,7 @@ abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSp } enterBarrier("first-left") runOn(second, third, fourth, fifth) { + 
markNodeAsUnavailable(first) awaitAssert(clusterView.clusterMetrics.size must be(roles.size - 1)) } enterBarrier("finished") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index fafbbee921..8fe06de445 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -18,12 +18,8 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - # turn off unreachable reaper - akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString( + "akka.cluster.auto-down=on")).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec @@ -37,8 +33,6 @@ abstract class LeaderLeavingSpec import LeaderLeavingMultiJvmSpec._ import ClusterEvent._ - val leaderHandoffWaitingTime = 30.seconds - "A LEADER that is LEAVING" must { "be moved to LEAVING, then to EXITING, then to REMOVED, then be shut down and then a new LEADER should be elected" taggedAs LongRunningTest in { @@ -47,7 +41,7 @@ abstract class LeaderLeavingSpec val oldLeaderAddress = clusterView.leader.get - within(leaderHandoffWaitingTime) { + within(30.seconds) { if (clusterView.isLeader) { @@ -58,6 +52,7 @@ abstract class LeaderLeavingSpec // verify that the LEADER is shut down awaitCond(cluster.isTerminated) + enterBarrier("leader-shutdown") } else { @@ -76,12 +71,12 @@ abstract class LeaderLeavingSpec enterBarrier("leader-left") - val expectedAddresses = roles.toSet map address - awaitAssert(clusterView.members.map(_.address) must be(expectedAddresses)) - // verify that the LEADER is EXITING exitingLatch.await + enterBarrier("leader-shutdown") + markNodeAsUnavailable(oldLeaderAddress) + // verify that the LEADER is no longer part of the 'members' set awaitAssert(clusterView.members.map(_.address) must not contain (oldLeaderAddress)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index b83bcd12a8..f2b101c89d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -19,14 +19,7 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster { - unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set - } - """) - .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 
d7ce04301e..39eb7e0aeb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -15,7 +15,8 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString( + "akka.cluster.auto-down=on")).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec @@ -28,30 +29,35 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec import NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec._ - val reaperWaitingTime = 30.seconds.dilated - "A node that is LEAVING a non-singleton cluster" must { - "eventually set to REMOVED by the reaper, and removed from membership ring and seen table" taggedAs LongRunningTest in { + "eventually set to REMOVED and removed from membership ring and seen table" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) - runOn(first) { - cluster.leave(second) - } - enterBarrier("second-left") + within(30.seconds) { + runOn(first) { + cluster.leave(second) + } + enterBarrier("second-left") - runOn(first, third) { - // verify that the 'second' node is no longer part of the 'members' set - awaitAssert(clusterView.members.map(_.address) must not contain (address(second)), reaperWaitingTime) + runOn(first, third) { + enterBarrier("second-shutdown") + markNodeAsUnavailable(second) + // verify that the 'second' node is no longer part of the 'members'/'unreachable' set + awaitAssert { + clusterView.members.map(_.address) must not contain (address(second)) + } + awaitAssert { + clusterView.unreachableMembers.map(_.address) must not contain (address(second)) + } + } - // verify that the 'second' node is not part of the 'unreachable' set - awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(second)), reaperWaitingTime) - } - - runOn(second) { - // verify that the second node is shut down - awaitCond(cluster.isTerminated, reaperWaitingTime) + runOn(second) { + // verify that the second node is shut down + awaitCond(cluster.isTerminated) + enterBarrier("second-shutdown") + } } enterBarrier("finished") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 72c0eeb089..a2eec0867c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -18,12 +18,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - # turn off unreachable reaper - akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) + commonConfig(debugConfig(on = false). 
+ withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec @@ -63,9 +59,6 @@ abstract class NodeLeavingAndExitingSpec } enterBarrier("second-left") - val expectedAddresses = roles.toSet map address - awaitAssert(clusterView.members.map(_.address) must be(expectedAddresses)) - // Verify that 'second' node is set to EXITING exitingLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 81962d8bfc..b49ba60164 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -76,5 +76,13 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo enterBarrier("after-3") } + + "leave and shutdown itself when singleton cluster" taggedAs LongRunningTest in { + runOn(first) { + cluster.leave(first) + awaitCond(cluster.isTerminated, 5.seconds) + } + enterBarrier("after-4") + } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 07804290b9..7e1c14d323 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -858,7 +858,7 @@ abstract class StressSpec } } - def removeOne(shutdown: Boolean): Unit = within(10.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) { + def removeOne(shutdown: Boolean): Unit = within(25.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) { val currentRoles = roles.take(nbrUsedRoles - 1) val title = s"${if (shutdown) "shutdown" else "remove"} one from ${nbrUsedRoles} nodes cluster" createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) @@ -903,7 +903,7 @@ abstract class StressSpec } def removeSeveral(numberOfNodes: Int, shutdown: Boolean): Unit = - within(10.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)) { + within(25.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)) { val currentRoles = roles.take(nbrUsedRoles - numberOfNodes) val removeRoles = roles.slice(currentRoles.size, nbrUsedRoles) val title = s"${if (shutdown) "shutdown" else "leave"} ${numberOfNodes} in ${nbrUsedRoles} nodes cluster" diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index d3a12d165d..2a61d4577c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -10,6 +10,7 @@ import akka.actor.Address import akka.routing.ConsistentHash import scala.concurrent.duration._ import scala.collection.immutable +import scala.collection.immutable.HashSet @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { @@ -43,7 +44,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { } "remove heartbeatRequest after reset" in { - val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)) + val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)) s.heartbeatRequest must be(Map.empty) } @@ -53,13 
+54,13 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { } "remove heartbeatRequest after removeMember" in { - val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa) + val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)).removeMember(aa) s.heartbeatRequest must be(Map.empty) s.ending must be(Map(aa -> 0)) } "remove from ending after addHeartbeatRequest" in { - val s = emptyState.reset(Set(aa, bb)).removeMember(aa) + val s = emptyState.reset(HashSet(aa, bb)).removeMember(aa) s.ending must be(Map(aa -> 0)) val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds) s2.heartbeatRequest.keySet must be(Set(aa)) @@ -67,7 +68,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { } "include nodes from reset in active set" in { - val nodes = Set(aa, bb, cc) + val nodes = HashSet(aa, bb, cc) val s = emptyState.reset(nodes) s.current must be(nodes) s.ending must be(Map.empty) @@ -81,8 +82,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { s.addMember(ee).current.size must be(3) } - "move meber to ending set when removing member" in { - val nodes = Set(aa, bb, cc, dd, ee) + "move member to ending set when removing member" in { + val nodes = HashSet(aa, bb, cc, dd, ee) val s = emptyState.reset(nodes) s.ending must be(Map.empty) val included = s.current.head @@ -95,7 +96,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { } "increase ending count correctly" in { - val s = emptyState.reset(Set(aa)).removeMember(aa) + val s = emptyState.reset(HashSet(aa)).removeMember(aa) s.ending must be(Map(aa -> 0)) val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa) s2.ending must be(Map(aa -> 2)) diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index 2787a7f3b7..4aeb11d314 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -42,8 +42,6 @@ class ClusterMessageSerializerSpec extends AkkaSpec { checkSerialization(InternalClusterAction.InitJoin) checkSerialization(InternalClusterAction.InitJoinAck(address)) checkSerialization(InternalClusterAction.InitJoinNack(address)) - checkSerialization(ClusterLeaderAction.Exit(uniqueAddress)) - checkSerialization(ClusterLeaderAction.Shutdown(uniqueAddress)) checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address)) checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address)) checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address)) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index 384c1ffcc2..2a2d7b95b1 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -122,6 +122,7 @@ object ClusterSingletonManager { case object WasOldest extends State case object HandingOver extends State case object TakeOver extends State + case object End extends State case object Uninitialized extends Data case class YoungerData(oldestOption: Option[Address]) extends Data @@ -131,6 +132,7 @@ object ClusterSingletonManager { case class 
WasOldestData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], newOldestOption: Option[Address]) extends Data case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data + case object EndData extends Data val HandOverRetryTimer = "hand-over-retry" val TakeOverRetryTimer = "take-over-retry" @@ -399,6 +401,8 @@ class ClusterSingletonManager( // Previous GetNext request delivered event and new GetNext is to be sent var oldestChangedReceived = true + var selfExited = false + // keep track of previously removed members var removed = Map.empty[Address, Deadline] @@ -423,6 +427,7 @@ class ClusterSingletonManager( require(!cluster.isTerminated, "Cluster node must not be terminated") // subscribe to cluster changes, re-subscribe when restart + cluster.subscribe(self, classOf[MemberExited]) cluster.subscribe(self, classOf[MemberRemoved]) setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) @@ -595,7 +600,7 @@ class ClusterSingletonManager( case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, handOverData, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) - case Event(MemberRemoved(m), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if m.address == newOldest ⇒ + case Event(MemberRemoved(m), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if !selfExited && m.address == newOldest ⇒ addRemoved(m.address) gotoHandingOver(singleton, singletonTerminated, handOverData, None) @@ -635,13 +640,28 @@ class ClusterSingletonManager( val newOldest = handOverTo.map(_.path.address) logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest) handOverTo foreach { _ ! HandOverDone(handOverData) } - goto(Younger) using YoungerData(newOldest) + if (selfExited || removed.contains(cluster.selfAddress)) + goto(End) using EndData + else + goto(Younger) using YoungerData(newOldest) + } + + when(End) { + case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒ + logInfo("Self removed, stopping ClusterSingletonManager") + stop() } whenUnhandled { case Event(_: CurrentClusterState, _) ⇒ stay + case Event(MemberExited(m), _) ⇒ + if (m.address == cluster.selfAddress) { + selfExited = true + logInfo("Exited [{}]", m.address) + } + stay case Event(MemberRemoved(m), _) ⇒ - logInfo("Member removed [{}]", m.address) + if (!selfExited) logInfo("Member removed [{}]", m.address) addRemoved(m.address) stay case Event(TakeOverFromMe, _) ⇒ @@ -670,9 +690,10 @@ class ClusterSingletonManager( } onTransition { - case _ -> Younger if removed.contains(cluster.selfAddress) ⇒ + case _ -> (Younger | End) if removed.contains(cluster.selfAddress) ⇒ logInfo("Self removed, stopping ClusterSingletonManager") - stop() + // note that FSM.stop() can't be used in onTransition + context.stop(self) } } \ No newline at end of file diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 14068b062d..952495a6c2 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -158,6 +158,26 @@ Be aware of that using auto-down implies that two separate clusters will automatically be formed in case of network partition. That might be desired by some applications but not by others. +Leaving +^^^^^^^ + +There are two ways to remove a member from the cluster. + +You can just stop the actor system (or the JVM process). 
It will be detected +as unreachable and removed after the automatic or manual downing as described +above. + +A more graceful exit can be performed if you tell the cluster that a node shall leave. +This can be performed using :ref:`cluster_jmx_java` or :ref:`cluster_command_line_java`. +It can also be performed programatically with ``Cluster.get(system).leave(address)``. + +Note that this command can be issued to any member in the cluster, not necessarily the +one that is leaving. The cluster extension, but not the actor system or JVM, of the +leaving member will be shutdown after the leader has changed status of the member to +`Exiting`. Thereafter the member will be removed from the cluster. Normally this is handled +automatically, but in case of network failures during this process it might still be necessary +to set the node’s status to ``Down`` in order to complete the removal. + .. _cluster_subscriber_java: Subscribe to Cluster Events @@ -168,7 +188,15 @@ You can subscribe to change notifications of the cluster membership by using ``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber as the first event, followed by events for incremental updates. -There are several types of change events, consult the API documentation +The events to track the life-cycle of members are: + +* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. +* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``. + Note that the node might already have been shutdown when this event is published on another node. +* ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster. +* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector. + +There are more types of change events, consult the API documentation of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` for details about the events. diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index a1deb575f3..8410066bf0 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -151,6 +151,26 @@ Be aware of that using auto-down implies that two separate clusters will automatically be formed in case of network partition. That might be desired by some applications but not by others. +Leaving +^^^^^^^ + +There are two ways to remove a member from the cluster. + +You can just stop the actor system (or the JVM process). It will be detected +as unreachable and removed after the automatic or manual downing as described +above. + +A more graceful exit can be performed if you tell the cluster that a node shall leave. +This can be performed using :ref:`cluster_jmx_scala` or :ref:`cluster_command_line_scala`. +It can also be performed programatically with ``Cluster(system).leave(address)``. + +Note that this command can be issued to any member in the cluster, not necessarily the +one that is leaving. The cluster extension, but not the actor system or JVM, of the +leaving member will be shutdown after the leader has changed status of the member to +`Exiting`. Thereafter the member will be removed from the cluster. Normally this is handled +automatically, but in case of network failures during this process it might still be necessary +to set the node’s status to ``Down`` in order to complete the removal. + .. 
_cluster_subscriber_scala: Subscribe to Cluster Events @@ -161,7 +181,15 @@ You can subscribe to change notifications of the cluster membership by using ``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber as the first event, followed by events for incremental updates. -There are several types of change events, consult the API documentation +The events to track the life-cycle of members are: + +* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. +* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``. + Note that the node might already have been shutdown when this event is published on another node. +* ``ClusterEvent.MemberRemoved`` - Member completely removed from the cluster. +* ``ClusterEvent.UnreachableMember`` - A member is considered as unreachable by the failure detector. + +There are more types of change events, consult the API documentation of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` for details about the events.
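
The cluster-usage documentation added above lists the member life-cycle events but stops short of a concrete subscriber. The following is a minimal sketch of such a subscriber, assuming only the event classes referenced in this patch (``MemberUp``, ``MemberExited``, ``MemberRemoved``, ``UnreachableMember``); the actor name and log messages are illustrative::

  import akka.actor.{ Actor, ActorLogging }
  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent._

  class MemberLifecycleListener extends Actor with ActorLogging {
    val cluster = Cluster(context.system)

    // subscribe on start (and re-subscribe on restart), unsubscribe on stop
    override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
    override def postStop(): Unit = cluster.unsubscribe(self)

    def receive = {
      case state: CurrentClusterState ⇒
        log.info("Current members: {}", state.members.mkString(", "))
      case MemberUp(m) ⇒
        log.info("Member is Up: {}", m.address)
      case MemberExited(m) ⇒
        // the exiting node may already have shut itself down when this is seen on other nodes
        log.info("Member is Exiting: {}", m.address)
      case MemberRemoved(m) ⇒
        log.info("Member was Removed: {}", m.address)
      case UnreachableMember(m) ⇒
        log.info("Member detected as unreachable: {}", m.address)
      case _: ClusterDomainEvent ⇒ // not interested in other event types
    }
  }

A graceful exit of the local node, as described in the ``Leaving`` section above, would then be triggered with ``Cluster(system).leave(Cluster(system).selfAddress)``; the ``MemberExited`` and ``MemberRemoved`` events in the sketch are what the other nodes observe while the leader moves the member through ``Exiting`` to ``Removed``.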