diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 6bb76ecb7d..8c300838a0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -255,7 +255,7 @@ class ShardRegion( val cluster = Cluster(context.system) // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + val ageOrdering = Member.ageOrdering var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) var regions = Map.empty[ActorRef, Set[ShardId]] @@ -325,7 +325,7 @@ class ShardRegion( def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match { case MemberUp(m) ⇒ if (matchingRole(m)) - changeMembers(membersByAge + m) + changeMembers(membersByAge - m + m) // replace case MemberRemoved(m, _) ⇒ if (m.uniqueAddress == cluster.selfUniqueAddress) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index dfcbe2b702..29e18fbc26 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -226,7 +226,7 @@ object ClusterSingletonManager { val cluster = Cluster(context.system) // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + val ageOrdering = Member.ageOrdering var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) var changes = Vector.empty[AnyRef] @@ -260,7 +260,10 @@ object ClusterSingletonManager { def add(m: Member): Unit = { if (matchingRole(m)) - trackChange { () ⇒ membersByAge += m } + trackChange { () ⇒ + membersByAge -= m // replace + membersByAge += m + } } def remove(m: Member): Unit = { diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index 43b396e96b..3ac53d593e 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -139,9 +139,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste val cluster = Cluster(context.system) var singleton: Option[ActorRef] = None // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { - (a, b) ⇒ a.isOlderThan(b) - } + val ageOrdering = Member.ageOrdering var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) var buffer = new java.util.LinkedList[(Any, ActorRef)] @@ -203,8 +201,9 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste */ def add(m: Member): Unit = { if (matchingRole(m)) - trackChange { - () ⇒ membersByAge += m + trackChange { () ⇒ + membersByAge -= m // replace + membersByAge += m } } diff --git a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java index e02e9f24d9..e363757e90 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java @@ -46,77 +46,6 @@ public class ClusterSingletonManagerTest { //#create-singleton-proxy } - static//documentation of how to keep track of the oldest member in user land - //#singleton-proxy - public class ConsumerProxy extends UntypedActor { - - final Cluster cluster = Cluster.get(getContext().system()); - - final Comparator ageComparator = new Comparator() { - public int compare(Member a, Member b) { - if (a.isOlderThan(b)) - return -1; - else if (b.isOlderThan(a)) - return 1; - else - return 0; - } - }; - final SortedSet membersByAge = new TreeSet(ageComparator); - - final String role = "worker"; - - //subscribe to cluster changes - @Override - public void preStart() { - cluster.subscribe(getSelf(), MemberEvent.class); - } - - //re-subscribe when restart - @Override - public void postStop() { - cluster.unsubscribe(getSelf()); - } - - @Override - public void onReceive(Object message) { - if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - List members = new ArrayList(); - for (Member m : state.getMembers()) { - if (m.status().equals(MemberStatus.up()) && m.hasRole(role)) - members.add(m); - } - membersByAge.clear(); - membersByAge.addAll(members); - - } else if (message instanceof MemberUp) { - Member m = ((MemberUp) message).member(); - if (m.hasRole(role)) - membersByAge.add(m); - - } else if (message instanceof MemberRemoved) { - Member m = ((MemberUp) message).member(); - if (m.hasRole(role)) - membersByAge.remove(m); - - } else if (message instanceof MemberEvent) { - // not interesting - - } else if (!membersByAge.isEmpty()) { - currentMaster().tell(message, getSender()); - - } - } - - ActorSelection currentMaster() { - return getContext().actorSelection(membersByAge.first().address() + "/user/singleton/statsService"); - } - - } - - //#singleton-proxy - public static class End { } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 7a09543853..4c752c5b12 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -511,7 +511,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with updateLatestGossip(newGossip) logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", ")) - if (node != selfUniqueAddress) + if (node == selfUniqueAddress) { + if (localMembers.isEmpty) + leaderActions() // important for deterministic oldest when bootstrapping + } else sender() ! Welcome(selfUniqueAddress, latestGossip) publish(latestGossip) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index be95f74148..91d51ad420 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -275,7 +275,8 @@ object ClusterEvent { val newMembers = newGossip.members -- oldGossip.members val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress) val changedMembers = membersGroupedByAddress collect { - case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember + case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status || newMember.upNumber != oldMember.upNumber ⇒ + newMember } val memberEvents = (newMembers ++ changedMembers) collect { case m if m.status == WeaklyUp ⇒ MemberWeaklyUp(m) diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 8dc08dde6f..6fb54cbdd7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -47,7 +47,11 @@ class Member private[cluster] ( * cluster. A member that joined after removal of another member may be * considered older than the removed member. */ - def isOlderThan(other: Member): Boolean = upNumber < other.upNumber + def isOlderThan(other: Member): Boolean = + if (upNumber == other.upNumber) + Member.addressOrdering.compare(address, other.address) < 0 + else + upNumber < other.upNumber def copy(status: MemberStatus): Member = { val oldStatus = this.status @@ -123,6 +127,13 @@ object Member { } } + /** + * Sort members by age, i.e. using [[Member#isOlderThan]]. + */ + val ageOrdering: Ordering[Member] = Ordering.fromLessThan[Member] { + (a, b) ⇒ a.isOlderThan(b) + } + def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { // group all members by Address => Seq[Member] val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.uniqueAddress) @@ -141,20 +152,25 @@ object Member { /** * Picks the Member with the highest "priority" MemberStatus. */ - def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match { - case (Removed, _) ⇒ m1 - case (_, Removed) ⇒ m2 - case (Down, _) ⇒ m1 - case (_, Down) ⇒ m2 - case (Exiting, _) ⇒ m1 - case (_, Exiting) ⇒ m2 - case (Leaving, _) ⇒ m1 - case (_, Leaving) ⇒ m2 - case (Joining, _) ⇒ m2 - case (_, Joining) ⇒ m1 - case (WeaklyUp, _) ⇒ m2 - case (_, WeaklyUp) ⇒ m1 - case (Up, Up) ⇒ m1 + def highestPriorityOf(m1: Member, m2: Member): Member = { + if (m1.status == m2.status) + // preserve the oldest in case of different upNumber + if (m1.isOlderThan(m2)) m1 else m2 + else (m1.status, m2.status) match { + case (Removed, _) ⇒ m1 + case (_, Removed) ⇒ m2 + case (Down, _) ⇒ m1 + case (_, Down) ⇒ m2 + case (Exiting, _) ⇒ m1 + case (_, Exiting) ⇒ m2 + case (Leaving, _) ⇒ m1 + case (_, Leaving) ⇒ m2 + case (Joining, _) ⇒ m2 + case (_, Joining) ⇒ m1 + case (WeaklyUp, _) ⇒ m2 + case (_, WeaklyUp) ⇒ m1 + case (Up, Up) ⇒ m1 + } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/DeterministicOldestWhenJoiningSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/DeterministicOldestWhenJoiningSpec.scala new file mode 100644 index 0000000000..51b5d1c28e --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/DeterministicOldestWhenJoiningSpec.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import scala.concurrent.duration._ +import akka.actor.Address +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.ClusterEvent.CurrentClusterState + +object DeterministicOldestWhenJoiningMultiJvmSpec extends MultiNodeConfig { + val seed1 = role("seed1") + val seed2 = role("seed2") + val seed3 = role("seed3") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + # not too quick to trigger problematic scenario more often + akka.cluster.leader-actions-interval = 2000 ms + akka.cluster.gossip-interval = 500 ms + """)).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class DeterministicOldestWhenJoiningMultiJvmNode1 extends DeterministicOldestWhenJoiningSpec +class DeterministicOldestWhenJoiningMultiJvmNode2 extends DeterministicOldestWhenJoiningSpec +class DeterministicOldestWhenJoiningMultiJvmNode3 extends DeterministicOldestWhenJoiningSpec + +abstract class DeterministicOldestWhenJoiningSpec + extends MultiNodeSpec(DeterministicOldestWhenJoiningMultiJvmSpec) + with MultiNodeClusterSpec { + + import DeterministicOldestWhenJoiningMultiJvmSpec._ + + // reverse order because that expose the bug in issue #18554 + def seedNodes: immutable.IndexedSeq[Address] = + Vector(address(seed1), address(seed2), address(seed3)).sorted(Member.addressOrdering).reverse + val roleByAddress = Map(address(seed1) -> seed1, address(seed2) -> seed2, address(seed3) -> seed3) + + "Joining a cluster" must { + "result in deterministic oldest node" taggedAs LongRunningTest in { + cluster.subscribe(testActor, classOf[MemberUp]) + expectMsgType[CurrentClusterState] + + runOn(roleByAddress(seedNodes.head)) { + cluster.joinSeedNodes(seedNodes) + } + enterBarrier("first-seed-joined") + + runOn(roleByAddress(seedNodes(1)), roleByAddress(roleByAddress(seedNodes(2)))) { + cluster.joinSeedNodes(seedNodes) + } + + within(10.seconds) { + val ups = List(expectMsgType[MemberUp], expectMsgType[MemberUp], expectMsgType[MemberUp]) + ups.map(_.member).sorted(Member.ageOrdering).head.address should ===(seedNodes.head) + } + + enterBarrier("after-1") + } + + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 6668ab272f..c2d650a608 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -115,8 +115,7 @@ abstract class TransitionSpec runOn(first) { cluster join myself - awaitMemberStatus(myself, Joining) - leaderActions() + // first joining itself will immediately be moved to Up awaitMemberStatus(myself, Up) awaitCond(clusterView.isSingletonCluster) }