diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index f0381063f6..292be74935 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -686,6 +686,11 @@ class ClusterSingletonManager( case Event(HandOverToMe, OldestData(singleton, singletonTerminated)) ⇒ gotoHandingOver(singleton, singletonTerminated, Some(sender())) + case Event(TakeOverFromMe, _) ⇒ + // already oldest, so confirm and continue like that + sender() ! HandOverToMe + stay + case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton ⇒ stay using d.copy(singletonTerminated = true) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 9a14fa3be6..0c41223723 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -1041,7 +1041,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (upNumber == 0) { // It is alright to use same upNumber as already used by a removed member, since the upNumber // is only used for comparing age of current cluster members (Member.isOlderThan) - val youngest = latestGossip.youngestMember + val youngest = membershipState.youngestMember upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber) } else { upNumber += 1 diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 82c777c019..b7c705af55 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -204,11 +204,6 @@ private[cluster] final case class Gossip( def hasMember(node: UniqueAddress): Boolean = membersMap.contains(node) - def youngestMember: Member = { - require(members.nonEmpty, "No youngest when no members") - members.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber) - } - def removeAll(nodes: Iterable[UniqueAddress], removalTimestamp: Long): Gossip = { nodes.foldLeft(this)((gossip, node) ⇒ gossip.remove(node, removalTimestamp)) } diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala index 1cf99419c4..fd62ea5ccd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala +++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala @@ -159,12 +159,21 @@ import scala.util.Random def isInSameDc(node: UniqueAddress): Boolean = node == selfUniqueAddress || latestGossip.member(node).dataCenter == selfDc + def membersInSameDc: immutable.SortedSet[Member] = + members.filter(_.dataCenter == selfDc) + def validNodeForGossip(node: UniqueAddress): Boolean = node != selfUniqueAddress && ((isInSameDc(node) && isReachableExcludingDownedObservers(node)) || // if cross DC we need to check pairwise unreachable observation overview.reachability.isReachable(selfUniqueAddress, node)) + def youngestMember: Member = { + val mbrs = membersInSameDc + require(mbrs.nonEmpty, "No youngest when no members") + mbrs.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber) + } + } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 68b379ed71..21563754dc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -204,12 +204,12 @@ class GossipSpec extends WordSpec with Matchers { // a2 and e1 is Joining val g1 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability = Reachability.empty.unreachable(a2.uniqueAddress, e1.uniqueAddress))) - g1.youngestMember should ===(b1) + state(g1).youngestMember should ===(b1) val g2 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability = Reachability.empty.unreachable(a2.uniqueAddress, b1.uniqueAddress).unreachable(a2.uniqueAddress, e1.uniqueAddress))) - g2.youngestMember should ===(b1) + state(g2).youngestMember should ===(b1) val g3 = Gossip(members = SortedSet(a2, b1.copyUp(3), e2.copyUp(4))) - g3.youngestMember should ===(e2) + state(g3).youngestMember should ===(e2) } "reach convergence per data center" in { @@ -344,8 +344,7 @@ class GossipSpec extends WordSpec with Matchers { overview = GossipOverview(reachability = Reachability.empty .unreachable(dc1b1.uniqueAddress, dc2d2.uniqueAddress) - .unreachable(dc2d2.uniqueAddress, dc1b1.uniqueAddress) - )) + .unreachable(dc2d2.uniqueAddress, dc1b1.uniqueAddress))) .:+(VectorClock.Node(Gossip.vclockName(dc1b1.uniqueAddress))) .:+(VectorClock.Node(Gossip.vclockName(dc2d2.uniqueAddress))) .remove(dc1b1.uniqueAddress, System.currentTimeMillis())