=clu #18554 Make oldest assignment deterministic when joining
* the reported issue is fixed by the immediate leaderActions (moving to Up) when joining the first node to itself * the other changes are precautions just in case
This commit is contained in:
parent
1bacae3cac
commit
9380983d3c
9 changed files with 116 additions and 99 deletions
|
|
@ -255,7 +255,7 @@ class ShardRegion(
|
|||
val cluster = Cluster(context.system)
|
||||
|
||||
// sort by age, oldest first
|
||||
val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) }
|
||||
val ageOrdering = Member.ageOrdering
|
||||
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
|
||||
|
||||
var regions = Map.empty[ActorRef, Set[ShardId]]
|
||||
|
|
@ -325,7 +325,7 @@ class ShardRegion(
|
|||
def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
|
||||
case MemberUp(m) ⇒
|
||||
if (matchingRole(m))
|
||||
changeMembers(membersByAge + m)
|
||||
changeMembers(membersByAge - m + m) // replace
|
||||
|
||||
case MemberRemoved(m, _) ⇒
|
||||
if (m.uniqueAddress == cluster.selfUniqueAddress)
|
||||
|
|
|
|||
|
|
@ -226,7 +226,7 @@ object ClusterSingletonManager {
|
|||
|
||||
val cluster = Cluster(context.system)
|
||||
// sort by age, oldest first
|
||||
val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) }
|
||||
val ageOrdering = Member.ageOrdering
|
||||
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
|
||||
|
||||
var changes = Vector.empty[AnyRef]
|
||||
|
|
@ -260,7 +260,10 @@ object ClusterSingletonManager {
|
|||
|
||||
def add(m: Member): Unit = {
|
||||
if (matchingRole(m))
|
||||
trackChange { () ⇒ membersByAge += m }
|
||||
trackChange { () ⇒
|
||||
membersByAge -= m // replace
|
||||
membersByAge += m
|
||||
}
|
||||
}
|
||||
|
||||
def remove(m: Member): Unit = {
|
||||
|
|
|
|||
|
|
@ -139,9 +139,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
|
|||
val cluster = Cluster(context.system)
|
||||
var singleton: Option[ActorRef] = None
|
||||
// sort by age, oldest first
|
||||
val ageOrdering = Ordering.fromLessThan[Member] {
|
||||
(a, b) ⇒ a.isOlderThan(b)
|
||||
}
|
||||
val ageOrdering = Member.ageOrdering
|
||||
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
|
||||
|
||||
var buffer = new java.util.LinkedList[(Any, ActorRef)]
|
||||
|
|
@ -203,8 +201,9 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
|
|||
*/
|
||||
def add(m: Member): Unit = {
|
||||
if (matchingRole(m))
|
||||
trackChange {
|
||||
() ⇒ membersByAge += m
|
||||
trackChange { () ⇒
|
||||
membersByAge -= m // replace
|
||||
membersByAge += m
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,77 +46,6 @@ public class ClusterSingletonManagerTest {
|
|||
//#create-singleton-proxy
|
||||
}
|
||||
|
||||
static//documentation of how to keep track of the oldest member in user land
|
||||
//#singleton-proxy
|
||||
public class ConsumerProxy extends UntypedActor {
|
||||
|
||||
final Cluster cluster = Cluster.get(getContext().system());
|
||||
|
||||
final Comparator<Member> ageComparator = new Comparator<Member>() {
|
||||
public int compare(Member a, Member b) {
|
||||
if (a.isOlderThan(b))
|
||||
return -1;
|
||||
else if (b.isOlderThan(a))
|
||||
return 1;
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
final SortedSet<Member> membersByAge = new TreeSet<Member>(ageComparator);
|
||||
|
||||
final String role = "worker";
|
||||
|
||||
//subscribe to cluster changes
|
||||
@Override
|
||||
public void preStart() {
|
||||
cluster.subscribe(getSelf(), MemberEvent.class);
|
||||
}
|
||||
|
||||
//re-subscribe when restart
|
||||
@Override
|
||||
public void postStop() {
|
||||
cluster.unsubscribe(getSelf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof CurrentClusterState) {
|
||||
CurrentClusterState state = (CurrentClusterState) message;
|
||||
List<Member> members = new ArrayList<Member>();
|
||||
for (Member m : state.getMembers()) {
|
||||
if (m.status().equals(MemberStatus.up()) && m.hasRole(role))
|
||||
members.add(m);
|
||||
}
|
||||
membersByAge.clear();
|
||||
membersByAge.addAll(members);
|
||||
|
||||
} else if (message instanceof MemberUp) {
|
||||
Member m = ((MemberUp) message).member();
|
||||
if (m.hasRole(role))
|
||||
membersByAge.add(m);
|
||||
|
||||
} else if (message instanceof MemberRemoved) {
|
||||
Member m = ((MemberUp) message).member();
|
||||
if (m.hasRole(role))
|
||||
membersByAge.remove(m);
|
||||
|
||||
} else if (message instanceof MemberEvent) {
|
||||
// not interesting
|
||||
|
||||
} else if (!membersByAge.isEmpty()) {
|
||||
currentMaster().tell(message, getSender());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
ActorSelection currentMaster() {
|
||||
return getContext().actorSelection(membersByAge.first().address() + "/user/singleton/statsService");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#singleton-proxy
|
||||
|
||||
public static class End {
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -511,7 +511,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
updateLatestGossip(newGossip)
|
||||
|
||||
logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
|
||||
if (node != selfUniqueAddress)
|
||||
if (node == selfUniqueAddress) {
|
||||
if (localMembers.isEmpty)
|
||||
leaderActions() // important for deterministic oldest when bootstrapping
|
||||
} else
|
||||
sender() ! Welcome(selfUniqueAddress, latestGossip)
|
||||
|
||||
publish(latestGossip)
|
||||
|
|
|
|||
|
|
@ -275,7 +275,8 @@ object ClusterEvent {
|
|||
val newMembers = newGossip.members -- oldGossip.members
|
||||
val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress)
|
||||
val changedMembers = membersGroupedByAddress collect {
|
||||
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember
|
||||
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status || newMember.upNumber != oldMember.upNumber ⇒
|
||||
newMember
|
||||
}
|
||||
val memberEvents = (newMembers ++ changedMembers) collect {
|
||||
case m if m.status == WeaklyUp ⇒ MemberWeaklyUp(m)
|
||||
|
|
|
|||
|
|
@ -47,7 +47,11 @@ class Member private[cluster] (
|
|||
* cluster. A member that joined after removal of another member may be
|
||||
* considered older than the removed member.
|
||||
*/
|
||||
def isOlderThan(other: Member): Boolean = upNumber < other.upNumber
|
||||
def isOlderThan(other: Member): Boolean =
|
||||
if (upNumber == other.upNumber)
|
||||
Member.addressOrdering.compare(address, other.address) < 0
|
||||
else
|
||||
upNumber < other.upNumber
|
||||
|
||||
def copy(status: MemberStatus): Member = {
|
||||
val oldStatus = this.status
|
||||
|
|
@ -123,6 +127,13 @@ object Member {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort members by age, i.e. using [[Member#isOlderThan]].
|
||||
*/
|
||||
val ageOrdering: Ordering[Member] = Ordering.fromLessThan[Member] {
|
||||
(a, b) ⇒ a.isOlderThan(b)
|
||||
}
|
||||
|
||||
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
|
||||
// group all members by Address => Seq[Member]
|
||||
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.uniqueAddress)
|
||||
|
|
@ -141,20 +152,25 @@ object Member {
|
|||
/**
|
||||
* Picks the Member with the highest "priority" MemberStatus.
|
||||
*/
|
||||
def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
|
||||
case (Removed, _) ⇒ m1
|
||||
case (_, Removed) ⇒ m2
|
||||
case (Down, _) ⇒ m1
|
||||
case (_, Down) ⇒ m2
|
||||
case (Exiting, _) ⇒ m1
|
||||
case (_, Exiting) ⇒ m2
|
||||
case (Leaving, _) ⇒ m1
|
||||
case (_, Leaving) ⇒ m2
|
||||
case (Joining, _) ⇒ m2
|
||||
case (_, Joining) ⇒ m1
|
||||
case (WeaklyUp, _) ⇒ m2
|
||||
case (_, WeaklyUp) ⇒ m1
|
||||
case (Up, Up) ⇒ m1
|
||||
def highestPriorityOf(m1: Member, m2: Member): Member = {
|
||||
if (m1.status == m2.status)
|
||||
// preserve the oldest in case of different upNumber
|
||||
if (m1.isOlderThan(m2)) m1 else m2
|
||||
else (m1.status, m2.status) match {
|
||||
case (Removed, _) ⇒ m1
|
||||
case (_, Removed) ⇒ m2
|
||||
case (Down, _) ⇒ m1
|
||||
case (_, Down) ⇒ m2
|
||||
case (Exiting, _) ⇒ m1
|
||||
case (_, Exiting) ⇒ m2
|
||||
case (Leaving, _) ⇒ m1
|
||||
case (_, Leaving) ⇒ m2
|
||||
case (Joining, _) ⇒ m2
|
||||
case (_, Joining) ⇒ m1
|
||||
case (WeaklyUp, _) ⇒ m2
|
||||
case (_, WeaklyUp) ⇒ m1
|
||||
case (Up, Up) ⇒ m1
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.Address
|
||||
import akka.cluster.ClusterEvent.MemberUp
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState
|
||||
|
||||
object DeterministicOldestWhenJoiningMultiJvmSpec extends MultiNodeConfig {
|
||||
val seed1 = role("seed1")
|
||||
val seed2 = role("seed2")
|
||||
val seed3 = role("seed3")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
|
||||
# not too quick to trigger problematic scenario more often
|
||||
akka.cluster.leader-actions-interval = 2000 ms
|
||||
akka.cluster.gossip-interval = 500 ms
|
||||
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class DeterministicOldestWhenJoiningMultiJvmNode1 extends DeterministicOldestWhenJoiningSpec
|
||||
class DeterministicOldestWhenJoiningMultiJvmNode2 extends DeterministicOldestWhenJoiningSpec
|
||||
class DeterministicOldestWhenJoiningMultiJvmNode3 extends DeterministicOldestWhenJoiningSpec
|
||||
|
||||
abstract class DeterministicOldestWhenJoiningSpec
|
||||
extends MultiNodeSpec(DeterministicOldestWhenJoiningMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import DeterministicOldestWhenJoiningMultiJvmSpec._
|
||||
|
||||
// reverse order because that expose the bug in issue #18554
|
||||
def seedNodes: immutable.IndexedSeq[Address] =
|
||||
Vector(address(seed1), address(seed2), address(seed3)).sorted(Member.addressOrdering).reverse
|
||||
val roleByAddress = Map(address(seed1) -> seed1, address(seed2) -> seed2, address(seed3) -> seed3)
|
||||
|
||||
"Joining a cluster" must {
|
||||
"result in deterministic oldest node" taggedAs LongRunningTest in {
|
||||
cluster.subscribe(testActor, classOf[MemberUp])
|
||||
expectMsgType[CurrentClusterState]
|
||||
|
||||
runOn(roleByAddress(seedNodes.head)) {
|
||||
cluster.joinSeedNodes(seedNodes)
|
||||
}
|
||||
enterBarrier("first-seed-joined")
|
||||
|
||||
runOn(roleByAddress(seedNodes(1)), roleByAddress(roleByAddress(seedNodes(2)))) {
|
||||
cluster.joinSeedNodes(seedNodes)
|
||||
}
|
||||
|
||||
within(10.seconds) {
|
||||
val ups = List(expectMsgType[MemberUp], expectMsgType[MemberUp], expectMsgType[MemberUp])
|
||||
ups.map(_.member).sorted(Member.ageOrdering).head.address should ===(seedNodes.head)
|
||||
}
|
||||
|
||||
enterBarrier("after-1")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -115,8 +115,7 @@ abstract class TransitionSpec
|
|||
|
||||
runOn(first) {
|
||||
cluster join myself
|
||||
awaitMemberStatus(myself, Joining)
|
||||
leaderActions()
|
||||
// first joining itself will immediately be moved to Up
|
||||
awaitMemberStatus(myself, Up)
|
||||
awaitCond(clusterView.isSingletonCluster)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue