diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 571a8eaf68..9f241b684d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -220,25 +220,30 @@ case class Gossip( // 1. merge vector clocks val mergedVClock = this.version merge that.version - // 2. group all members by Address => Seq[Member] - val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq).groupBy(_.address) - - // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups - val mergedMembers = - Gossip.emptyMembers ++ - membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ - acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) - } - - // 4. merge meta-data + // 2. merge meta-data val mergedMeta = this.meta ++ that.meta - // 5. merge gossip overview - val mergedOverview = GossipOverview( - this.overview.seen ++ that.overview.seen, - this.overview.unreachable ++ that.overview.unreachable) + def reduceHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { + // group all members by Address => Seq[Member] + val groupedByAddress = (a ++ b).groupBy(_.address) + // pick highest MemberStatus + (groupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ + acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) + }).toSet + } - Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock) + // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups + val mergedUnreachable = reduceHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) + + // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, + // and exclude unreachable + val mergedMembers = Gossip.emptyMembers ++ reduceHighestPriority(this.members.toSeq, that.members.toSeq). + filterNot(m ⇒ mergedUnreachable.contains(m)) + + // 5. merge seen (FIXME is this correct?) + val mergedSeen = this.overview.seen ++ that.overview.seen + + Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock) } override def toString = @@ -648,11 +653,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members + val localUnreachable = localGossip.overview.unreachable - if (!localMembers.exists(_.address == node)) { + val alreadyMember = localMembers.exists(_.address == node) + val isUnreachable = localUnreachable.exists { m ⇒ + m.address == node && m.status != MemberStatus.Down && m.status != MemberStatus.Removed + } + + if (!alreadyMember && !isUnreachable) { // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node } + val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining @@ -719,8 +730,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } /** - * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not alread there) - * and its status is set to DOWN. The node is alo removed from the 'seen' table. + * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there) + * and its status is set to DOWN. The node is also removed from the 'seen' table. * * The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly * to this node and it will then go through the normal JOINING procedure. @@ -735,42 +746,34 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localUnreachableMembers = localOverview.unreachable // 1. check if the node to DOWN is in the 'members' set - var downedMember: Option[Member] = None - val newMembers = - localMembers - .map { member ⇒ - if (member.address == address) { - log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, member.address) - val newMember = member copy (status = MemberStatus.Down) - downedMember = Some(newMember) - newMember - } else member - } - .filter(_.status != MemberStatus.Down) + val downedMember: Option[Member] = localMembers.find(_.address == address).map(m ⇒ m.copy(status = MemberStatus.Down)) + val newMembers = downedMember match { + case Some(m) ⇒ + log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address) + localMembers - m + case None ⇒ localMembers + } // 2. check if the node to DOWN is in the 'unreachable' set val newUnreachableMembers = - localUnreachableMembers - .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN - .map { member ⇒ - if (member.address == address) { - log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) - member copy (status = MemberStatus.Down) - } else member - } + localUnreachableMembers.map { member ⇒ + // no need to DOWN members already DOWN + if (member.address == address && member.status != MemberStatus.Down) { + log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) + member copy (status = MemberStatus.Down) + } else member + } // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set. - val newUnreachablePlusNewlyDownedMembers = downedMember match { - case Some(member) ⇒ newUnreachableMembers + member - case None ⇒ newUnreachableMembers - } + val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember // 4. remove nodes marked as DOWN from the 'seen' table - val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member) ⇒ - currentSeen - member.address + val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { + case m if m.status == MemberStatus.Down ⇒ m.address } - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview + // update gossip overview + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip val versionedGossip = newGossip + vclockNode val newState = localState copy (latestGossip = versionedGossip seen selfAddress) @@ -831,36 +834,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private def autoJoin(): Unit = nodeToJoin foreach join - /** - * Switches the member status. - * - * @param newStatus the new member status - * @param oldState the state to change the member status in - * @return the updated new state with the new member status - */ - private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { - log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus) - - val localSelf = self - - val localGossip = state.latestGossip - val localMembers = localGossip.members - - // change my state into a "new" self - val newSelf = localSelf copy (status = newStatus) - - // change my state in 'gossip.members' - val newMembers = localMembers map { member ⇒ if (member.address == selfAddress) newSelf else member } - - val newGossip = localGossip copy (members = newMembers) - - // version my changes - val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress - - state copy (latestGossip = seenVersionedGossip) - } - /** * INTERNAL API * @@ -985,8 +958,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable - val newMembers = localMembers diff newlyDetectedUnreachableMembers - val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers + val newMembers = localMembers -- newlyDetectedUnreachableMembers + val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers val newOverview = localOverview copy (unreachable = newUnreachableMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) @@ -1090,16 +1063,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 4. Move UNREACHABLE => DOWN (auto-downing by leader) // ---------------------- val newUnreachableMembers = - localUnreachableMembers - .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN - .map { member ⇒ + localUnreachableMembers.map { member ⇒ + // no need to DOWN members already DOWN + if (member.status == MemberStatus.Down) member + else { log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Down) } + } // removing nodes marked as DOWN from the 'seen' table - val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) ⇒ currentSeen - member.address) + val newSeen = localSeen -- newUnreachableMembers.collect { + case m if m.status == MemberStatus.Down ⇒ m.address + } val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview localGossip copy (overview = newOverview) // update gossip diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 77cd0c52ba..985b6d5a89 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -12,18 +12,20 @@ import scala.collection.immutable.SortedSet @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class GossipSpec extends WordSpec with MustMatchers { + import MemberStatus._ + + val a1 = Member(Address("akka", "sys", "a", 2552), Up) + val a2 = Member(Address("akka", "sys", "a", 2552), Joining) + val b1 = Member(Address("akka", "sys", "b", 2552), Up) + val b2 = Member(Address("akka", "sys", "b", 2552), Removed) + val c1 = Member(Address("akka", "sys", "c", 2552), Leaving) + val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) + val d2 = Member(Address("akka", "sys", "d", 2552), Removed) + "A Gossip" must { "merge members by status priority" in { - import MemberStatus._ - val a1 = Member(Address("akka", "sys", "a", 2552), Up) - val a2 = Member(Address("akka", "sys", "a", 2552), Joining) - val b1 = Member(Address("akka", "sys", "b", 2552), Up) - val b2 = Member(Address("akka", "sys", "b", 2552), Removed) - val c1 = Member(Address("akka", "sys", "c", 2552), Leaving) - val c2 = Member(Address("akka", "sys", "c", 2552), Up) - val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) - val d2 = Member(Address("akka", "sys", "d", 2552), Removed) val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)) val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)) @@ -38,5 +40,38 @@ class GossipSpec extends WordSpec with MustMatchers { } + "merge unreachable by status priority" in { + + val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a1, b1, c1, d1))) + val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a2, b2, c2, d2))) + + val merged1 = g1 merge g2 + merged1.overview.unreachable must be(Set(a1, b2, c1, d2)) + merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + + val merged2 = g2 merge g1 + merged2.overview.unreachable must be(Set(a1, b2, c1, d2)) + merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + + } + + "merge by excluding unreachable from members" in { + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = SortedSet(c1, d1))) + val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = SortedSet(b2, d2))) + + val merged1 = g1 merge g2 + merged1.members must be(SortedSet(a1)) + merged1.members.toSeq.map(_.status) must be(Seq(Up)) + merged1.overview.unreachable must be(Set(b2, c1, d2)) + merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) + + val merged2 = g2 merge g1 + merged2.members must be(SortedSet(a1)) + merged2.members.toSeq.map(_.status) must be(Seq(Up)) + merged2.overview.unreachable must be(Set(b2, c1, d2)) + merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) + + } + } }