From c48d9c058e499690ebb7b5563002b2b975da1424 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 28 Jan 2013 08:47:52 +0100 Subject: [PATCH] Clarifications of cluster singleton docs, see #2895 --- akka-contrib/docs/cluster-singleton.rst | 22 ++++++--- .../pattern/ClusterSingletonManager.scala | 36 +++++++-------- .../pattern/ClusterSingletonManagerSpec.scala | 45 +++++++++++++------ 3 files changed, 67 insertions(+), 36 deletions(-) diff --git a/akka-contrib/docs/cluster-singleton.rst b/akka-contrib/docs/cluster-singleton.rst index 943b72e9b4..47a372115b 100644 --- a/akka-contrib/docs/cluster-singleton.rst +++ b/akka-contrib/docs/cluster-singleton.rst @@ -33,9 +33,9 @@ node that becomes leader or removing current leader node. Be aware that there is time period when there is no active singleton during the hand over process. The cluster failure detector will notice when a leader node becomes unreachable due to -things like JVM crash, hard shutdown, or network failure. Then a new leader node will +things like JVM crash, hard shut down, or network failure. Then a new leader node will take over and a new singleton actor is created. For these failure scenarios there will -not be a graceful hand over, but more than one active singletons is prevented by all +not be a graceful hand-over, but more than one active singletons is prevented by all reasonable means. Some corner cases are eventually resolved by configurable timeouts. You access the singleton actor with ``actorFor`` using the names you have specified when @@ -71,10 +71,22 @@ Note that you can send back current state to the ``ClusterSingletonManager`` bef This message will be sent over to the ``ClusterSingletonManager`` at the new leader node and it will be passed to the ``singletonProps`` factory when creating the new singleton instance. -With the names given above the singleton actor can be looked up by subscribing to -``LeaderChanged`` cluster event and using ``actorFor``: +With the names given above the path of singleton actor can be constructed by subscribing to +``LeaderChanged`` cluster event and the actor reference is then looked up using ``actorFor``: -.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-actorFor +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy +Note that the hand-over might still be in progress and the singleton actor might not be started yet +when you receive the ``LeaderChanged`` event. + +To test scenarios where the cluster leader node is removed or shut down you can use :ref:`multi-node-testing` and +utilize the fact that the leader is supposed to be the first member when sorted by member address. + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#sort-cluster-roles + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#test-leave + +Also, make sure that you don't shut down the first role, which is running the test conductor controller. +Use a dedicated role for the controller, which is not a cluster member. .. note:: The singleton pattern will be simplified, perhaps provided out-of-the-box, when the cluster handles automatic actor partitioning. diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index cc4b989328..c641524644 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -36,19 +36,19 @@ object ClusterSingletonManager { private object Internal { /** * Sent from new leader to previous leader to initate the - * hand over process. `HandOverInProgress` and `HandOverDone` + * hand-over process. `HandOverInProgress` and `HandOverDone` * are expected replies. */ case object HandOverToMe /** * Confirmation by the previous leader that the hand - * over process, shutdown of the singleton actor, has + * over process, shut down of the singleton actor, has * started. */ case object HandOverInProgress /** * Confirmation by the previous leader that the singleton - * actor has been terminated and the hand over process is + * actor has been terminated and the hand-over process is * completed. The `handOverData` holds the message, if any, * sent from the singleton actor to its parent ClusterSingletonManager * when shutting down. It is passed to the `singletonProps` @@ -57,7 +57,7 @@ object ClusterSingletonManager { case class HandOverDone(handOverData: Option[Any]) /** * Sent from from previous leader to new leader to - * initiate the normal hand over process. + * initiate the normal hand-over process. * Especially useful when new node joins and becomes * leader immediately, without knowing who was previous * leader. @@ -160,7 +160,7 @@ trait ClusterSingletonPropsFactory extends Serializable { /** * Create the `Props` from the `handOverData` sent from * previous singleton. `handOverData` might be null - * when no hand over took place, or when the there is no need + * when no hand-over took place, or when the there is no need * for sending data to the new singleton. */ def create(handOverData: Any): Props @@ -189,18 +189,18 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * over can normally be performed when joining a new node that becomes * leader or removing current leader node. Be aware that there is a * short time period when there is no active singleton during the - * hand over process. + * hand-over process. * * The singleton actor can at any time send a message to its parent * ClusterSingletonManager and this message will be passed to the * `singletonProps` factory on the new leader node when a graceful - * hand over is performed. + * hand-over is performed. * * The cluster failure detector will notice when a leader node - * becomes unreachable due to things like JVM crash, hard shutdown, + * becomes unreachable due to things like JVM crash, hard shut down, * or network failure. Then a new leader node will take over and a * new singleton actor is created. For these failure scenarios there - * will not be a graceful hand over, but more than one active singletons + * will not be a graceful hand-over, but more than one active singletons * is prevented by all reasonable means. Some corner cases are eventually * resolved by configurable timeouts. * @@ -215,7 +215,7 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * '''''singletonProps''''' Factory for [[akka.actor.Props]] of the * singleton actor instance. The `Option` parameter is the the * `handOverData` sent from previous singleton. `handOverData` - * might be None when no hand over took place, or when the there + * might be None when no hand-over took place, or when the there * is no need for sending data to the new singleton. The `handOverData` * is typically passed as parameter to the constructor of the * singleton actor. @@ -227,13 +227,13 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * it to finish its work, close resources, and stop. It can sending * a message back to the parent ClusterSingletonManager, which will * passed to the `singletonProps` factory on the new leader node. - * The hand over to the new leader node is completed when the + * The hand-over to the new leader node is completed when the * singleton actor is terminated. * Note that [[akka.actor.PoisonPill]] is a perfectly fine * `terminationMessage` if you only need to stop the actor. * * '''''maxHandOverRetries''''' When a node is becoming leader it sends - * hand over request to previous leader. This is retried with the + * hand-over request to previous leader. This is retried with the * `retryInterval` until the previous leader confirms that the hand * over has started, or this `maxHandOverRetries` limit has been * reached. If the retry limit is reached it takes the decision to be @@ -246,7 +246,7 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * * '''''maxTakeOverRetries''''' When a leader node is not leader any more * it sends take over request to the new leader to initiate the normal - * hand over process. This is especially useful when new node joins and becomes + * hand-over process. This is especially useful when new node joins and becomes * leader immediately, without knowing who was previous leader. This is retried * with the `retryInterval` until this retry limit has been reached. If the retry * limit is reached it initiates a new round by throwing @@ -412,8 +412,8 @@ class ClusterSingletonManager( when(BecomingLeader) { case Event(HandOverInProgress, _) ⇒ - // confirmation that the hand over process has started - logInfo("Hand over in progress at [{}]", sender.path.address) + // confirmation that the hand-over process has started + logInfo("Hand-over in progress at [{}]", sender.path.address) cancelTimer(HandOverRetryTimer) stay @@ -480,7 +480,7 @@ class ClusterSingletonManager( setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false) goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a) case _ ⇒ - // new leader will initiate the hand over + // new leader will initiate the hand-over stay } @@ -503,7 +503,7 @@ class ClusterSingletonManager( setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false) stay } else - throw new ClusterSingletonManagerIsStuck(s"Expected hand over to [${newLeader}] never occured") + throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeader}] never occured") case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) @@ -546,7 +546,7 @@ class ClusterSingletonManager( def handOverDone(handOverTo: Option[ActorRef], handOverData: Option[Any]): State = { val newLeader = handOverTo.map(_.path.address) - logInfo("Singleton terminated, hand over done [{} -> {}]", cluster.selfAddress, newLeader) + logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newLeader) handOverTo foreach { _ ! HandOverDone(handOverData) } goto(NonLeader) using NonLeaderData(newLeader) } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 83f9d4de92..672ca7037d 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -11,6 +11,7 @@ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef +import akka.actor.Address import akka.actor.Props import akka.actor.RootActorPath import akka.cluster.Cluster @@ -141,6 +142,29 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { } } + // documentation of how to keep track of the leader address in user land + //#singleton-proxy + class ConsumerProxy extends Actor { + // subscribe to LeaderChanged, re-subscribe when restart + override def preStart(): Unit = + Cluster(context.system).subscribe(self, classOf[LeaderChanged]) + override def postStop(): Unit = + Cluster(context.system).unsubscribe(self) + + var leaderAddress: Option[Address] = None + + def receive = { + case state: CurrentClusterState ⇒ leaderAddress = state.leader + case LeaderChanged(leader) ⇒ leaderAddress = leader + case other => consumer foreach { _ forward other } + } + + def consumer: Option[ActorRef] = + leaderAddress map (a => context.actorFor(RootActorPath(a) / + "user" / "singleton" / "consumer")) + } + //#singleton-proxy + } class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec @@ -158,14 +182,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS override def initialParticipants = roles.size + //#sort-cluster-roles // Sort the roles in the order used by the cluster. lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = { implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { import Member.addressOrdering - def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address) + def compare(x: RoleName, y: RoleName) = + addressOrdering.compare(node(x).address, node(y).address) } roles.filterNot(_ == controller).toVector.sorted } + //#sort-cluster-roles def queue: ActorRef = system.actorFor(node(controller) / "user" / "queue") @@ -180,18 +207,8 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS //#create-singleton-manager } - def consumer(leader: RoleName): ActorRef = { - // the reason for this complicated way of creating the path is to illustrate - // in documentation how it's typically done in user land - LeaderChanged(Some(node(leader).address)) match { - //#singleton-actorFor - case LeaderChanged(Some(leaderAddress)) ⇒ - val path = RootActorPath(leaderAddress) / "user" / "singleton" / "consumer" - val consumer = system.actorFor(path) - //#singleton-actorFor - consumer - } - } + def consumer(leader: RoleName): ActorRef = + system.actorFor(RootActorPath(node(leader).address) / "user" / "singleton" / "consumer") def verify(leader: RoleName, msg: Int, expectedCurrent: Int): Unit = { enterBarrier("before-" + leader.name + "-verified") @@ -284,12 +301,14 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS } "hand over when leader leaves in 6 nodes cluster " in within(20 seconds) { + //#test-leave val leaveRole = sortedClusterRoles(0) val newLeaderRole = sortedClusterRoles(1) runOn(leaveRole) { Cluster(system) leave node(leaveRole).address } + //#test-leave verify(newLeaderRole, msg = 5, expectedCurrent = 4)