Merge pull request #23596 from akka/wip-23582-up-assignment-patriknw

use right youngest when moving to Up, #23582
This commit is contained in:
Patrik Nordwall 2017-09-04 17:16:31 +02:00 committed by GitHub
commit 6a464c474a
5 changed files with 19 additions and 11 deletions

View file

@ -686,6 +686,11 @@ class ClusterSingletonManager(
case Event(HandOverToMe, OldestData(singleton, singletonTerminated))
gotoHandingOver(singleton, singletonTerminated, Some(sender()))
case Event(TakeOverFromMe, _)
// already oldest, so confirm and continue like that
sender() ! HandOverToMe
stay
case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton
stay using d.copy(singletonTerminated = true)

View file

@ -1041,7 +1041,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = latestGossip.youngestMember
val youngest = membershipState.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1

View file

@ -204,11 +204,6 @@ private[cluster] final case class Gossip(
def hasMember(node: UniqueAddress): Boolean = membersMap.contains(node)
def youngestMember: Member = {
require(members.nonEmpty, "No youngest when no members")
members.maxBy(m if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
}
def removeAll(nodes: Iterable[UniqueAddress], removalTimestamp: Long): Gossip = {
nodes.foldLeft(this)((gossip, node) gossip.remove(node, removalTimestamp))
}

View file

@ -159,12 +159,21 @@ import scala.util.Random
def isInSameDc(node: UniqueAddress): Boolean =
node == selfUniqueAddress || latestGossip.member(node).dataCenter == selfDc
def membersInSameDc: immutable.SortedSet[Member] =
members.filter(_.dataCenter == selfDc)
def validNodeForGossip(node: UniqueAddress): Boolean =
node != selfUniqueAddress &&
((isInSameDc(node) && isReachableExcludingDownedObservers(node)) ||
// if cross DC we need to check pairwise unreachable observation
overview.reachability.isReachable(selfUniqueAddress, node))
def youngestMember: Member = {
val mbrs = membersInSameDc
require(mbrs.nonEmpty, "No youngest when no members")
mbrs.maxBy(m if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
}
}
/**

View file

@ -204,12 +204,12 @@ class GossipSpec extends WordSpec with Matchers {
// a2 and e1 is Joining
val g1 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability =
Reachability.empty.unreachable(a2.uniqueAddress, e1.uniqueAddress)))
g1.youngestMember should ===(b1)
state(g1).youngestMember should ===(b1)
val g2 = Gossip(members = SortedSet(a2, b1.copyUp(3), e1), overview = GossipOverview(reachability =
Reachability.empty.unreachable(a2.uniqueAddress, b1.uniqueAddress).unreachable(a2.uniqueAddress, e1.uniqueAddress)))
g2.youngestMember should ===(b1)
state(g2).youngestMember should ===(b1)
val g3 = Gossip(members = SortedSet(a2, b1.copyUp(3), e2.copyUp(4)))
g3.youngestMember should ===(e2)
state(g3).youngestMember should ===(e2)
}
"reach convergence per data center" in {
@ -344,8 +344,7 @@ class GossipSpec extends WordSpec with Matchers {
overview = GossipOverview(reachability =
Reachability.empty
.unreachable(dc1b1.uniqueAddress, dc2d2.uniqueAddress)
.unreachable(dc2d2.uniqueAddress, dc1b1.uniqueAddress)
))
.unreachable(dc2d2.uniqueAddress, dc1b1.uniqueAddress)))
.:+(VectorClock.Node(Gossip.vclockName(dc1b1.uniqueAddress)))
.:+(VectorClock.Node(Gossip.vclockName(dc2d2.uniqueAddress)))
.remove(dc1b1.uniqueAddress, System.currentTimeMillis())