diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 2e7b6e7151..1b9340c493 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -139,8 +139,9 @@ message GossipOverview { */ message Member { required int32 addressIndex = 1; - required MemberStatus status = 2; - repeated int32 rolesIndexes = 3 [packed = true]; + required int32 upNumber = 2; + required MemberStatus status = 3; + repeated int32 rolesIndexes = 4 [packed = true]; } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index e31a6fae87..f1932ec2e4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -748,16 +748,29 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // transform the node member ring val newMembers = localMembers collect { - // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence) - // and minimum number of nodes have joined the cluster - case member if isJoiningToUp(member) ⇒ member copy (status = Up) - // Move LEAVING => EXITING (once we have a convergence on LEAVING - // *and* if we have a successful partition handoff) - case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ - member copy (status = Exiting) - // Everyone else that is not Exiting stays as they are - case member if member.status != Exiting && member.status != Down ⇒ member - // Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the `members` set/node ring and seen table + var upNumber = 0 + + { + // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. 
we have a convergence) + // and minimum number of nodes have joined the cluster + case member if isJoiningToUp(member) ⇒ + if (upNumber == 0) { + // It is alright to use same upNumber as already used by a removed member, since the upNumber + // is only used for comparing age of current cluster members (Member.isOlderThan) + val youngest = localGossip.youngestMember + upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber) + } else { + upNumber += 1 + } + member.copyUp(upNumber) + // Move LEAVING => EXITING (once we have a convergence on LEAVING + // *and* if we have a successful partition handoff) + case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ + member copy (status = Exiting) + // Everyone else that is not Exiting stays as they are + case member if member.status != Exiting && member.status != Down ⇒ member + // Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the `members` set/node ring and seen table + } } // ---------------------- diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index c9ccf23922..c37b7fd1be 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -213,6 +213,16 @@ private[cluster] case class Gossip( getOrElse(Member.removed(node)) // placeholder for removed member } + def youngestMember: Member = { + require(members.nonEmpty, "No youngest when no members") + def maxByUpNumber(mbrs: Iterable[Member]): Member = + mbrs.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber) + if (overview.unreachable.isEmpty) + maxByUpNumber(members) + else + maxByUpNumber(members ++ overview.unreachable) + } + override def toString = s"Gossip(members = [${members.mkString(", ")}], overview = ${overview}, version = ${version})" } diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala 
b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 6726daea94..75ef83ba05 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -21,6 +21,8 @@ import MemberStatus._ class Member private[cluster] ( /** INTERNAL API **/ private[cluster] val uniqueAddress: UniqueAddress, + /** INTERNAL API **/ + private[cluster] val upNumber: Int, val status: MemberStatus, val roles: Set[String]) extends Serializable { @@ -41,15 +43,27 @@ class Member private[cluster] ( def getRoles: java.util.Set[String] = scala.collection.JavaConverters.setAsJavaSetConverter(roles).asJava + /** + * Is this member older, has been part of cluster longer, than another + * member. It is only correct when comparing two existing members in a + * cluster. A member that joined after removal of another member may be + * considered older than the removed member. + */ + def isOlderThan(other: Member): Boolean = upNumber < other.upNumber + def copy(status: MemberStatus): Member = { val oldStatus = this.status if (status == oldStatus) this else { require(allowedTransitions(oldStatus)(status), s"Invalid member status transition [ ${this} -> ${status}]") - new Member(uniqueAddress, status, roles) + new Member(uniqueAddress, upNumber, status, roles) } } + + def copyUp(upNumber: Int): Member = { + new Member(uniqueAddress, upNumber, status, roles).copy(Up) + } } /** @@ -64,12 +78,12 @@ object Member { * Create a new member with status Joining. */ private[cluster] def apply(uniqueAddress: UniqueAddress, roles: Set[String]): Member = - new Member(uniqueAddress, Joining, roles) + new Member(uniqueAddress, Int.MaxValue, Joining, roles) /** * INTERNAL API */ - private[cluster] def removed(node: UniqueAddress): Member = new Member(node, Removed, Set.empty) + private[cluster] def removed(node: UniqueAddress): Member = new Member(node, Int.MaxValue, Removed, Set.empty) /** * `Address` ordering type class, sorts addresses by host and port. 
diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 1028b117ba..c11fbc3a3a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -145,7 +145,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role") def memberToProto(member: Member) = { - msg.Member(mapUniqueAddress(member.uniqueAddress), msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector]) + msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber, + msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector]) } def seenToProto(seen: (UniqueAddress, VectorClock)) = seen match { @@ -194,7 +195,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val hashMapping = gossip.allHashes def memberFromProto(member: msg.Member) = { - new Member(addressMapping(member.addressIndex), memberStatusFromInt(member.status.id), + new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id), member.rolesIndexes.map(roleMapping).to[Set]) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 21c87c59c4..383c3d9caf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -58,5 +58,18 @@ abstract class NodeMembershipSpec enterBarrier("after-2") } + + "correct member age" taggedAs LongRunningTest in { + val firstMember = clusterView.members.find(_.address == address(first)).get + val secondMember = 
clusterView.members.find(_.address == address(second)).get + val thirdMember = clusterView.members.find(_.address == address(third)).get + firstMember.isOlderThan(thirdMember) must be(true) + thirdMember.isOlderThan(firstMember) must be(false) + secondMember.isOlderThan(thirdMember) must be(true) + thirdMember.isOlderThan(secondMember) must be(false) + + enterBarrier("after-3") + + } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index f7e6294c5e..a95e9c5ca4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -121,5 +121,15 @@ class GossipSpec extends WordSpec with MustMatchers { checkMerged(g3 merge g2) checkMerged(g2 merge g3) } + + "know who is youngest" in { + // a2 and e1 is Joining + val g1 = Gossip(members = SortedSet(a2, b1.copyUp(3)), overview = GossipOverview(unreachable = Set(e1))) + g1.youngestMember must be(b1) + val g2 = Gossip(members = SortedSet(a2), overview = GossipOverview(unreachable = Set(b1.copyUp(3), e1))) + g2.youngestMember must be(b1) + val g3 = Gossip(members = SortedSet(a2, b1.copyUp(3), e2.copyUp(4))) + g3.youngestMember must be(e2) + } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala index 2200f42fef..a6168006bc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala +++ b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala @@ -10,5 +10,5 @@ object TestMember { apply(address, status, Set.empty) def apply(address: Address, status: MemberStatus, roles: Set[String]): Member = - new Member(UniqueAddress(address, 0), status, roles) + new Member(UniqueAddress(address, 0), Int.MaxValue, status, roles) } \ No newline at end of file diff --git a/akka-contrib/docs/cluster-singleton.rst b/akka-contrib/docs/cluster-singleton.rst index 9452cf624f..1bed5a29e1 100644 --- 
a/akka-contrib/docs/cluster-singleton.rst +++ b/akka-contrib/docs/cluster-singleton.rst @@ -23,25 +23,25 @@ The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSi It manages singleton actor instance among all cluster nodes or a group of nodes tagged with a specific role. ``ClusterSingletonManager`` is an actor that is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The actual singleton actor is -started by the ``ClusterSingletonManager`` on the leader node by creating a child actor from +started by the ``ClusterSingletonManager`` on the oldest node by creating a child actor from supplied ``Props``. ``ClusterSingletonManager`` makes sure that at most one singleton instance is running at any point in time. -The singleton actor is always running on the leader member, which is nothing more than -the address currently sorted first in the member ring. This can change when adding -or removing members. A graceful hand over can normally be performed when joining a new -node that becomes leader or removing current leader node. Be aware that there is a short -time period when there is no active singleton during the hand over process. +The singleton actor is always running on the oldest member, which can be determined by +``Member#isOlderThan``. This can change when removing members. A graceful hand over can normally +be performed when current oldest node is leaving the cluster. Be aware that there is a short +time period when there is no active singleton during the hand-over process. -The cluster failure detector will notice when a leader node becomes unreachable due to -things like JVM crash, hard shut down, or network failure. Then a new leader node will +The cluster failure detector will notice when oldest node becomes unreachable due to +things like JVM crash, hard shut down, or network failure. Then a new oldest node will take over and a new singleton actor is created. 
For these failure scenarios there will not be a graceful hand-over, but more than one active singletons is prevented by all reasonable means. Some corner cases are eventually resolved by configurable timeouts. -You access the singleton actor with ``actorSelection`` using the names you have specified when -creating the ClusterSingletonManager. You can subscribe to cluster ``LeaderChanged`` or -``RoleLeaderChanged`` events to keep track of which node it is supposed to be running on. +You access the singleton actor with ``actorSelection`` using the names you have +specified when creating the ClusterSingletonManager. You can subscribe to +``akka.cluster.ClusterEvent.MemberEvent`` and sort the members by age +(``Member#isOlderThan``) to keep track of oldest member. Alternatively the singleton actor may broadcast its existence when it is started. An Example @@ -56,6 +56,8 @@ scenario when integrating with external systems. On each node in the cluster you need to start the ``ClusterSingletonManager`` and supply the ``Props`` of the singleton actor, in this case the JMS queue consumer. +In Scala: + .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#create-singleton-manager Here we limit the singleton to nodes tagged with the ``"worker"`` role, but all nodes, independent of @@ -65,6 +67,10 @@ The corresponding Java API for the ``singeltonProps`` function is ``akka.contrib The Java API takes a plain String for the role parameter and ``null`` means that all nodes, independent of role, are used. +In Java: + +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java#create-singleton-manager + Here we use an application specific ``terminationMessage`` to be able to close the resources before actually stopping the singleton actor. Note that ``PoisonPill`` is a perfectly fine ``terminationMessage`` if you only need to stop the actor. 
@@ -74,32 +80,28 @@ Here is how the singleton actor handles the ``terminationMessage`` in this examp .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#consumer-end Note that you can send back current state to the ``ClusterSingletonManager`` before terminating. -This message will be sent over to the ``ClusterSingletonManager`` at the new leader node and it +This message will be sent over to the ``ClusterSingletonManager`` at the new oldest node and it will be passed to the ``singletonProps`` factory when creating the new singleton instance. With the names given above the path of singleton actor can be constructed by subscribing to -``RoleLeaderChanged`` cluster event and the actor reference is then looked up using ``actorSelection``: +``MemberEvent`` cluster event and sort the members by age to keep track of oldest member. -.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy2 +In Scala: -Subscribe to ``LeaderChanged`` instead of ``RoleLeaderChanged`` if you don't limit the singleton to -the group of members tagged with a specific role. +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy + +In Java: + +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java#singleton-proxy + +The checks of ``role`` can be omitted if you don't limit the singleton to the group of members +tagged with a specific role. Note that the hand-over might still be in progress and the singleton actor might not be started yet -when you receive the ``LeaderChanged`` / ``RoleLeaderChanged`` event. +when you receive the member event. A nice alternative to the above proxy is to use :ref:`distributed-pub-sub`. Let the singleton actor register itself to the mediator with ``DistributedPubSubMediator.Put`` message when it is started. 
Send messages to the singleton actor via the mediator with ``DistributedPubSubMediator.SendToAll``. -To test scenarios where the cluster leader node is removed or shut down you can use :ref:`multi-node-testing` and -utilize the fact that the leader is supposed to be the first member when sorted by member address. - -.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#sort-cluster-roles - -.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#test-leave - -Also, make sure that you don't shut down the first role, which is running the test conductor controller. -Use a dedicated role for the controller, which is not a cluster member. - .. note:: The singleton pattern will be simplified, perhaps provided out-of-the-box, when the cluster handles automatic actor partitioning. diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index 36a0ba316b..384c1ffcc2 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -5,6 +5,7 @@ package akka.contrib.pattern import scala.concurrent.duration._ +import scala.collection.immutable import akka.actor.Actor import akka.actor.Actor.Receive import akka.actor.ActorLogging @@ -16,6 +17,8 @@ import akka.actor.Props import akka.actor.Terminated import akka.cluster.Cluster import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.cluster.MemberStatus import akka.AkkaException object ClusterSingletonManager { @@ -78,55 +81,55 @@ object ClusterSingletonManager { */ private object Internal { /** - * Sent from new leader to previous leader to initate the + * Sent from new oldest to previous oldest to initate the * hand-over process. 
`HandOverInProgress` and `HandOverDone` * are expected replies. */ case object HandOverToMe /** - * Confirmation by the previous leader that the hand + * Confirmation by the previous oldest that the hand * over process, shut down of the singleton actor, has * started. */ case object HandOverInProgress /** - * Confirmation by the previous leader that the singleton + * Confirmation by the previous oldest that the singleton * actor has been terminated and the hand-over process is * completed. The `handOverData` holds the message, if any, * sent from the singleton actor to its parent ClusterSingletonManager * when shutting down. It is passed to the `singletonProps` - * factory on the new leader node. + * factory on the new oldest node. */ case class HandOverDone(handOverData: Option[Any]) /** - * Sent from from previous leader to new leader to + * Sent from from previous oldest to new oldest to * initiate the normal hand-over process. * Especially useful when new node joins and becomes - * leader immediately, without knowing who was previous - * leader. + * oldest immediately, without knowing who was previous + * oldest. 
*/ case object TakeOverFromMe case class HandOverRetry(count: Int) case class TakeOverRetry(count: Int) case object Cleanup - case object StartLeaderChangedBuffer + case object StartOldestChangedBuffer case object Start extends State - case object Leader extends State - case object NonLeader extends State - case object BecomingLeader extends State - case object WasLeader extends State + case object Oldest extends State + case object Younger extends State + case object BecomingOldest extends State + case object WasOldest extends State case object HandingOver extends State case object TakeOver extends State case object Uninitialized extends Data - case class NonLeaderData(leaderOption: Option[Address]) extends Data - case class BecomingLeaderData(previousLeaderOption: Option[Address]) extends Data - case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false, + case class YoungerData(oldestOption: Option[Address]) extends Data + case class BecomingOldestData(previousOldestOption: Option[Address]) extends Data + case class OldestData(singleton: ActorRef, singletonTerminated: Boolean = false, handOverData: Option[Any] = None) extends Data - case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], - newLeaderOption: Option[Address]) extends Data + case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], + newOldestOption: Option[Address]) extends Data case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data val HandOverRetryTimer = "hand-over-retry" @@ -138,7 +141,7 @@ object ClusterSingletonManager { case _ ⇒ Some(role) } - object LeaderChangedBuffer { + object OldestChangedBuffer { /** * Request to deliver one more event. */ @@ -146,67 +149,101 @@ object ClusterSingletonManager { /** * The first event, corresponding to CurrentClusterState. 
*/ - case class InitialLeaderState(leader: Option[Address], memberCount: Int) + case class InitialOldestState(oldest: Option[Address], memberCount: Int) + + case class OldestChanged(oldest: Option[Address]) } /** - * Notifications of [[akka.cluster.ClusterEvent.LeaderChanged]] is tunneled + * Notifications of member events that track oldest member is tunneled * via this actor (child of ClusterSingletonManager) to be able to deliver - * one change at a time. Avoiding simultaneous leader changes simplifies + * one change at a time. Avoiding simultaneous changes simplifies * the process in ClusterSingletonManager. ClusterSingletonManager requests * next event with `GetNext` when it is ready for it. Only one outstanding * `GetNext` request is allowed. Incoming events are buffered and delivered * upon `GetNext` request. */ - class LeaderChangedBuffer(role: Option[String]) extends Actor { - import LeaderChangedBuffer._ + class OldestChangedBuffer(role: Option[String]) extends Actor { + import OldestChangedBuffer._ import context.dispatcher val cluster = Cluster(context.system) - var changes = Vector.empty[AnyRef] - var memberCount = 0 + // sort by age, oldest first + val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) - // subscribe to LeaderChanged, re-subscribe when restart - override def preStart(): Unit = role match { - case None ⇒ cluster.subscribe(self, classOf[LeaderChanged]) - case Some(_) ⇒ cluster.subscribe(self, classOf[RoleLeaderChanged]) + var changes = Vector.empty[AnyRef] + + // subscribe to MemberEvent, re-subscribe when restart + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) } override def postStop(): Unit = cluster.unsubscribe(self) + def matchingRole(member: Member): Boolean = role match { + case None ⇒ true + case Some(r) ⇒ member.hasRole(r) + } + + def trackChange(block: () ⇒ Unit): Unit = { + val 
before = membersByAge.headOption + block() + val after = membersByAge.headOption + if (before != after) + changes :+= OldestChanged(after.map(_.address)) + } + + def handleInitial(state: CurrentClusterState): Unit = { + membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { + case m if m.status == MemberStatus.Up && matchingRole(m) ⇒ m + } + val initial = InitialOldestState(membersByAge.headOption.map(_.address), membersByAge.size) + changes :+= initial + } + + def add(m: Member): Unit = { + if (matchingRole(m)) + trackChange { () ⇒ membersByAge += m } + } + + def remove(m: Member): Unit = { + if (matchingRole(m)) + trackChange { () ⇒ membersByAge -= m } + } + + def sendFirstChange(): Unit = { + val event = changes.head + changes = changes.tail + context.parent ! event + } + def receive = { - case state: CurrentClusterState ⇒ - val initial = role match { - case None ⇒ InitialLeaderState(state.leader, state.members.size) - case Some(r) ⇒ InitialLeaderState(state.roleLeader(r), state.members.count(_.hasRole(r))) - } - changes :+= initial - case event: LeaderChanged ⇒ - changes :+= event - case RoleLeaderChanged(r, leader) ⇒ - if (role.orNull == r) changes :+= LeaderChanged(leader) + case state: CurrentClusterState ⇒ handleInitial(state) + case MemberUp(m) ⇒ add(m) + case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved]) ⇒ + remove(mEvent.member) case GetNext if changes.isEmpty ⇒ context.become(deliverNext, discardOld = false) case GetNext ⇒ - val event = changes.head - changes = changes.tail - context.parent ! 
event + sendFirstChange() } // the buffer was empty when GetNext was received, deliver next event immediately def deliverNext: Actor.Receive = { case state: CurrentClusterState ⇒ - val initial = role match { - case None ⇒ InitialLeaderState(state.leader, state.members.size) - case Some(r) ⇒ InitialLeaderState(state.roleLeader(r), state.members.count(_.hasRole(r))) + handleInitial(state) + sendFirstChange() + context.unbecome() + case MemberUp(m) ⇒ + add(m) + if (changes.nonEmpty) { + sendFirstChange() + context.unbecome() } - context.parent ! initial - context.unbecome() - case event: LeaderChanged ⇒ - context.parent ! event - context.unbecome() - case RoleLeaderChanged(r, leader) ⇒ - if (role.orNull == r) { - context.parent ! LeaderChanged(leader) + case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved]) ⇒ + remove(mEvent.member) + if (changes.nonEmpty) { + sendFirstChange() context.unbecome() } } @@ -248,25 +285,24 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * * The ClusterSingletonManager is supposed to be started on all nodes, * or all nodes with specified role, in the cluster with `actorOf`. - * The actual singleton is started on the leader node by creating a child + * The actual singleton is started on the oldest node by creating a child * actor from the supplied `singletonProps`. * - * The singleton actor is always running on the leader member, which is - * nothing more than the address currently sorted first in the member - * ring. This can change when adding or removing members. A graceful hand - * over can normally be performed when joining a new node that becomes - * leader or removing current leader node. Be aware that there is a - * short time period when there is no active singleton during the + * The singleton actor is always running on the oldest member, which can + * be determined by [[akka.cluster.Member#isOlderThan]]. + * This can change when removing members. 
A graceful hand over can normally + * be performed when current oldest node is leaving the cluster. Be aware that + * there is a short time period when there is no active singleton during the * hand-over process. * * The singleton actor can at any time send a message to its parent * ClusterSingletonManager and this message will be passed to the - * `singletonProps` factory on the new leader node when a graceful + * `singletonProps` factory on the new oldest node when a graceful * hand-over is performed. * - * The cluster failure detector will notice when a leader node + * The cluster failure detector will notice when oldest node * becomes unreachable due to things like JVM crash, hard shut down, - * or network failure. Then a new leader node will take over and a + * or network failure. Then a new oldest node will take over and a * new singleton actor is created. For these failure scenarios there * will not be a graceful hand-over, but more than one active singletons * is prevented by all reasonable means. Some corner cases are eventually @@ -274,10 +310,9 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * * You access the singleton actor with `actorSelection` using the names you have * specified when creating the ClusterSingletonManager. You can subscribe to - * [[akka.cluster.ClusterEvent.LeaderChanged]] or - * [[akka.cluster.ClusterEvent.RoleLeaderChanged]] to keep track of which node - * it is supposed to be running on. Alternatively the singleton actor may - * broadcast its existence when it is started. + * [[akka.cluster.ClusterEvent.MemberEvent]] and sort the members by age + * ([[akka.cluster.ClusterEvent.Member#isOlderThan]]) to keep track of oldest member. + * Alternatively the singleton actor may broadcast its existence when it is started. * * Use factory method [[ClusterSingletonManager#props] to create the * [[akka.actor.Props]] for the actor. 
@@ -294,12 +329,12 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * * '''''singletonName''''' The actor name of the child singleton actor. * - * '''''terminationMessage''''' When handing over to a new leader node + * '''''terminationMessage''''' When handing over to a new oldest node * this `terminationMessage` is sent to the singleton actor to tell * it to finish its work, close resources, and stop. It can sending * a message back to the parent ClusterSingletonManager, which will - * passed to the `singletonProps` factory on the new leader node. - * The hand-over to the new leader node is completed when the + * passed to the `singletonProps` factory on the new oldest node. + * The hand-over to the new oldest node is completed when the * singleton actor is terminated. * Note that [[akka.actor.PoisonPill]] is a perfectly fine * `terminationMessage` if you only need to stop the actor. @@ -308,28 +343,28 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * If the role is not specified it's a singleton among all nodes in * the cluster. * - * '''''maxHandOverRetries''''' When a node is becoming leader it sends - * hand-over request to previous leader. This is retried with the - * `retryInterval` until the previous leader confirms that the hand + * '''''maxHandOverRetries''''' When a node is becoming oldest it sends + * hand-over request to previous oldest. This is retried with the + * `retryInterval` until the previous oldest confirms that the hand * over has started, or this `maxHandOverRetries` limit has been * reached. If the retry limit is reached it takes the decision to be - * the new leader if previous leader is unknown (typically removed), + * the new oldest if previous oldest is unknown (typically removed), * otherwise it initiates a new round by throwing * [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting * restart with fresh state. 
For a cluster with many members you might * need to increase this retry limit because it takes longer time to * propagate changes across all nodes. * - * '''''maxTakeOverRetries''''' When a leader node is not leader any more - * it sends take over request to the new leader to initiate the normal + * '''''maxTakeOverRetries''''' When a oldest node is not oldest any more + * it sends take over request to the new oldest to initiate the normal * hand-over process. This is especially useful when new node joins and becomes - * leader immediately, without knowing who was previous leader. This is retried + * oldest immediately, without knowing who was previous oldest. This is retried * with the `retryInterval` until this retry limit has been reached. If the retry * limit is reached it initiates a new round by throwing * [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting * restart with fresh state. This will also cause the singleton actor to be * stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to - * ensure that new leader doesn't start singleton actor before previous is + * ensure that new oldest doesn't start singleton actor before previous is * stopped for certain corner cases. * * '''''loggingEnabled''''' Logging of what is going on at info log level. 
@@ -345,13 +380,13 @@ class ClusterSingletonManager( loggingEnabled: Boolean) extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] { - // to ensure that new leader doesn't start singleton actor before previous is stopped for certain corner cases + // to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases require(maxTakeOverRetries < maxHandOverRetries, s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]") import ClusterSingletonManager._ import ClusterSingletonManager.Internal._ - import ClusterSingletonManager.Internal.LeaderChangedBuffer._ + import ClusterSingletonManager.Internal.OldestChangedBuffer._ val cluster = Cluster(context.system) val selfAddressOption = Some(cluster.selfAddress) @@ -360,9 +395,9 @@ class ClusterSingletonManager( s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") // started when when self member is Up - var leaderChangedBuffer: ActorRef = _ + var oldestChangedBuffer: ActorRef = _ // Previous GetNext request delivered event and new GetNext is to be sent - var leaderChangedReceived = true + var oldestChangedReceived = true // keep track of previously removed members var removed = Map.empty[Address, Deadline] @@ -392,9 +427,9 @@ class ClusterSingletonManager( setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) - // defer subscription to LeaderChanged to avoid some jitter when + // defer subscription to avoid some jitter when // starting/joining several nodes at the same time - cluster.registerOnMemberUp(self ! StartLeaderChangedBuffer) + cluster.registerOnMemberUp(self ! 
StartOldestChangedBuffer) } override def postStop(): Unit = { @@ -405,55 +440,55 @@ class ClusterSingletonManager( def peer(at: Address): ActorSelection = context.actorSelection(self.path.toStringWithAddress(at)) - def getNextLeaderChanged(): Unit = - if (leaderChangedReceived) { - leaderChangedReceived = false - leaderChangedBuffer ! GetNext + def getNextOldestChanged(): Unit = + if (oldestChangedReceived) { + oldestChangedReceived = false + oldestChangedBuffer ! GetNext } startWith(Start, Uninitialized) when(Start) { - case Event(StartLeaderChangedBuffer, _) ⇒ - leaderChangedBuffer = context.actorOf(Props(classOf[LeaderChangedBuffer], role). + case Event(StartOldestChangedBuffer, _) ⇒ + oldestChangedBuffer = context.actorOf(Props(classOf[OldestChangedBuffer], role). withDispatcher(context.props.dispatcher)) - getNextLeaderChanged() + getNextOldestChanged() stay - case Event(InitialLeaderState(leaderOption, memberCount), _) ⇒ - leaderChangedReceived = true - if (leaderOption == selfAddressOption && memberCount == 1) - // alone, leader immediately - gotoLeader(None) - else if (leaderOption == selfAddressOption) - goto(BecomingLeader) using BecomingLeaderData(None) + case Event(InitialOldestState(oldestOption, memberCount), _) ⇒ + oldestChangedReceived = true + if (oldestOption == selfAddressOption && memberCount == 1) + // alone, oldest immediately + gotoOldest(None) + else if (oldestOption == selfAddressOption) + goto(BecomingOldest) using BecomingOldestData(None) else - goto(NonLeader) using NonLeaderData(leaderOption) + goto(Younger) using YoungerData(oldestOption) } - when(NonLeader) { - case Event(LeaderChanged(leaderOption), NonLeaderData(previousLeaderOption)) ⇒ - leaderChangedReceived = true - if (leaderOption == selfAddressOption) { - logInfo("NonLeader observed LeaderChanged: [{} -> myself]", previousLeaderOption) - previousLeaderOption match { - case None ⇒ gotoLeader(None) - case Some(prev) if removed.contains(prev) ⇒ gotoLeader(None) + when(Younger) { 
+ case Event(OldestChanged(oldestOption), YoungerData(previousOldestOption)) ⇒ + oldestChangedReceived = true + if (oldestOption == selfAddressOption) { + logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption) + previousOldestOption match { + case None ⇒ gotoOldest(None) + case Some(prev) if removed.contains(prev) ⇒ gotoOldest(None) case Some(prev) ⇒ peer(prev) ! HandOverToMe - goto(BecomingLeader) using BecomingLeaderData(previousLeaderOption) + goto(BecomingOldest) using BecomingOldestData(previousOldestOption) } } else { - logInfo("NonLeader observed LeaderChanged: [{} -> {}]", previousLeaderOption, leaderOption) - getNextLeaderChanged() - stay using NonLeaderData(leaderOption) + logInfo("Younger observed OldestChanged: [{} -> {}]", previousOldestOption, oldestOption) + getNextOldestChanged() + stay using YoungerData(oldestOption) } - case Event(MemberRemoved(m), NonLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ - logInfo("Previous leader removed [{}]", m.address) + case Event(MemberRemoved(m), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒ + logInfo("Previous oldest removed [{}]", m.address) addRemoved(m.address) - // transition when LeaderChanged - stay using NonLeaderData(None) + // transition when OldestChanged + stay using YoungerData(None) case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") @@ -461,7 +496,7 @@ class ClusterSingletonManager( } - when(BecomingLeader) { + when(BecomingOldest) { case Event(HandOverInProgress, _) ⇒ // confirmation that the hand-over process has started @@ -469,105 +504,105 @@ class ClusterSingletonManager( cancelTimer(HandOverRetryTimer) stay - case Event(HandOverDone(handOverData), BecomingLeaderData(Some(previousLeader))) ⇒ - if (sender.path.address == previousLeader) - gotoLeader(handOverData) + case Event(HandOverDone(handOverData), BecomingOldestData(Some(previousOldest))) ⇒ + if 
(sender.path.address == previousOldest) + gotoOldest(handOverData) else { - logInfo("Ignoring HandOverDone in BecomingLeader from [{}]. Expected previous leader [{}]", - sender.path.address, previousLeader) + logInfo("Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]", + sender.path.address, previousOldest) stay } - case Event(MemberRemoved(m), BecomingLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ - logInfo("Previous leader [{}] removed", previousLeader) + case Event(MemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒ + logInfo("Previous oldest [{}] removed", previousOldest) addRemoved(m.address) stay - case Event(TakeOverFromMe, BecomingLeaderData(None)) ⇒ + case Event(TakeOverFromMe, BecomingOldestData(None)) ⇒ sender ! HandOverToMe - stay using BecomingLeaderData(Some(sender.path.address)) + stay using BecomingOldestData(Some(sender.path.address)) - case Event(TakeOverFromMe, BecomingLeaderData(Some(previousLeader))) ⇒ - if (previousLeader == sender.path.address) sender ! HandOverToMe - else logInfo("Ignoring TakeOver request in BecomingLeader from [{}]. Expected previous leader [{}]", - sender.path.address, previousLeader) + case Event(TakeOverFromMe, BecomingOldestData(Some(previousOldest))) ⇒ + if (previousOldest == sender.path.address) sender ! HandOverToMe + else logInfo("Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]", + sender.path.address, previousOldest) stay - case Event(HandOverRetry(count), BecomingLeaderData(previousLeaderOption)) ⇒ + case Event(HandOverRetry(count), BecomingOldestData(previousOldestOption)) ⇒ if (count <= maxHandOverRetries) { - logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousLeaderOption) - previousLeaderOption foreach { peer(_) ! HandOverToMe } + logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption) + previousOldestOption foreach { peer(_) ! 
HandOverToMe } setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false) stay() - } else if (previousLeaderOption forall removed.contains) { - // can't send HandOverToMe, previousLeader unknown for new node (or restart) - // previous leader might be down or removed, so no TakeOverFromMe message is received - logInfo("Timeout in BecomingLeader. Previous leader unknown, removed and no TakeOver request.") - gotoLeader(None) + } else if (previousOldestOption forall removed.contains) { + // can't send HandOverToMe, previousOldest unknown for new node (or restart) + // previous oldest might be down or removed, so no TakeOverFromMe message is received + logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.") + gotoOldest(None) } else throw new ClusterSingletonManagerIsStuck( - s"Becoming singleton leader was stuck because previous leader [${previousLeaderOption}] is unresponsive") + s"Becoming singleton oldest was stuck because previous oldest [${previousOldestOption}] is unresponsive") } - def gotoLeader(handOverData: Option[Any]): State = { + def gotoOldest(handOverData: Option[Any]): State = { logInfo("Singleton manager [{}] starting singleton actor", cluster.selfAddress) val singleton = context watch context.actorOf(singletonProps(handOverData), singletonName) - goto(Leader) using LeaderData(singleton) + goto(Oldest) using OldestData(singleton) } - when(Leader) { - case Event(LeaderChanged(leaderOption), LeaderData(singleton, singletonTerminated, handOverData)) ⇒ - leaderChangedReceived = true - logInfo("Leader observed LeaderChanged: [{} -> {}]", cluster.selfAddress, leaderOption) - leaderOption match { + when(Oldest) { + case Event(OldestChanged(oldestOption), OldestData(singleton, singletonTerminated, handOverData)) ⇒ + oldestChangedReceived = true + logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption) + oldestOption match { case Some(a) if a == 
cluster.selfAddress ⇒ - // already leader + // already oldest stay case Some(a) if removed.contains(a) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, None) case Some(a) ⇒ - // send TakeOver request in case the new leader doesn't know previous leader + // send TakeOver request in case the new oldest doesn't know previous oldest peer(a) ! TakeOverFromMe setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false) - goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = Some(a)) + goto(WasOldest) using WasOldestData(singleton, singletonTerminated, handOverData, newOldestOption = Some(a)) case None ⇒ - // new leader will initiate the hand-over + // new oldest will initiate the hand-over setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false) - goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = None) + goto(WasOldest) using WasOldestData(singleton, singletonTerminated, handOverData, newOldestOption = None) } - case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData)) ⇒ + case Event(HandOverToMe, OldestData(singleton, singletonTerminated, handOverData)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) - case Event(singletonHandOverMessage, d @ LeaderData(singleton, _, _)) if sender == singleton ⇒ + case Event(singletonHandOverMessage, d @ OldestData(singleton, _, _)) if sender == singleton ⇒ stay using d.copy(handOverData = Some(singletonHandOverMessage)) - case Event(Terminated(ref), d @ LeaderData(singleton, _, _)) if ref == singleton ⇒ + case Event(Terminated(ref), d @ OldestData(singleton, _, _)) if ref == singleton ⇒ stay using d.copy(singletonTerminated = true) } - when(WasLeader) { - case Event(TakeOverRetry(count), WasLeaderData(_, _, _, newLeaderOption)) ⇒ + when(WasOldest) { + case Event(TakeOverRetry(count), WasOldestData(_, _, _, newOldestOption)) ⇒ if (count 
<= maxTakeOverRetries) { - logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeaderOption) - newLeaderOption foreach { peer(_) ! TakeOverFromMe } + logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption) + newOldestOption foreach { peer(_) ! TakeOverFromMe } setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), retryInterval, repeat = false) stay } else - throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeaderOption}] never occured") + throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occured") - case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒ + case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, handOverData, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) - case Event(MemberRemoved(m), WasLeaderData(singleton, singletonTerminated, handOverData, Some(newLeader))) if m.address == newLeader ⇒ + case Event(MemberRemoved(m), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if m.address == newOldest ⇒ addRemoved(m.address) gotoHandingOver(singleton, singletonTerminated, handOverData, None) - case Event(singletonHandOverMessage, d @ WasLeaderData(singleton, _, _, _)) if sender == singleton ⇒ + case Event(singletonHandOverMessage, d @ WasOldestData(singleton, _, _, _)) if sender == singleton ⇒ stay using d.copy(handOverData = Some(singletonHandOverMessage)) - case Event(Terminated(ref), d @ WasLeaderData(singleton, _, _, _)) if ref == singleton ⇒ + case Event(Terminated(ref), d @ WasOldestData(singleton, _, _, _)) if ref == singleton ⇒ stay using d.copy(singletonTerminated = true) } @@ -597,10 +632,10 @@ class ClusterSingletonManager( } def handOverDone(handOverTo: Option[ActorRef], handOverData: Option[Any]): State = { - val newLeader = handOverTo.map(_.path.address) - logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, 
newLeader) + val newOldest = handOverTo.map(_.path.address) + logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest) handOverTo foreach { _ ! HandOverDone(handOverData) } - goto(NonLeader) using NonLeaderData(newLeader) + goto(Younger) using YoungerData(newOldest) } whenUnhandled { @@ -622,20 +657,20 @@ class ClusterSingletonManager( } onTransition { - case _ -> BecomingLeader ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false) + case _ -> BecomingOldest ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false) } onTransition { - case BecomingLeader -> _ ⇒ cancelTimer(HandOverRetryTimer) - case WasLeader -> _ ⇒ cancelTimer(TakeOverRetryTimer) + case BecomingOldest -> _ ⇒ cancelTimer(HandOverRetryTimer) + case WasOldest -> _ ⇒ cancelTimer(TakeOverRetryTimer) } onTransition { - case _ -> (NonLeader | Leader) ⇒ getNextLeaderChanged() + case _ -> (Younger | Oldest) ⇒ getNextOldestChanged() } onTransition { - case _ -> NonLeader if removed.contains(cluster.selfAddress) ⇒ + case _ -> Younger if removed.contains(cluster.selfAddress) ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala index 0601e62d48..6f3d5dc43f 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala @@ -70,16 +70,6 @@ class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonMan override def initialParticipants = roles.size - // Sort the roles in the order used by the cluster. 
- lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = { - implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { - import Member.addressOrdering - def compare(x: RoleName, y: RoleName) = - addressOrdering.compare(node(x).address, node(y).address) - } - roles.filterNot(_ == controller).toVector.sorted - } - def join(from: RoleName, to: RoleName): Unit = { runOn(from) { Cluster(system) join node(to).address @@ -105,52 +95,77 @@ class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonMan } } - def echo(leader: RoleName): ActorSelection = - system.actorSelection(RootActorPath(node(leader).address) / "user" / "singleton" / "echo") + def echo(oldest: RoleName): ActorSelection = + system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "echo") - def verify(leader: RoleName): Unit = { - enterBarrier("before-" + leader.name + "-verified") - runOn(leader) { - expectMsg(EchoStarted) + def awaitMemberUp(memberProbe: TestProbe, nodes: RoleName*): Unit = { + runOn(nodes.filterNot(_ == nodes.head): _*) { + memberProbe.expectMsgType[MemberUp](15.seconds).member.address must be(node(nodes.head).address) } - enterBarrier(leader.name + "-active") - - runOn(sortedClusterRoles.filterNot(_ == leader): _*) { - echo(leader) ! 
"hello" - fishForMessage() { - case _: ActorRef ⇒ true - case EchoStarted ⇒ false - } match { - case echoRef: ActorRef ⇒ echoRef.path.address must be(node(leader).address) - } + runOn(nodes.head) { + memberProbe.receiveN(nodes.size, 15.seconds).collect { case MemberUp(m) ⇒ m.address }.toSet must be( + nodes.map(node(_).address).toSet) } - enterBarrier(leader.name + "-verified") + enterBarrier(nodes.head.name + "-up") } "A ClusterSingletonManager in chaotic cluster" must { - "startup 3 node cluster" in within(90 seconds) { - log.info("Sorted cluster nodes [{}]", sortedClusterRoles.map(node(_).address).mkString(", ")) + "startup 6 node cluster" in within(60 seconds) { + val memberProbe = TestProbe() + Cluster(system).subscribe(memberProbe.ref, classOf[MemberUp]) + memberProbe.expectMsgClass(classOf[CurrentClusterState]) - join(sortedClusterRoles(5), sortedClusterRoles.last) - join(sortedClusterRoles(4), sortedClusterRoles.last) - join(sortedClusterRoles(3), sortedClusterRoles.last) + join(first, first) + awaitMemberUp(memberProbe, first) + runOn(first) { + expectMsg(EchoStarted) + } + enterBarrier("first-started") + + join(second, first) + awaitMemberUp(memberProbe, second, first) + + join(third, first) + awaitMemberUp(memberProbe, third, second, first) + + join(fourth, first) + awaitMemberUp(memberProbe, fourth, third, second, first) + + join(fifth, first) + awaitMemberUp(memberProbe, fifth, fourth, third, second, first) + + join(sixth, first) + awaitMemberUp(memberProbe, sixth, fifth, fourth, third, second, first) + + runOn(controller) { + echo(first) ! 
"hello" + expectMsgType[ActorRef](3.seconds).path.address must be(node(first).address) + } + enterBarrier("first-verified") - verify(sortedClusterRoles(3)) } - "hand over when joining 3 more nodes" in within(90 seconds) { - join(sortedClusterRoles(2), sortedClusterRoles(3)) - join(sortedClusterRoles(1), sortedClusterRoles(4)) - join(sortedClusterRoles(0), sortedClusterRoles(5)) + "take over when three oldest nodes crash in 6 nodes cluster" in within(90 seconds) { + // FIXME change those to DeadLetterFilter + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*"))) + system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Disassociated.*"))) + system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Association failed.*"))) + enterBarrier("logs-muted") + + crash(first, second, third) + enterBarrier("after-crash") + runOn(fourth) { + expectMsg(EchoStarted) + } + enterBarrier("fourth-active") + + runOn(controller) { + echo(fourth) ! "hello" + expectMsgType[ActorRef](3.seconds).path.address must be(node(fourth).address) + } + enterBarrier("fourth-verified") - verify(sortedClusterRoles(0)) } - - "take over when three leaders crash in 6 nodes cluster" in within(90 seconds) { - crash(sortedClusterRoles(0), sortedClusterRoles(1), sortedClusterRoles(2)) - verify(sortedClusterRoles(3)) - } - } } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 5973a968d4..65930c80b1 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -139,59 +139,42 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { queue ! 
UnregisterConsumer case UnregistrationOk ⇒ // reply to ClusterSingletonManager with hand over data, - // which will be passed as parameter to new leader consumer + // which will be passed as parameter to new consumer singleton context.parent ! current context stop self //#consumer-end } } - // documentation of how to keep track of the leader address in user land + // documentation of how to keep track of the oldest member in user land //#singleton-proxy class ConsumerProxy extends Actor { - // subscribe to LeaderChanged, re-subscribe when restart + // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = - Cluster(context.system).subscribe(self, classOf[LeaderChanged]) - override def postStop(): Unit = - Cluster(context.system).unsubscribe(self) - - var leaderAddress: Option[Address] = None - - def receive = { - case state: CurrentClusterState ⇒ leaderAddress = state.leader - case LeaderChanged(leader) ⇒ leaderAddress = leader - case other ⇒ consumer foreach { _.tell(other, sender) } - } - - def consumer: Option[ActorSelection] = - leaderAddress map (a ⇒ context.actorSelection(RootActorPath(a) / - "user" / "singleton" / "consumer")) - } - //#singleton-proxy - - // documentation of how to keep track of the role leader address in user land - //#singleton-proxy2 - class ConsumerProxy2 extends Actor { - // subscribe to RoleLeaderChanged, re-subscribe when restart - override def preStart(): Unit = - Cluster(context.system).subscribe(self, classOf[RoleLeaderChanged]) + Cluster(context.system).subscribe(self, classOf[MemberEvent]) override def postStop(): Unit = Cluster(context.system).unsubscribe(self) val role = "worker" - var leaderAddress: Option[Address] = None + // sort by age, oldest first + val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) def receive = { - case state: CurrentClusterState ⇒ leaderAddress = 
state.roleLeader(role) - case RoleLeaderChanged(r, leader) ⇒ if (r == role) leaderAddress = leader - case other ⇒ consumer foreach { _.tell(other, sender) } + case state: CurrentClusterState ⇒ + membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { + case m if m.hasRole(role) ⇒ m + } + case MemberUp(m) ⇒ if (m.hasRole(role)) membersByAge += m + case MemberRemoved(m) ⇒ if (m.hasRole(role)) membersByAge -= m + case other ⇒ consumer foreach { _.tell(other, sender) } } def consumer: Option[ActorSelection] = - leaderAddress map (a ⇒ context.actorSelection(RootActorPath(a) / + membersByAge.headOption map (m ⇒ context.actorSelection(RootActorPath(m.address) / "user" / "singleton" / "consumer")) } - //#singleton-proxy2 + //#singleton-proxy } @@ -213,18 +196,6 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS val identifyProbe = TestProbe() - //#sort-cluster-roles - // Sort the roles in the order used by the cluster. - lazy val sortedWorkerNodes: immutable.IndexedSeq[RoleName] = { - implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { - import Member.addressOrdering - def compare(x: RoleName, y: RoleName) = - addressOrdering.compare(node(x).address, node(y).address) - } - roles.filterNot(r ⇒ r == controller || r == observer).toVector.sorted - } - //#sort-cluster-roles - def queue: ActorRef = { system.actorSelection(node(controller) / "user" / "queue").tell(Identify("queue"), identifyProbe.ref) identifyProbe.expectMsgType[ActorIdentity].ref.get @@ -237,6 +208,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS } } + def awaitMemberUp(memberProbe: TestProbe, nodes: RoleName*): Unit = { + runOn(nodes.filterNot(_ == nodes.head): _*) { + memberProbe.expectMsgType[MemberUp](15.seconds).member.address must be(node(nodes.head).address) + } + runOn(nodes.head) { + memberProbe.receiveN(nodes.size, 15.seconds).collect { case MemberUp(m) ⇒ m.address }.toSet must be( + 
nodes.map(node(_).address).toSet) + } + enterBarrier(nodes.head.name + "-up") + } + def createSingleton(): ActorRef = { //#create-singleton-manager system.actorOf(ClusterSingletonManager.props( @@ -249,30 +231,34 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS //#create-singleton-manager } - def consumer(leader: RoleName): ActorSelection = - system.actorSelection(RootActorPath(node(leader).address) / "user" / "singleton" / "consumer") + def consumer(oldest: RoleName): ActorSelection = + system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "consumer") - def verify(leader: RoleName, msg: Int, expectedCurrent: Int): Unit = { - enterBarrier("before-" + leader.name + "-verified") - runOn(leader) { + def verifyRegistration(oldest: RoleName, expectedCurrent: Int): Unit = { + enterBarrier("before-" + oldest.name + "-registration-verified") + runOn(oldest) { expectMsg(RegistrationOk) - consumer(leader) ! GetCurrent + consumer(oldest) ! GetCurrent expectMsg(expectedCurrent) } - enterBarrier(leader.name + "-active") + enterBarrier("after-" + oldest.name + "-registration-verified") + } + + def verifyMsg(oldest: RoleName, msg: Int): Unit = { + enterBarrier("before-" + msg + "-verified") runOn(controller) { queue ! 
msg // make sure it's not terminated, which would be wrong expectNoMsg(1 second) } - runOn(leader) { - expectMsg(msg) + runOn(oldest) { + expectMsg(5.seconds, msg) } - runOn(sortedWorkerNodes.filterNot(_ == leader): _*) { + runOn(roles.filterNot(r ⇒ r == oldest || r == controller || r == observer): _*) { expectNoMsg(1 second) } - enterBarrier(leader.name + "-verified") + enterBarrier("after-" + msg + "-verified") } def crash(roles: RoleName*): Unit = { @@ -288,8 +274,11 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS "A ClusterSingletonManager" must { - "startup in single member cluster" in within(10 seconds) { - log.info("Sorted cluster nodes [{}]", sortedWorkerNodes.map(node(_).address).mkString(", ")) + "startup 6 node cluster" in within(60 seconds) { + + val memberProbe = TestProbe() + Cluster(system).subscribe(memberProbe.ref, classOf[MemberUp]) + memberProbe.expectMsgClass(classOf[CurrentClusterState]) runOn(controller) { // watch that it is not terminated, which would indicate misbehaviour @@ -297,55 +286,48 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS } enterBarrier("queue-started") - join(sortedWorkerNodes.last, sortedWorkerNodes.last) - verify(sortedWorkerNodes.last, msg = 1, expectedCurrent = 0) + join(first, first) + awaitMemberUp(memberProbe, first) + verifyRegistration(first, expectedCurrent = 0) + verifyMsg(first, msg = 1) // join the observer node as well, which should not influence since it doesn't have the "worker" role - join(observer, sortedWorkerNodes.last) + join(observer, first) + awaitMemberUp(memberProbe, observer, first) + + join(second, first) + awaitMemberUp(memberProbe, second, observer, first) + verifyMsg(first, msg = 2) + + join(third, first) + awaitMemberUp(memberProbe, third, second, observer, first) + verifyMsg(first, msg = 3) + + join(fourth, first) + awaitMemberUp(memberProbe, fourth, third, second, observer, first) + verifyMsg(first, msg = 4) + + 
join(fifth, first) + awaitMemberUp(memberProbe, fifth, fourth, third, second, observer, first) + verifyMsg(first, msg = 5) + + join(sixth, first) + awaitMemberUp(memberProbe, sixth, fifth, fourth, third, second, observer, first) + verifyMsg(first, msg = 6) + enterBarrier("after-1") } - "hand over when new leader joins to 1 node cluster" in within(15 seconds) { - val newLeaderRole = sortedWorkerNodes(4) - join(newLeaderRole, sortedWorkerNodes.last) - verify(newLeaderRole, msg = 2, expectedCurrent = 1) - } - - "hand over when new leader joins to 2 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedWorkerNodes(3) - join(newLeaderRole, sortedWorkerNodes.last) - verify(newLeaderRole, msg = 3, expectedCurrent = 2) - } - - "hand over when new leader joins to 3 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedWorkerNodes(2) - join(newLeaderRole, sortedWorkerNodes.last) - verify(newLeaderRole, msg = 4, expectedCurrent = 3) - } - - "hand over when new leader joins to 4 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedWorkerNodes(1) - join(newLeaderRole, sortedWorkerNodes.last) - verify(newLeaderRole, msg = 5, expectedCurrent = 4) - } - - "hand over when new leader joins to 5 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedWorkerNodes(0) - join(newLeaderRole, sortedWorkerNodes.last) - verify(newLeaderRole, msg = 6, expectedCurrent = 5) - } - - "hand over when leader leaves in 6 nodes cluster " in within(30 seconds) { - //#test-leave - val leaveRole = sortedWorkerNodes(0) - val newLeaderRole = sortedWorkerNodes(1) + "hand over when oldest leaves in 6 nodes cluster " in within(30 seconds) { + val leaveRole = first + val newOldestRole = second runOn(leaveRole) { Cluster(system) leave node(leaveRole).address } - //#test-leave - verify(newLeaderRole, msg = 7, expectedCurrent = 6) + verifyRegistration(second, expectedCurrent = 6) + verifyMsg(second, msg = 7) runOn(leaveRole) { 
system.actorSelection("/user/singleton").tell(Identify("singleton"), identifyProbe.ref) @@ -360,24 +342,28 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS enterBarrier("after-leave") } - "take over when leader crashes in 5 nodes cluster" in within(60 seconds) { + "take over when oldest crashes in 5 nodes cluster" in within(60 seconds) { + // FIXME change those to DeadLetterFilter system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*"))) system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Disassociated.*"))) system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Association failed.*"))) enterBarrier("logs-muted") - crash(sortedWorkerNodes(1)) - verify(sortedWorkerNodes(2), msg = 8, expectedCurrent = 0) + crash(second) + verifyRegistration(third, expectedCurrent = 0) + verifyMsg(third, msg = 8) } - "take over when two leaders crash in 3 nodes cluster" in within(60 seconds) { - crash(sortedWorkerNodes(2), sortedWorkerNodes(3)) - verify(sortedWorkerNodes(4), msg = 9, expectedCurrent = 0) + "take over when two oldest crash in 3 nodes cluster" in within(60 seconds) { + crash(third, fourth) + verifyRegistration(fifth, expectedCurrent = 0) + verifyMsg(fifth, msg = 9) } - "take over when leader crashes in 2 nodes cluster" in within(60 seconds) { - crash(sortedWorkerNodes(4)) - verify(sortedWorkerNodes(5), msg = 10, expectedCurrent = 0) + "take over when oldest crashes in 2 nodes cluster" in within(60 seconds) { + crash(fifth) + verifyRegistration(sixth, expectedCurrent = 0) + verifyMsg(sixth, msg = 10) } } diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java new file mode 100644 index 0000000000..8ed3cfd069 --- /dev/null +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2009-2013 
Typesafe Inc. + */ + +package akka.contrib.pattern; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import akka.actor.ActorSystem; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.cluster.Cluster; +import akka.cluster.Member; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.ClusterEvent.MemberEvent; +import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.MemberRemoved; + +public class ClusterSingletonManagerTest { + + public void demo() { + final ActorSystem system = null; + final ActorRef queue = null; + final ActorRef testActor = null; + + //#create-singleton-manager + system.actorOf( + ClusterSingletonManager.defaultProps("consumer", new End(), "worker", + new ClusterSingletonPropsFactory() { + @Override + public Props create(Object handOverData) { + return Props.create(Consumer.class, handOverData, queue, testActor); + } + }), "singleton"); + //#create-singleton-manager + } + + static//documentation of how to keep track of the oldest member in user land + //#singleton-proxy + public class ConsumerProxy extends UntypedActor { + + final Cluster cluster = Cluster.get(getContext().system()); + + final Comparator ageComparator = new Comparator() { + public int compare(Member a, Member b) { + if (a.isOlderThan(b)) + return -1; + else if (b.isOlderThan(a)) + return 1; + else + return 0; + } + }; + final SortedSet membersByAge = new TreeSet(ageComparator); + + final String role = "worker"; + + //subscribe to cluster changes + @Override + public void preStart() { + cluster.subscribe(getSelf(), MemberEvent.class); + } + + //re-subscribe when restart + @Override + public void postStop() { + cluster.unsubscribe(getSelf()); + } + + @Override + public void onReceive(Object message) { + if (message instanceof CurrentClusterState) { + 
CurrentClusterState state = (CurrentClusterState) message; + List<Member> members = new ArrayList<Member>(); + for (Member m : state.getMembers()) { + if (m.hasRole(role)) + members.add(m); + } + membersByAge.clear(); + membersByAge.addAll(members); + + } else if (message instanceof MemberUp) { + Member m = ((MemberUp) message).member(); + if (m.hasRole(role)) + membersByAge.add(m); + + } else if (message instanceof MemberRemoved) { + Member m = ((MemberRemoved) message).member(); + if (m.hasRole(role)) + membersByAge.remove(m); + + } else if (message instanceof MemberEvent) { + // not interesting + + } else if (!membersByAge.isEmpty()) { + currentMaster().tell(message, getSender()); + + } + } + + ActorSelection currentMaster() { + return getContext().actorSelection(membersByAge.first().address() + + "/user/singleton/statsService"); + } + + } + //#singleton-proxy + + + public static class End {} + + public static class Consumer {} +} diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 3abce558e3..14068b062d 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -275,10 +275,10 @@ Cluster Singleton Pattern For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster. -This can be implemented by subscribing to ``LeaderChanged`` or ``RoleLeaderChanged`` -events, but there are several corner cases to consider. Therefore, this specific use -case is made easily accessible by the :ref:`cluster-singleton` in the contrib module. -You can use it as is, or adjust to fit your specific needs. +This can be implemented by subscribing to member events, but there are several corner +cases to consider. Therefore, this specific use case is made easily accessible by the +:ref:`cluster-singleton` in the contrib module. You can use it as is, or adjust to fit +your specific needs. 
Distributed Publish Subscribe Pattern ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -473,9 +473,7 @@ delegates jobs to the ``StatsService``. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java#facade The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single -master. It listens to cluster events to lookup the ``StatsService`` on the leader node. The master runs -on the same node as the leader of the cluster members, which is nothing more than the address currently -sorted first in the member ring, i.e. it can change when new nodes join or when current leader leaves. +master. It listens to cluster events to lookup the ``StatsService`` on the oldest node. All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 6d423c155b..a1deb575f3 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -263,10 +263,10 @@ Cluster Singleton Pattern For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster. -This can be implemented by subscribing to ``LeaderChanged`` or ``RoleLeaderChanged`` -events, but there are several corner cases to consider. Therefore, this specific use -case is made easily accessible by the :ref:`cluster-singleton` in the contrib module. -You can use it as is, or adjust to fit your specific needs. +This can be implemented by subscribing to member events, but there are several corner +cases to consider. Therefore, this specific use case is made easily accessible by the +:ref:`cluster-singleton` in the contrib module. You can use it as is, or adjust to fit +your specific needs. 
Distributed Publish Subscribe Pattern ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -456,9 +456,7 @@ delegates jobs to the ``StatsService``. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single -master. It listens to cluster events to lookup the ``StatsService`` on the leader node. The master runs -on the same node as the leader of the cluster members, which is nothing more than the address currently -sorted first in the member ring, i.e. it can change when new nodes join or when current leader leaves. +master. It listens to cluster events to lookup the ``StatsService`` on the oldest node. All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java index 584c4269f8..c62e48d8e6 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java @@ -1,13 +1,21 @@ package sample.cluster.stats.japi; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + import sample.cluster.stats.japi.StatsMessages.JobFailed; import sample.cluster.stats.japi.StatsMessages.StatsJob; import akka.actor.ActorSelection; -import akka.actor.Address; import akka.actor.UntypedActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.ClusterEvent.RoleLeaderChanged; +import akka.cluster.ClusterEvent.MemberEvent; +import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.MemberRemoved; +import 
akka.cluster.Member; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -15,15 +23,23 @@ import akka.event.LoggingAdapter; //#facade public class StatsFacade extends UntypedActor { - LoggingAdapter log = Logging.getLogger(getContext().system(), this); - Cluster cluster = Cluster.get(getContext().system()); + final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + final Cluster cluster = Cluster.get(getContext().system()); - ActorSelection currentMaster = null; + final Comparator<Member> ageComparator = new Comparator<Member>() { + public int compare(Member a, Member b) { + if (a.isOlderThan(b)) return -1; + else if (b.isOlderThan(a)) return 1; + else return 0; + } + }; + final SortedSet<Member> membersByAge = new TreeSet<Member>(ageComparator); - //subscribe to cluster changes, RoleLeaderChanged + + //subscribe to cluster changes @Override public void preStart() { - cluster.subscribe(getSelf(), RoleLeaderChanged.class); + cluster.subscribe(getSelf(), MemberEvent.class); } //re-subscribe when restart @@ -34,33 +50,41 @@ public class StatsFacade extends UntypedActor { @Override public void onReceive(Object message) { - if (message instanceof StatsJob && currentMaster == null) { + if (message instanceof StatsJob && membersByAge.isEmpty()) { getSender().tell(new JobFailed("Service unavailable, try again later"), getSelf()); } else if (message instanceof StatsJob) { - currentMaster.tell(message, getSender()); + currentMaster().tell(message, getSender()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; - setCurrentMaster(state.getRoleLeader("compute")); + List<Member> members = new ArrayList<Member>(); + for (Member m : state.getMembers()) { + if (m.hasRole("compute")) members.add(m); + } + membersByAge.clear(); + membersByAge.addAll(members); - } else if (message instanceof RoleLeaderChanged) { - RoleLeaderChanged leaderChanged = (RoleLeaderChanged) message; - if (leaderChanged.role().equals("compute")) - 
setCurrentMaster(leaderChanged.getLeader()); + } else if (message instanceof MemberUp) { + Member m = ((MemberUp) message).member(); + if (m.hasRole("compute")) membersByAge.add(m); + + } else if (message instanceof MemberRemoved) { + Member m = ((MemberRemoved) message).member(); + if (m.hasRole("compute")) membersByAge.remove(m); + + } else if (message instanceof MemberEvent) { + // not interesting } else { unhandled(message); } } - void setCurrentMaster(Address address) { - if (address == null) - currentMaster = null; - else - currentMaster = getContext().actorSelection(address + - "/user/singleton/statsService"); + ActorSelection currentMaster() { + return getContext().actorSelection(membersByAge.first().address() + + "/user/singleton/statsService"); } } diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index a248b0a4c2..bfa8cbae09 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -2,6 +2,7 @@ package sample.cluster.stats //#imports import language.postfixOps +import scala.collection.immutable import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -19,6 +20,7 @@ import akka.actor.RootActorPath import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus +import akka.cluster.Member import akka.contrib.pattern.ClusterSingletonManager import akka.routing.FromConfig import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope @@ -91,29 +93,32 @@ class StatsFacade extends Actor with ActorLogging { import context.dispatcher val cluster = Cluster(context.system) - var currentMaster: Option[ActorSelection] = None + // sort by age, oldest first + val ageOrdering = 
Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) - // subscribe to cluster changes, RoleLeaderChanged + // subscribe to cluster changes // re-subscribe when restart - override def preStart(): Unit = cluster.subscribe(self, classOf[RoleLeaderChanged]) + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def postStop(): Unit = cluster.unsubscribe(self) def receive = { - case job: StatsJob if currentMaster.isEmpty ⇒ + case job: StatsJob if membersByAge.isEmpty ⇒ sender ! JobFailed("Service unavailable, try again later") case job: StatsJob ⇒ - currentMaster foreach { _.tell(job, sender) } + currentMaster.tell(job, sender) case state: CurrentClusterState ⇒ - setCurrentMaster(state.roleLeader("compute")) - case RoleLeaderChanged(role, leader) ⇒ - if (role == "compute") - setCurrentMaster(leader) + membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { + case m if m.hasRole("compute") ⇒ m + } + case MemberUp(m) ⇒ if (m.hasRole("compute")) membersByAge += m + case MemberRemoved(m) ⇒ if (m.hasRole("compute")) membersByAge -= m + case _: MemberEvent ⇒ // not interesting } - def setCurrentMaster(address: Option[Address]): Unit = { - currentMaster = address.map(a ⇒ context.actorSelection(RootActorPath(a) / - "user" / "singleton" / "statsService")) - } + def currentMaster: ActorSelection = + context.actorSelection(RootActorPath(membersByAge.head.address) / + "user" / "singleton" / "statsService") } //#facade