diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 5b9cbab9d3..4ebf64c1c7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -388,20 +388,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localUnreachable = latestGossip.overview.unreachable val alreadyMember = localMembers.exists(_.address == node) - val isUnreachable = latestGossip.overview.isNonDownUnreachable(node) + val isUnreachable = localUnreachable.exists(_.address == node) if (!alreadyMember && !isUnreachable) { - // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } - val newOverview = latestGossip.overview copy (unreachable = newUnreachableMembers) - // remove the node from the failure detector if it is a DOWN node that is rejoining cluster - if (rejoiningMember.nonEmpty) failureDetector.remove(node) + // remove the node from the failure detector + failureDetector.remove(node) // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) - val newGossip = latestGossip copy (overview = newOverview, members = newMembers) + val newGossip = latestGossip copy (members = newMembers) val versionedGossip = newGossip :+ vclockNode val seenVersionedGossip = versionedGossip seen selfAddress @@ -678,10 +675,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // 3. Non-exiting remain -- When all partition handoff has completed // 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table // 5. 
Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader - // 6. Updating the vclock version for the changes - // 7. Updating the 'seen' table - // 8. Try to update the state with the new gossip - // 9. If success - run all the side-effecting processing + // 6. Move DOWN => REMOVED -- When all nodes have seen that the node is DOWN (convergence) - remove the nodes from the node ring and seen table + // 7. Updating the vclock version for the changes + // 8. Updating the 'seen' table + // 9. Try to update the state with the new gossip + // 10. If success - run all the side-effecting processing val ( newGossip: Gossip, @@ -699,45 +697,46 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // transform the node member ring val newMembers = localMembers collect { - // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence) + // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence) // and minimum number of nodes have joined the cluster case member if isJoiningToUp(member) ⇒ member copy (status = Up) - // 2. Move LEAVING => EXITING (once we have a convergence on LEAVING + // Move LEAVING => EXITING (once we have a convergence on LEAVING // *and* if we have a successful partition handoff) case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ member copy (status = Exiting) - // 3. Everyone else that is not Exiting stays as they are - case member if member.status != Exiting ⇒ member - // 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table + // Everyone else that is not Exiting stays as they are + case member if member.status != Exiting && member.status != Down ⇒ member + // Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the 'members' set/node ring and seen table } // ---------------------- - // 5. 
Store away all stuff needed for the side-effecting processing in 10. + // Store away all stuff needed for the side-effecting processing // ---------------------- // Check for the need to do side-effecting on successful state change - // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED + // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED, DOWN -> REMOVED // to check for state-changes and to store away removed and exiting members for later notification // 1. check for state-changes to update // 2. store away removed and exiting members so we can separate the pure state changes - val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting) + val (removedMembers, newMembers1) = localMembers partition (m ⇒ m.status == Exiting || m.status == Down) + val removedMembers2 = removedMembers ++ localUnreachableMembers.filter(_.status == Down) val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_)) val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) - val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty + val hasChangedState = removedMembers2.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty // removing REMOVED nodes from the 'seen' table - val newSeen = localSeen -- removedMembers.map(_.address) + val newSeen = localSeen -- removedMembers2.map(_.address) // removing REMOVED nodes from the 'unreachable' set - val newUnreachableMembers = localUnreachableMembers -- removedMembers + val newUnreachableMembers = localUnreachableMembers -- removedMembers2 val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip - (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Member.none) 
+ (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers2, Member.none) } else if (AutoDown) { // we don't have convergence - so we might have unreachable nodes @@ -745,7 +744,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // if 'auto-down' is turned on, then try to auto-down any unreachable nodes val newUnreachableMembers = localUnreachableMembers collect { // ---------------------- - // 6. Move UNREACHABLE => DOWN (auto-downing by leader) + // Move UNREACHABLE => DOWN (auto-downing by leader) // ---------------------- case member if member.status != Down ⇒ member copy (status = Down) case downMember ⇒ downMember // no need to DOWN members already DOWN @@ -766,25 +765,25 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if (hasChangedState) { // we have a change of state - version it and try to update // ---------------------- - // 6. Updating the vclock version for the changes + // Updating the vclock version for the changes // ---------------------- val versionedGossip = newGossip :+ vclockNode // ---------------------- - // 7. Updating the 'seen' table - // Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED + // Updating the 'seen' table + // Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED // ---------------------- val seenVersionedGossip = if (removedMembers.exists(_.address == selfAddress)) versionedGossip else versionedGossip seen selfAddress // ---------------------- - // 8. Update the state with the new gossip + // Update the state with the new gossip // ---------------------- latestGossip = seenVersionedGossip // ---------------------- - // 9. 
Run all the side-effecting processing + // Run all the side-effecting processing // ---------------------- // log the move of members from joining to up diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 809694588f..24adea2c7b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -282,9 +282,9 @@ object ClusterEvent { } val unreachableDownedEvents = unreachableDownMembers map MemberDowned - val removedEvents = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) map { m ⇒ - MemberRemoved(m.copy(status = Removed)) - } + val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++ + (oldGossip.overview.unreachable -- newGossip.overview.unreachable) + val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed))) (new VectorBuilder[MemberEvent]() ++= memberEvents ++= downedEvents ++= unreachableDownedEvents ++= removedEvents).result() @@ -413,9 +413,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto latestConvergedGossip = newGossip bufferedEvents foreach { event ⇒ event match { - case m: MemberEvent if m.isInstanceOf[MemberDowned] || m.isInstanceOf[MemberRemoved] ⇒ - // TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788 - // but right now we don't change Downed to Removed + case m: MemberEvent if m.isInstanceOf[MemberRemoved] ⇒ publish(event) // notify DeathWatch about downed node publish(AddressTerminated(m.member.address)) diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index ea7328f84f..9539bcb025 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -145,12 +145,7 @@ private[cluster] case 
class Gossip( val mergedVClock = this.version merge that.version // 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - // FIXME allowing Down -> Joining should be adjusted as part of ticket #2788 - val mergedUnreachable = Member.pickHighestPriority( - this.overview.unreachable.filterNot(m1 ⇒ - m1.status == Down && that.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address)), - that.overview.unreachable.filterNot(m1 ⇒ - m1.status == Down && this.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address))) + val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 8a712dba0e..0e9ffdc1e0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -82,8 +82,11 @@ abstract class ClusterDeathWatchSpec enterBarrier("second-terminated") markNodeAsUnavailable(third) + awaitCond(clusterView.members.forall(_.address != address(third))) awaitCond(clusterView.unreachableMembers.exists(_.address == address(third))) cluster.down(third) + // removed + awaitCond(clusterView.unreachableMembers.forall(_.address != address(third))) expectMsg(path3) enterBarrier("third-terminated") @@ -95,8 +98,11 @@ abstract class ClusterDeathWatchSpec enterBarrier("watch-established") runOn(third) { markNodeAsUnavailable(second) + awaitCond(clusterView.members.forall(_.address != address(second))) awaitCond(clusterView.unreachableMembers.exists(_.address == address(second))) cluster.down(second) + // removed + awaitCond(clusterView.unreachableMembers.forall(_.address != 
address(second))) } enterBarrier("second-terminated") enterBarrier("third-terminated") @@ -131,8 +137,11 @@ abstract class ClusterDeathWatchSpec enterBarrier("hello-deployed") markNodeAsUnavailable(first) + awaitCond(clusterView.members.forall(_.address != address(first))) awaitCond(clusterView.unreachableMembers.exists(_.address == address(first))) cluster.down(first) + // removed + awaitCond(clusterView.unreachableMembers.forall(_.address != address(first))) val t = expectMsgType[Terminated] t.actor must be(hello) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 39adca3ac5..13c6b7fae8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -84,11 +84,13 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig // detect failure markNodeAsUnavailable(leaderAddress) - awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress)) + awaitCond(clusterView.unreachableMembers.exists(_.address == leaderAddress)) enterBarrier("after-unavailable" + n) // user marks the shutdown leader as DOWN cluster.down(leaderAddress) + // removed + awaitCond(clusterView.unreachableMembers.forall(_.address != leaderAddress)) enterBarrier("after-down" + n, "completed" + n) case _ if remainingRoles.contains(myself) ⇒ @@ -96,7 +98,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig val leaderAddress = address(leader) enterBarrier("before-shutdown" + n, "after-shutdown" + n) - awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress)) + awaitCond(clusterView.unreachableMembers.exists(_.address == leaderAddress)) enterBarrier("after-unavailable" + n) enterBarrier("after-down" + n) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala index 15bd8d75e4..c858222b7f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -117,6 +117,7 @@ abstract class MBeanSpec runOn(first, second, third) { awaitUpConvergence(3, canNotBePartOfMemberRing = Set(fourthAddress)) assertMembers(clusterView.members, first, second, third) + awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == "") } enterBarrier("after-5") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala index 4e103c9a39..86e5450fe7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala @@ -108,7 +108,7 @@ abstract class RestartFirstSeedNodeSpec } runOn(seed2, seed3) { awaitUpConvergence(2, canNotBePartOfMemberRing = Set(seedNodes.head)) - awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.status == Down && m.address == seedNodes.head)) + awaitCond(clusterView.unreachableMembers.forall(_.address != seedNodes.head)) } enterBarrier("seed1-shutdown") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index 36b727cc1c..af860058af 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -67,7 +67,7 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) enterBarrier("after-1") } - "detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in { + "detect network partition and mark nodes on other side as unreachable and form new cluster" taggedAs LongRunningTest in within(30 seconds) { val thirdAddress = 
address(third) enterBarrier("before-split") @@ -86,35 +86,19 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) for (role ← side1) markNodeAsUnavailable(role) } - runOn(side1: _*) { - awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 25 seconds) - } - runOn(side2: _*) { - awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 25 seconds) - } - - enterBarrier("after-2") - } - - "auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in { - runOn(side1: _*) { // auto-down = on - awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - clusterView.unreachableMembers.map(_.address) must be(side2.toSet map address) awaitUpConvergence(side1.size, side2.toSet map address) assertLeader(side1: _*) } runOn(side2: _*) { // auto-down = on - awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - clusterView.unreachableMembers.map(_.address) must be(side1.toSet map address) awaitUpConvergence(side2.size, side1.toSet map address) assertLeader(side2: _*) } - enterBarrier("after-3") + enterBarrier("after-2") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index c7666ba395..686f1c644b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -131,6 +131,8 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod runOn(allBut(victim): _*) { awaitUpConvergence(roles.size - 1, Set(victim)) + // eventually removed + awaitCond(clusterView.unreachableMembers.isEmpty, 15 seconds) } endBarrier diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala 
b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 339394bb53..2978ba6373 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -79,16 +79,6 @@ class GossipSpec extends WordSpec with MustMatchers { } - "merge by allowing Down -> Joining" in { - val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(e3))) - val g2 = Gossip(members = SortedSet(a1, b1, e1), overview = GossipOverview(unreachable = Set.empty)) - - val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1, b1, e1)) - merged2.members.toSeq.map(_.status) must be(Seq(Up, Up, Joining)) - merged2.overview.unreachable must be(Set.empty) - } - "start with fresh seen table after merge" in { val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address) val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address)