diff --git a/akka-contrib/docs/cluster-singleton.rst b/akka-contrib/docs/cluster-singleton.rst new file mode 100644 index 0000000000..943b72e9b4 --- /dev/null +++ b/akka-contrib/docs/cluster-singleton.rst @@ -0,0 +1,80 @@ +.. _cluster-singleton: + +Cluster Singleton Pattern +========================= + +For some use cases it is convenient and sometimes also mandatory to ensure that +you have exactly one actor of a certain type running somewhere in the cluster. + +Some examples: + +* single point of responsibility for certain cluster-wide consistent decisions, or + coordination of actions across the cluster system +* single entry point to an external system +* single master, many workers +* centralized naming service, or routing logic + +Using a singleton should not be the first design choice. It has several drawbacks, +such as being a single point of bottleneck. A single point of failure is also a relevant concern, +but for some cases this feature takes care of that by making sure that another singleton +instance will eventually be started. + +The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSingletonManager``, +which is an actor that is supposed to be started on all nodes in the cluster. +The actual singleton actor is started by the ``ClusterSingletonManager`` on the +leader node of the cluster by creating a child actor from the supplied ``Props``. +``ClusterSingletonManager`` makes sure that at most one singleton instance is +running at any point in time. + +The singleton actor is always running on the leader member, which is nothing more than +the address currently sorted first in the member ring. This can change when adding +or removing members. A graceful hand over can normally be performed when joining a new +node that becomes leader or when removing the current leader node. Be aware that there is a short +time period when there is no active singleton during the hand over process. + +The cluster failure detector will notice when a leader node becomes unreachable due to +things like JVM crash, hard shutdown, or network failure. Then a new leader node will +take over and a new singleton actor is created. For these failure scenarios there will +not be a graceful hand over, but more than one active singleton is prevented by all +reasonable means. Some corner cases are eventually resolved by configurable timeouts. + +You access the singleton actor with ``actorFor`` using the names you have specified when +creating the ``ClusterSingletonManager``. You can subscribe to cluster ``LeaderChanged`` events +to keep track of which node it is supposed to be running on. Alternatively the singleton +actor may broadcast its existence when it is started. + +An Example +---------- + +Assume that we need one single entry point to an external system: an actor that +receives messages from a JMS queue, with the strict requirement that only one +JMS consumer must exist to make sure that the messages are processed in order. +That is perhaps not how one would like to design things, but it is a typical real-world +scenario when integrating with external systems. + +On each node in the cluster you need to start the ``ClusterSingletonManager`` and +supply the ``Props`` of the singleton actor, in this case the JMS queue consumer. + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#create-singleton-manager + +The corresponding Java API for the ``singletonProps`` function is ``akka.contrib.pattern.ClusterSingletonPropsFactory``.
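+
+For reference, here is an inline sketch of the same construction with the optional
+retry settings spelled out. This snippet is not part of the contrib sources; the
+values shown are the defaults, and the ``Consumer`` actor, ``queue`` and ``testActor``
+references are assumed to come from the example source above::
+
+  import scala.concurrent.duration._
+
+  system.actorOf(Props(new ClusterSingletonManager(
+    // factory for the singleton actor; receives the optional hand over data
+    singletonProps = handOverData ⇒
+      Props(new Consumer(handOverData, queue, testActor)),
+    singletonName = "consumer",
+    // sent to the singleton actor to tell it to stop during a graceful hand over
+    terminationMessage = End,
+    maxHandOverRetries = 20,
+    maxTakeOverRetries = 15,
+    retryInterval = 1.second)),
+    name = "singleton")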
+ +Here we use an application specific ``terminationMessage`` to be able to close the +resources before actually stopping the singleton actor. Note that ``PoisonPill`` is a +perfectly fine ``terminationMessage`` if you only need to stop the actor. + +Here is how the singleton actor handles the ``terminationMessage`` in this example. + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#consumer-end + +Note that you can send back the current state to the ``ClusterSingletonManager`` before terminating. +This message will be sent over to the ``ClusterSingletonManager`` at the new leader node and it +will be passed to the ``singletonProps`` factory when creating the new singleton instance. + +With the names given above the singleton actor can be looked up by subscribing to the +``LeaderChanged`` cluster event and using ``actorFor``: + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-actorFor + + +.. note:: The singleton pattern will be simplified, perhaps provided out-of-the-box, when the cluster handles automatic actor partitioning. diff --git a/akka-contrib/docs/index.rst b/akka-contrib/docs/index.rst index f0d3245187..e6701d5f35 100644 --- a/akka-contrib/docs/index.rst +++ b/akka-contrib/docs/index.rst @@ -32,6 +32,7 @@ The Current List of Modules throttle jul peek-mailbox + cluster-singleton Suggested Way of Using these Contributions ------------------------------------------ diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala new file mode 100644 index 0000000000..cc4b989328 --- /dev/null +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -0,0 +1,596 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.contrib.pattern + +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.Actor.Receive +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.FSM +import akka.actor.Props +import akka.actor.Terminated +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.AkkaException + +object ClusterSingletonManager { + + /** + * Internal API + * public due to the `with FSM` type parameters + */ + sealed trait State + /** + * Internal API + * public due to the `with FSM` type parameters + */ + sealed trait Data + + /** + * Internal API + */ + private object Internal { + /** + * Sent from the new leader to the previous leader to initiate the + * hand over process. `HandOverInProgress` and `HandOverDone` + * are expected replies. + */ + case object HandOverToMe + /** + * Confirmation by the previous leader that the hand + * over process, i.e. shutdown of the singleton actor, has + * started. + */ + case object HandOverInProgress + /** + * Confirmation by the previous leader that the singleton + * actor has been terminated and the hand over process is + * completed. The `handOverData` holds the message, if any, + * sent from the singleton actor to its parent ClusterSingletonManager + * when shutting down. It is passed to the `singletonProps` + * factory on the new leader node. + */ + case class HandOverDone(handOverData: Option[Any]) + /** + * Sent from the previous leader to the new leader to + * initiate the normal hand over process.
+ * Especially useful when a new node joins and becomes + * leader immediately, without knowing who was the previous + * leader. + */ + case object TakeOverFromMe + + case class HandOverRetry(count: Int) + case class TakeOverRetry(leaderPeer: ActorRef, count: Int) + case object Cleanup + case object StartLeaderChangedBuffer + + case object Start extends State + case object Leader extends State + case object NonLeader extends State + case object BecomingLeader extends State + case object WasLeader extends State + case object HandingOver extends State + case object TakeOver extends State + + case object Uninitialized extends Data + case class NonLeaderData(leaderOption: Option[Address]) extends Data + case class BecomingLeaderData(previousLeaderOption: Option[Address]) extends Data + case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false, + handOverData: Option[Any] = None) extends Data + case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], + newLeader: Address) extends Data + case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data + + val HandOverRetryTimer = "hand-over-retry" + val TakeOverRetryTimer = "take-over-retry" + val CleanupTimer = "cleanup" + + object LeaderChangedBuffer { + /** + * Request to deliver one more event. + */ + case object GetNext + /** + * The first event, corresponding to CurrentClusterState. + */ + case class InitialLeaderState(leader: Option[Address], memberCount: Int) + } + + /** + * Notifications of [[akka.cluster.ClusterEvent.LeaderChanged]] are tunneled + * via this actor (child of ClusterSingletonManager) to be able to deliver + * one change at a time. Avoiding simultaneous leader changes simplifies + * the process in ClusterSingletonManager. ClusterSingletonManager requests + * the next event with `GetNext` when it is ready for it. Only one outstanding + * `GetNext` request is allowed. Incoming events are buffered and delivered + * upon `GetNext` request. + */ + class LeaderChangedBuffer extends Actor { + import LeaderChangedBuffer._ + import context.dispatcher + + val cluster = Cluster(context.system) + var changes = Vector.empty[AnyRef] + var memberCount = 0 + + // subscribe to LeaderChanged, re-subscribe on restart + override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case state: CurrentClusterState ⇒ + changes :+= InitialLeaderState(state.leader, state.members.size) + case event: LeaderChanged ⇒ + changes :+= event + case GetNext if changes.isEmpty ⇒ + context.become(deliverNext, discardOld = false) + case GetNext ⇒ + val event = changes.head + changes = changes.tail + context.parent ! event + } + + // the buffer was empty when GetNext was received, deliver next event immediately + def deliverNext: Actor.Receive = { + case state: CurrentClusterState ⇒ + context.parent ! InitialLeaderState(state.leader, state.members.size) + context.unbecome() + case event: LeaderChanged ⇒ + context.parent ! event + context.unbecome() + } + + } + + } +} + +/** + * Java API. Factory for the [[akka.actor.Props]] of the singleton + * actor instance. Used in constructor of + * [[akka.contrib.pattern.ClusterSingletonManager]] + */ +@SerialVersionUID(1L) +trait ClusterSingletonPropsFactory extends Serializable { + /** + * Create the `Props` from the `handOverData` sent from the + * previous singleton.
`handOverData` might be null + * when no hand over took place, or when there is no need + * for sending data to the new singleton. + */ + def create(handOverData: Any): Props +} + +/** + * Thrown when a consistent state can't be determined within the + * defined retry limits. Eventually it will reach a stable state and + * can continue, and that is simplified by starting over with a clean + * state. The parent supervisor should typically restart the actor, i.e. + * the default decision. + */ +class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(message, null) + +/** + * Manages a cluster wide singleton actor instance, i.e. + * at most one singleton instance is running at any point in time. + * The ClusterSingletonManager is supposed to be started on all + * nodes in the cluster with `actorOf`. The actual singleton is + * started on the leader node of the cluster by creating a child + * actor from the supplied `singletonProps`. + * + * The singleton actor is always running on the leader member, which is + * nothing more than the address currently sorted first in the member + * ring. This can change when adding or removing members. A graceful hand + * over can normally be performed when joining a new node that becomes + * leader or when removing the current leader node. Be aware that there is a + * short time period when there is no active singleton during the + * hand over process. + * + * The singleton actor can at any time send a message to its parent + * ClusterSingletonManager and this message will be passed to the + * `singletonProps` factory on the new leader node when a graceful + * hand over is performed. + * + * The cluster failure detector will notice when a leader node + * becomes unreachable due to things like JVM crash, hard shutdown, + * or network failure. Then a new leader node will take over and a + * new singleton actor is created. For these failure scenarios there + * will not be a graceful hand over, but more than one active singleton + * is prevented by all reasonable means. Some corner cases are eventually + * resolved by configurable timeouts. + * + * You access the singleton actor with `actorFor` using the names you have + * specified when creating the ClusterSingletonManager. You can subscribe to + * [[akka.cluster.ClusterEvent.LeaderChanged]] to keep track of which node + * it is supposed to be running on. Alternatively the singleton actor may + * broadcast its existence when it is started. + * + * ==Arguments== + * + * '''''singletonProps''''' Factory for [[akka.actor.Props]] of the + * singleton actor instance. The `Option` parameter is the + * `handOverData` sent from the previous singleton. `handOverData` + * might be None when no hand over took place, or when there + * is no need for sending data to the new singleton. The `handOverData` + * is typically passed as a parameter to the constructor of the + * singleton actor. + * + * '''''singletonName''''' The actor name of the child singleton actor. + * + * '''''terminationMessage''''' When handing over to a new leader node + * this `terminationMessage` is sent to the singleton actor to tell + * it to finish its work, close resources, and stop. It can send + * a message back to the parent ClusterSingletonManager, which will be + * passed to the `singletonProps` factory on the new leader node. + * The hand over to the new leader node is completed when the + * singleton actor is terminated.
+ * Note that [[akka.actor.PoisonPill]] is a perfectly fine + * `terminationMessage` if you only need to stop the actor. + * + * '''''maxHandOverRetries''''' When a node is becoming leader it sends + * a hand over request to the previous leader. This is retried with the + * `retryInterval` until the previous leader confirms that the hand + * over has started, or this `maxHandOverRetries` limit has been + * reached. If the retry limit is reached it takes the decision to be + * the new leader if the previous leader is unknown (typically removed or + * downed), otherwise it initiates a new round by throwing + * [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting + * restart with fresh state. For a cluster with many members you might + * need to increase this retry limit because it takes a longer time to + * propagate changes across all nodes. + * + * '''''maxTakeOverRetries''''' When a leader node is no longer leader + * it sends a take over request to the new leader to initiate the normal + * hand over process. This is especially useful when a new node joins and becomes + * leader immediately, without knowing who was the previous leader. This is retried + * with the `retryInterval` until this retry limit has been reached. If the retry + * limit is reached it initiates a new round by throwing + * [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting + * restart with fresh state. This will also cause the singleton actor to be + * stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to + * ensure that the new leader doesn't start the singleton actor before the previous + * one is stopped for certain corner cases. + * + * '''''loggingEnabled''''' Logging of what is going on at info log level. + */ +class ClusterSingletonManager( + singletonProps: Option[Any] ⇒ Props, + singletonName: String, + terminationMessage: Any, + maxHandOverRetries: Int = 20, + maxTakeOverRetries: Int = 15, + retryInterval: FiniteDuration = 1.second, + loggingEnabled: Boolean = true) + extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] { + + // to ensure that the new leader doesn't start the singleton actor before the previous one is stopped for certain corner cases + require(maxTakeOverRetries < maxHandOverRetries, + s"maxTakeOverRetries [${maxTakeOverRetries}] must be < maxHandOverRetries [${maxHandOverRetries}]") + + /** + * Full Java API constructor. + */ + def this( + singletonName: String, + terminationMessage: Any, + maxHandOverRetries: Int, + maxTakeOverRetries: Int, + retryInterval: FiniteDuration, + loggingEnabled: Boolean, + singletonPropsFactory: ClusterSingletonPropsFactory) = + this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage, + maxHandOverRetries, maxTakeOverRetries, retryInterval, loggingEnabled) + + /** + * Java API constructor with default values.
+ */ + def this( + singletonName: String, + terminationMessage: Any, + singletonPropsFactory: ClusterSingletonPropsFactory) = + this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage) + + import ClusterSingletonManager._ + import ClusterSingletonManager.Internal._ + import ClusterSingletonManager.Internal.LeaderChangedBuffer._ + + val cluster = Cluster(context.system) + val selfAddressOption = Some(cluster.selfAddress) + // started when self member is Up + var leaderChangedBuffer: ActorRef = _ + // Previous GetNext request delivered event and new GetNext is to be sent + var leaderChangedReceived = true + + // keep track of previously downed members + var downed = Map.empty[Address, Deadline] + // keep track of previously removed members + var removed = Map.empty[Address, Deadline] + + def addDowned(address: Address): Unit = + downed += address -> (Deadline.now + 15.minutes) + + def addRemoved(address: Address): Unit = + removed += address -> (Deadline.now + 15.minutes) + + def cleanupOverdueNotMemberAnyMore(): Unit = { + downed = downed filter { case (address, deadline) ⇒ deadline.hasTimeLeft } + removed = removed filter { case (address, deadline) ⇒ deadline.hasTimeLeft } + } + + def logInfo(message: String): Unit = + if (loggingEnabled) log.info(message) + + def logInfo(template: String, arg1: Any): Unit = + if (loggingEnabled) log.info(template, arg1) + + def logInfo(template: String, arg1: Any, arg2: Any): Unit = + if (loggingEnabled) log.info(template, arg1, arg2) + + override def preStart(): Unit = { + super.preStart() + require(!cluster.isTerminated, "Cluster node must not be terminated") + + // subscribe to cluster changes, re-subscribe on restart + cluster.subscribe(self, classOf[MemberDowned]) + cluster.subscribe(self, classOf[MemberRemoved]) + + setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) + + // defer subscription to LeaderChanged to avoid some jitter when + // starting/joining several nodes at the same time + cluster.registerOnMemberUp(self ! StartLeaderChangedBuffer) + } + + override def postStop(): Unit = { + cancelTimer(CleanupTimer) + cluster.unsubscribe(self) + super.postStop() + } + + def peer(at: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(at)) + + def getNextLeaderChanged(): Unit = + if (leaderChangedReceived) { + leaderChangedReceived = false + leaderChangedBuffer ! GetNext + } + + startWith(Start, Uninitialized) + + when(Start) { + case Event(StartLeaderChangedBuffer, _) ⇒ + leaderChangedBuffer = context.actorOf(Props[LeaderChangedBuffer].withDispatcher(context.props.dispatcher)) + getNextLeaderChanged() + stay + + case Event(InitialLeaderState(leaderOption, memberCount), _) ⇒ + leaderChangedReceived = true + if (leaderOption == selfAddressOption && memberCount == 1) + // alone, leader immediately + gotoLeader(None) + else if (leaderOption == selfAddressOption) + goto(BecomingLeader) using BecomingLeaderData(None) + else + goto(NonLeader) using NonLeaderData(leaderOption) + } + + when(NonLeader) { + case Event(LeaderChanged(leaderOption), NonLeaderData(previousLeaderOption)) ⇒ + leaderChangedReceived = true + if (leaderOption == selfAddressOption) { + logInfo("NonLeader observed LeaderChanged: [{} -> myself]", previousLeaderOption) + previousLeaderOption match { + case None ⇒ gotoLeader(None) + case Some(prev) if downed.contains(prev) ⇒ gotoLeader(None) + case Some(prev) ⇒ + peer(prev) !
HandOverToMe + goto(BecomingLeader) using BecomingLeaderData(previousLeaderOption) + } + } else { + logInfo("NonLeader observed LeaderChanged: [{} -> {}]", previousLeaderOption, leaderOption) + getNextLeaderChanged() + stay using NonLeaderData(leaderOption) + } + + case Event(MemberDowned(m), NonLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ + logInfo("Previous leader downed [{}]", m.address) + addDowned(m.address) + // transition when LeaderChanged + stay using NonLeaderData(None) + + case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒ + logInfo("Self removed, stopping ClusterSingletonManager") + stop() + + } + + when(BecomingLeader) { + + case Event(HandOverInProgress, _) ⇒ + // confirmation that the hand over process has started + logInfo("Hand over in progress at [{}]", sender.path.address) + cancelTimer(HandOverRetryTimer) + stay + + case Event(HandOverDone(handOverData), BecomingLeaderData(Some(previousLeader))) ⇒ + if (sender.path.address == previousLeader) + gotoLeader(handOverData) + else { + logInfo("Ignoring HandOverDone in BecomingLeader from [{}]. Expected previous leader [{}]", + sender.path.address, previousLeader) + stay + } + + case Event(MemberDowned(m), BecomingLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ + logInfo("Previous leader [{}] downed", previousLeader) + addDowned(m.address) + gotoLeader(None) + + case Event(TakeOverFromMe, BecomingLeaderData(None)) ⇒ + sender ! HandOverToMe + stay using BecomingLeaderData(Some(sender.path.address)) + + case Event(TakeOverFromMe, BecomingLeaderData(Some(previousLeader))) ⇒ + if (previousLeader == sender.path.address) sender ! HandOverToMe + else logInfo("Ignoring TakeOver request in BecomingLeader from [{}]. Expected previous leader [{}]", + sender.path.address, previousLeader) + stay + + case Event(HandOverRetry(count), BecomingLeaderData(previousLeaderOption)) ⇒ + if (count <= maxHandOverRetries) { + logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousLeaderOption) + previousLeaderOption foreach { peer(_) ! HandOverToMe } + setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false) + } else if (previousLeaderOption.isEmpty) { + // can't send HandOverToMe, previousLeader unknown for new node (or restart) + // previous leader might be down or removed, so no TakeOverFromMe message is received + logInfo("Timeout in BecomingLeader. 
Previous leader unknown and no TakeOver request.") + gotoLeader(None) + } else + throw new ClusterSingletonManagerIsStuck( + s"Becoming singleton leader was stuck because previous leader [${previousLeaderOption}] is unresponsive") + + } + + def gotoLeader(handOverData: Option[Any]): State = { + logInfo("Singleton manager [{}] starting singleton actor", cluster.selfAddress) + val singleton = context watch context.actorOf(singletonProps(handOverData), singletonName) + goto(Leader) using LeaderData(singleton) + } + + when(Leader) { + case Event(LeaderChanged(leaderOption), LeaderData(singleton, singletonTerminated, handOverData)) ⇒ + leaderChangedReceived = true + logInfo("Leader observed LeaderChanged: [{} -> {}]", cluster.selfAddress, leaderOption) + leaderOption match { + case Some(a) if a == cluster.selfAddress ⇒ + // already leader + stay + case Some(a) if downed.contains(a) || removed.contains(a) ⇒ + gotoHandingOver(singleton, singletonTerminated, handOverData, None) + case Some(a) ⇒ + // send TakeOver request in case the new leader doesn't know previous leader + val leaderPeer = peer(a) + leaderPeer ! TakeOverFromMe + setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false) + goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a) + case _ ⇒ + // new leader will initiate the hand over + stay + } + + case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData)) ⇒ + gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) + + case Event(singletonHandOverMessage, d @ LeaderData(singleton, _, _)) if sender == singleton ⇒ + stay using d.copy(handOverData = Some(singletonHandOverMessage)) + + case Event(Terminated(ref), d @ LeaderData(singleton, _, _)) if ref == singleton ⇒ + stay using d.copy(singletonTerminated = true) + } + + when(WasLeader) { + case Event(TakeOverRetry(leaderPeer, count), _) ⇒ + val newLeader = leaderPeer.path.address + if (count <= maxTakeOverRetries) { + logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeader) + leaderPeer ! TakeOverFromMe + setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false) + stay + } else + throw new ClusterSingletonManagerIsStuck(s"Expected hand over to [${newLeader}] never occurred") + + case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒ + gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) + + case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, newLeader)) if m.address == newLeader ⇒ + addDowned(m.address) + gotoHandingOver(singleton, singletonTerminated, handOverData, None) + + case Event(singletonHandOverMessage, d @ WasLeaderData(singleton, _, _, _)) if sender == singleton ⇒ + stay using d.copy(handOverData = Some(singletonHandOverMessage)) + + case Event(Terminated(ref), d @ WasLeaderData(singleton, _, _, _)) if ref == singleton ⇒ + stay using d.copy(singletonTerminated = true) + + } + + def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], handOverTo: Option[ActorRef]): State = { + if (singletonTerminated) { + handOverDone(handOverTo, handOverData) + } else { + handOverTo foreach { _ ! HandOverInProgress } + singleton !
terminationMessage + goto(HandingOver) using HandingOverData(singleton, handOverTo, handOverData) + } + } + + when(HandingOver) { + case (Event(Terminated(ref), HandingOverData(singleton, handOverTo, handOverData))) if ref == singleton ⇒ + handOverDone(handOverTo, handOverData) + + case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo, _)) if handOverTo == Some(sender) ⇒ + // retry + sender ! HandOverInProgress + stay + + case Event(singletonHandOverMessage, d @ HandingOverData(singleton, _, _)) if sender == singleton ⇒ + stay using d.copy(handOverData = Some(singletonHandOverMessage)) + + } + + def handOverDone(handOverTo: Option[ActorRef], handOverData: Option[Any]): State = { + val newLeader = handOverTo.map(_.path.address) + logInfo("Singleton terminated, hand over done [{} -> {}]", cluster.selfAddress, newLeader) + handOverTo foreach { _ ! HandOverDone(handOverData) } + goto(NonLeader) using NonLeaderData(newLeader) + } + + whenUnhandled { + case Event(_: CurrentClusterState, _) ⇒ stay + case Event(MemberRemoved(m), _) ⇒ + logInfo("Member removed [{}]", m.address) + // if self removed, it will be stopped in onTransition to NonLeader + addRemoved(m.address) + stay + case Event(MemberDowned(m), _) ⇒ + logInfo("Member downed [{}]", m.address) + addDowned(m.address) + stay + case Event(TakeOverFromMe, _) ⇒ + logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender.path.address) + stay + case Event(Cleanup, _) ⇒ + cleanupOverdueNotMemberAnyMore() + stay + } + + onTransition { + case from -> to ⇒ logInfo("ClusterSingletonManager state change [{} -> {}]", from, to) + } + + onTransition { + case _ -> BecomingLeader ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false) + } + + onTransition { + case BecomingLeader -> _ ⇒ cancelTimer(HandOverRetryTimer) + case WasLeader -> _ ⇒ cancelTimer(TakeOverRetryTimer) + } + + onTransition { + case _ -> (NonLeader | Leader) ⇒ getNextLeaderChanged() + } + + onTransition { + case _ -> NonLeader if removed.contains(cluster.selfAddress) || downed.contains(cluster.selfAddress) ⇒ + logInfo("Self removed, stopping ClusterSingletonManager") + stop() + } + +} \ No newline at end of file diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala new file mode 100644 index 0000000000..83f9d4de92 --- /dev/null +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -0,0 +1,326 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc.
+ */ + +package akka.contrib.pattern + +import language.postfixOps +import scala.collection.immutable +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.testkit.ImplicitSender +import akka.testkit.TestEvent._ +import akka.actor.Terminated + +object ClusterSingletonManagerSpec extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + val sixth = role("sixth") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + akka.cluster.auto-down = on + """)) + + testTransport(on = true) + + object PointToPointChannel { + case object RegisterConsumer + case object UnregisterConsumer + case object RegistrationOk + case object UnexpectedRegistration + case object UnregistrationOk + case object UnexpectedUnregistration + case object Reset + case object ResetOk + } + + /** + * This channel is extremely strict with regard to + * registration and unregistration of the consumer to + * be able to detect misbehaviour (e.g. two active + * singleton instances). + */ + class PointToPointChannel extends Actor with ActorLogging { + import PointToPointChannel._ + + def receive = idle + + def idle: Receive = { + case RegisterConsumer ⇒ + log.info("RegisterConsumer: [{}]", sender.path) + sender ! RegistrationOk + context.become(active(sender)) + case UnregisterConsumer ⇒ + log.info("UnexpectedUnregistration: [{}]", sender.path) + sender ! UnexpectedUnregistration + context stop self + case Reset ⇒ sender ! ResetOk + case msg ⇒ // no consumer, drop + } + + def active(consumer: ActorRef): Receive = { + case UnregisterConsumer if sender == consumer ⇒ + log.info("UnregistrationOk: [{}]", sender.path) + sender ! UnregistrationOk + context.become(idle) + case UnregisterConsumer ⇒ + log.info("UnexpectedUnregistration: [{}], expected [{}]", sender.path, consumer.path) + sender ! UnexpectedUnregistration + context stop self + case RegisterConsumer ⇒ + log.info("Unexpected RegisterConsumer [{}], active consumer [{}]", sender.path, consumer.path) + sender ! UnexpectedRegistration + context stop self + case Reset ⇒ + context.become(idle) + sender ! ResetOk + case msg ⇒ consumer ! msg + } + } + + object Consumer { + case object End + case object GetCurrent + } + + /** + * The Singleton actor + */ + class Consumer(handOverData: Option[Any], queue: ActorRef, delegateTo: ActorRef) extends Actor { + import Consumer._ + import PointToPointChannel._ + + var current: Int = handOverData match { + case Some(x: Int) ⇒ x + case Some(x) ⇒ throw new IllegalArgumentException(s"handOverData must be an Int, got [${x}]") + case None ⇒ 0 + } + + override def preStart(): Unit = queue ! RegisterConsumer + + def receive = { + case n: Int if n <= current ⇒ + context.stop(self) + case n: Int ⇒ + current = n + delegateTo !
n + case x @ (RegistrationOk | UnexpectedRegistration) ⇒ + delegateTo ! x + case GetCurrent ⇒ + sender ! current + //#consumer-end + case End ⇒ + queue ! UnregisterConsumer + case UnregistrationOk ⇒ + // reply to ClusterSingletonManager with hand over data, + // which will be passed as parameter to new leader consumer + context.parent ! current + context stop self + //#consumer-end + } + } + +} + +class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec +class ClusterSingletonManagerMultiJvmNode2 extends ClusterSingletonManagerSpec +class ClusterSingletonManagerMultiJvmNode3 extends ClusterSingletonManagerSpec +class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec +class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec +class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec +class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec + +class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterSingletonManagerSpec._ + import ClusterSingletonManagerSpec.PointToPointChannel._ + import ClusterSingletonManagerSpec.Consumer._ + + override def initialParticipants = roles.size + + // Sort the roles in the order used by the cluster. + lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = { + implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { + import Member.addressOrdering + def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address) + } + roles.filterNot(_ == controller).toVector.sorted + } + + def queue: ActorRef = system.actorFor(node(controller) / "user" / "queue") + + def createSingleton(): ActorRef = { + //#create-singleton-manager + system.actorOf(Props(new ClusterSingletonManager( + singletonProps = handOverData ⇒ + Props(new Consumer(handOverData, queue, testActor)), + singletonName = "consumer", + terminationMessage = End)), + name = "singleton") + //#create-singleton-manager + } + + def consumer(leader: RoleName): ActorRef = { + // the reason for this complicated way of creating the path is to illustrate + // in documentation how it's typically done in user land + LeaderChanged(Some(node(leader).address)) match { + //#singleton-actorFor + case LeaderChanged(Some(leaderAddress)) ⇒ + val path = RootActorPath(leaderAddress) / "user" / "singleton" / "consumer" + val consumer = system.actorFor(path) + //#singleton-actorFor + consumer + } + } + + def verify(leader: RoleName, msg: Int, expectedCurrent: Int): Unit = { + enterBarrier("before-" + leader.name + "-verified") + runOn(leader) { + expectMsg(RegistrationOk) + consumer(leader) ! GetCurrent + expectMsg(expectedCurrent) + } + enterBarrier(leader.name + "-active") + + runOn(controller) { + queue ! msg + // make sure it's not terminated, which would be wrong + expectNoMsg(1 second) + } + runOn(leader) { + expectMsg(msg) + } + runOn(sortedClusterRoles.filterNot(_ == leader): _*) { + expectNoMsg(1 second) + } + enterBarrier(leader.name + "-verified") + } + + def crash(roles: RoleName*): Unit = { + runOn(controller) { + queue ! 
Reset + expectMsg(ResetOk) + roles foreach { r ⇒ + log.info("Shutdown [{}]", node(r).address) + testConductor.shutdown(r, 0).await + } + } + } + + "A ClusterSingletonManager" must { + + "startup in single member cluster" in within(10 seconds) { + log.info("Sorted cluster nodes [{}]", sortedClusterRoles.map(node(_).address).mkString(", ")) + + runOn(controller) { + // watch that it is not terminated, which would indicate misbehaviour + watch(system.actorOf(Props[PointToPointChannel], "queue")) + } + enterBarrier("queue-started") + + runOn(sortedClusterRoles(5)) { + Cluster(system) join node(sortedClusterRoles(5)).address + createSingleton() + } + + verify(sortedClusterRoles.last, msg = 1, expectedCurrent = 0) + } + + "hand over when new leader joins to 1 node cluster" in within(15 seconds) { + val newLeaderRole = sortedClusterRoles(4) + runOn(newLeaderRole) { + Cluster(system) join node(sortedClusterRoles.last).address + createSingleton() + } + + verify(newLeaderRole, msg = 2, expectedCurrent = 1) + } + + "hand over when new leader joins to 2 nodes cluster" in within(15 seconds) { + val newLeaderRole = sortedClusterRoles(3) + runOn(newLeaderRole) { + Cluster(system) join node(sortedClusterRoles.last).address + createSingleton() + } + + verify(newLeaderRole, msg = 3, expectedCurrent = 2) + } + + "hand over when adding three new potential leaders to 3 nodes cluster" in within(30 seconds) { + runOn(sortedClusterRoles(2)) { + Cluster(system) join node(sortedClusterRoles(3)).address + createSingleton() + } + runOn(sortedClusterRoles(1)) { + Cluster(system) join node(sortedClusterRoles(4)).address + createSingleton() + } + runOn(sortedClusterRoles(0)) { + Cluster(system) join node(sortedClusterRoles(5)).address + createSingleton() + } + + verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 3) + } + + "hand over when leader leaves in 6 nodes cluster " in within(20 seconds) { + val leaveRole = sortedClusterRoles(0) + val newLeaderRole = sortedClusterRoles(1) + + runOn(leaveRole) { + Cluster(system) leave node(leaveRole).address + } + + verify(newLeaderRole, msg = 5, expectedCurrent = 4) + + runOn(leaveRole) { + val singleton = system.actorFor("/user/singleton") + watch(singleton) + expectMsgType[Terminated].actor must be(singleton) + } + + enterBarrier("after-leave") + } + + "take over when leader crashes in 5 nodes cluster" in within(35 seconds) { + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*"))) + system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Disassociated.*"))) + system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Association failed.*"))) + enterBarrier("logs-muted") + + crash(sortedClusterRoles(1)) + verify(sortedClusterRoles(2), msg = 6, expectedCurrent = 0) + } + + "take over when two leaders crash in 3 nodes cluster" in within(45 seconds) { + crash(sortedClusterRoles(2), sortedClusterRoles(3)) + verify(sortedClusterRoles(4), msg = 7, expectedCurrent = 0) + } + + "take over when leader crashes in 2 nodes cluster" in within(25 seconds) { + crash(sortedClusterRoles(4)) + verify(sortedClusterRoles(5), msg = 6, expectedCurrent = 0) + } + + } +} diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala index 934d437ed7..9ddf5f4cde 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala @@ 
-33,7 +33,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod import ReliableProxySpec._ import ReliableProxy._ - override def initialParticipants = 2 + override def initialParticipants = roles.size override def afterEach { runOn(local) { diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index c9c158cc03..518b371f38 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -258,6 +258,17 @@ has at least the defined number of members. This callback can be used for other things than starting actors. +Cluster Singleton Pattern +^^^^^^^^^^^^^^^^^^^^^^^^^ + +For some use cases it is convenient and sometimes also mandatory to ensure that +you have exactly one actor of a certain type running somewhere in the cluster. + +This can be implemented by subscribing to ``LeaderChanged`` events, but there are +several corner cases to consider. Therefore, this specific use case is made easily +accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is, +or adjust it to fit your specific needs. + Failure Detector ^^^^^^^^^^^^^^^^ @@ -351,8 +362,8 @@ The same type of router could also have been defined in code: See :ref:`cluster_configuration_java` section for further descriptions of the settings. -Router Example --------------- +Router Example with Remote Deployed Routees +------------------------------------------- Let's take a look at how to use cluster aware routers. @@ -416,25 +427,35 @@ service nodes and 1 client:: -Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" +Router Example with Lookup of Routees +------------------------------------- + The above setup is nice for this example, but we will also take a look at how to use a single master node that creates and deploys workers. To keep track of a single -master we need one additional actor: +master we use the :ref:`cluster-singleton` in the contrib module. The ``ClusterSingletonManager`` +is started on each node. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#create-singleton-manager + +We also need an actor on each node that keeps track of where the current single master exists and +delegates jobs to the ``StatsService``. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java#facade The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single -master. It listens to cluster events to create or lookup the ``StatsService`` depending on if -it is on the same same node or on another node. We run the master on the same node as the leader of -the cluster members, which is nothing more than the address currently sorted first in the member ring, -i.e. it can change when new nodes join or when current leader leaves. +master. It listens to cluster events to look up the ``StatsService`` on the leader node. The master runs +on the same node as the leader of the cluster members, which is nothing more than the address currently +sorted first in the member ring, i.e. it can change when new nodes join or when current leader leaves. -All nodes start ``StatsFacade`` and the router is now configured like this: +All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: ..
includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#start-router-deploy This example is included in ``akka-samples/akka-sample-cluster`` and you can try it by copying the `source <@github@/akka-samples/akka-sample-cluster>`_ to your -maven project, defined as in :ref:`cluster_simple_example_java`. +maven project, defined as in :ref:`cluster_simple_example_java`. Also add the ``akka-contrib`` dependency +to your pom.xml. + Run it by starting nodes in different terminal windows. For example, starting 3 service nodes and 1 client:: @@ -453,7 +474,7 @@ service nodes and 1 client:: -Dexec.mainClass="sample.cluster.stats.japi.StatsSampleOneMasterMain" -.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning. +.. note:: The above example will be simplified when the cluster handles automatic actor partitioning. Cluster Metrics ^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index 717d084d96..70d96da22d 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -231,6 +231,17 @@ has at least the defined number of members. This callback can be used for other things than starting actors. +Cluster Singleton Pattern +^^^^^^^^^^^^^^^^^^^^^^^^^ + +For some use cases it is convenient and sometimes also mandatory to ensure that +you have exactly one actor of a certain type running somewhere in the cluster. + +This can be implemented by subscribing to ``LeaderChanged`` events, but there are +several corner cases to consider. Therefore, this specific use case is made easily +accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is, +or adjust it to fit your specific needs. + Failure Detector ^^^^^^^^^^^^^^^^ @@ -326,8 +337,8 @@ The same type of router could also have been defined in code: See :ref:`cluster_configuration_scala` section for further descriptions of the settings. -Router Example --------------- +Router Example with Remote Deployed Routees +------------------------------------------- Let's take a look at how to use cluster aware routers. @@ -384,19 +395,27 @@ service nodes and 1 client:: run-main sample.cluster.stats.StatsSample +Router Example with Lookup of Routees +------------------------------------- + The above setup is nice for this example, but we will also take a look at how to use a single master node that creates and deploys workers. To keep track of a single -master we need one additional actor: +master we use the :ref:`cluster-singleton` in the contrib module. The ``ClusterSingletonManager`` +is started on each node. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#create-singleton-manager + +We also need an actor on each node that keeps track of where the current single master exists and +delegates jobs to the ``StatsService``. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single -master. It listens to cluster events to create or lookup the ``StatsService`` depending on if -it is on the same same node or on another node.
We run the master on the same node as the leader of -the cluster members, which is nothing more than the address currently sorted first in the member ring, -i.e. it can change when new nodes join or when current leader leaves. +master. It listens to cluster events to look up the ``StatsService`` on the leader node. The master runs +on the same node as the leader of the cluster members, which is nothing more than the address currently +sorted first in the member ring, i.e. it can change when new nodes join or when current leader leaves. -All nodes start ``StatsFacade`` and the router is now configured like this: +All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy @@ -413,7 +432,7 @@ service nodes and 1 client:: run-main sample.cluster.stats.StatsSampleOneMaster -.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning. +.. note:: The above example will be simplified when the cluster handles automatic actor partitioning. Cluster Metrics diff --git a/akka-samples/akka-sample-cluster/pom.xml b/akka-samples/akka-sample-cluster/pom.xml index 78cdd9ca2b..22f43b33b8 100644 --- a/akka-samples/akka-sample-cluster/pom.xml +++ b/akka-samples/akka-sample-cluster/pom.xml @@ -15,8 +15,13 @@ com.typesafe.akka - akka-cluster-experimental_2.10.0-RC1 - 2.1-20121016-001042 + akka-cluster-experimental_2.10.0 + 2.2-20130122-001107 + + + com.typesafe.akka + akka-contrib_2.10.0 + 2.2-20130122-001107 diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java index 8acb6aad26..7989432a1d 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java @@ -10,7 +10,7 @@ public class FactorialBackendMain { // Override the configuration of the port // when specified as program argument if (args.length > 0) - System.setProperty("akka.remote.netty.port", args[0]); + System.setProperty("akka.remoting.transports.tcp.port", args[0]); ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial")); diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterApp.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterApp.java index b216e174fa..6d76dcfe5c 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterApp.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterApp.java @@ -12,7 +12,7 @@ public class SimpleClusterApp { // Override the configuration of the port // when specified as program argument if (args.length > 0) - System.setProperty("akka.remote.netty.port", args[0]); + System.setProperty("akka.remoting.transports.tcp.port", args[0]); // Create an Akka system ActorSystem system = ActorSystem.create("ClusterSystem"); diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java index 15a271027c..9b311835d3 100644 ---
a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java @@ -5,13 +5,11 @@ import sample.cluster.stats.japi.StatsMessages.JobFailed; import sample.cluster.stats.japi.StatsMessages.StatsJob; import akka.actor.ActorRef; import akka.actor.Address; -import akka.actor.Props; import akka.actor.UntypedActor; import akka.dispatch.Recover; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.LeaderChanged; -import akka.cluster.ClusterEvent.MemberEvent; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.util.Timeout; @@ -25,8 +23,7 @@ public class StatsFacade extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); - ActorRef currentMaster = null; - boolean currentMasterCreatedByMe = false; + Address currentMaster = null; //subscribe to cluster changes, MemberEvent @Override @@ -43,54 +40,33 @@ public class StatsFacade extends UntypedActor { @Override public void onReceive(Object message) { if (message instanceof StatsJob && currentMaster == null) { - getSender() - .tell(new JobFailed("Service unavailable, try again later"), - getSelf()); + getSender().tell(new JobFailed("Service unavailable, try again later"), + getSelf()); } else if (message instanceof StatsJob) { StatsJob job = (StatsJob) message; - Future f = ask(currentMaster, job, new Timeout(5, SECONDS)). - recover(new Recover() { - public Object recover(Throwable t) { - return new JobFailed("Service unavailable, try again later"); - } - }, getContext().dispatcher()); + ActorRef service = getContext().actorFor(currentMaster + + "/user/singleton/statsService"); + Future f = ask(service, job, new Timeout(5, SECONDS)).recover( + new Recover() { + public Object recover(Throwable t) { + return new JobFailed("Service unavailable, try again later"); + } + }, getContext().dispatcher()); pipe(f, getContext().dispatcher()).to(getSender()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; - updateCurrentMaster(state.getLeader()); + currentMaster = state.getLeader(); } else if (message instanceof LeaderChanged) { LeaderChanged leaderChanged = (LeaderChanged) message; - updateCurrentMaster(leaderChanged.getLeader()); + currentMaster = leaderChanged.getLeader(); } else { unhandled(message); } } - void updateCurrentMaster(Address leaderAddress) { - if (leaderAddress == null) - return; - - if (leaderAddress.equals(cluster.selfAddress())) { - if (!currentMasterCreatedByMe) { - log.info("Creating new statsService master at [{}]", leaderAddress); - currentMaster = getContext().actorOf( - new Props(StatsService.class), "statsService"); - currentMasterCreatedByMe = true; - } - } else { - if (currentMasterCreatedByMe) { - getContext().stop(currentMaster); - } - log.info("Using statsService master at [{}]", leaderAddress); - currentMaster = getContext().actorFor( - getSelf().path().toStringWithAddress(leaderAddress) - + "/statsService"); - currentMasterCreatedByMe = false; - } - } } //#facade diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java index cf7387b6fd..d5f2a8c4c3 100644 --- 
a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java @@ -11,7 +11,7 @@ public class StatsSampleMain { // Override the configuration of the port // when specified as program argument if (args.length > 0) - System.setProperty("akka.remote.netty.port", args[0]); + System.setProperty("akka.remoting.transports.tcp.port", args[0]); //#start-router-lookup ActorSystem system = ActorSystem.create("ClusterSystem", diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java index 7971d357d8..3c0101fbb6 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java @@ -1,14 +1,32 @@ package sample.cluster.stats.japi; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; +import akka.contrib.pattern.ClusterSingletonManager; +import akka.contrib.pattern.ClusterSingletonPropsFactory; public class StatsSampleOneMasterClientMain { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("ClusterSystem"); + + // the client is also part of the cluster + system.actorOf(new Props(new UntypedActorFactory() { + @Override + public ClusterSingletonManager create() { + return new ClusterSingletonManager("statsService", PoisonPill.getInstance(), + new ClusterSingletonPropsFactory() { + @Override + public Props create(Object handOverData) { + return new Props(StatsService.class); + } + }); + } + }), "singleton"); + system.actorOf(new Props(new UntypedActorFactory() { @Override public UntypedActor create() { diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java index 9a85c58cb5..2a496770ae 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java @@ -1,7 +1,11 @@ package sample.cluster.stats.japi; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.UntypedActorFactory; +import akka.contrib.pattern.ClusterSingletonManager; +import akka.contrib.pattern.ClusterSingletonPropsFactory; import com.typesafe.config.ConfigFactory; @@ -11,13 +15,13 @@ public class StatsSampleOneMasterMain { // Override the configuration of the port // when specified as program argument if (args.length > 0) - System.setProperty("akka.remote.netty.port", args[0]); + System.setProperty("akka.remoting.transports.tcp.port", args[0]); //#start-router-deploy ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.parseString( "akka.actor.deployment { \n" + - " /statsFacade/statsService/workerRouter { \n" + + " /singleton/statsService/workerRouter { \n" + " router = consistent-hashing \n" + " nr-of-instances = 100 \n" + " cluster { \n" + @@ -28,10 +32,24 @@ public class StatsSampleOneMasterMain { " } \n" + "} \n") 
.withFallback(ConfigFactory.load())); - - system.actorOf(new Props(StatsFacade.class), "statsFacade"); //#start-router-deploy - } + //#create-singleton-manager + system.actorOf(new Props(new UntypedActorFactory() { + @Override + public ClusterSingletonManager create() { + return new ClusterSingletonManager("statsService", PoisonPill.getInstance(), + new ClusterSingletonPropsFactory() { + @Override + public Props create(Object handOverData) { + return new Props(StatsService.class); + } + }); + } + }), "singleton"); + //#create-singleton-manager + system.actorOf(new Props(StatsFacade.class), "statsFacade"); + + } } diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java index 42fa8e6c80..4973b7a70b 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java @@ -9,7 +9,7 @@ public class TransformationBackendMain { // Override the configuration of the port // when specified as program argument if (args.length > 0) - System.setProperty("akka.remote.netty.port", args[0]); + System.setProperty("akka.remoting.transports.tcp.port", args[0]); ActorSystem system = ActorSystem.create("ClusterSystem"); diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java index 741d9452be..1f5387a942 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java @@ -18,7 +18,7 @@ public class TransformationFrontendMain { // Override the configuration of the port // when specified as program argument if (args.length > 0) - System.setProperty("akka.remote.netty.port", args[0]); + System.setProperty("akka.remoting.transports.tcp.port", args[0]); ActorSystem system = ActorSystem.create("ClusterSystem"); diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf index 507191b79c..552c44fdbc 100644 --- a/akka-samples/akka-sample-cluster/src/main/resources/application.conf +++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf @@ -1,12 +1,11 @@ # //#cluster -akka { +akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } - remote { - transport = "akka.remote.netty.NettyRemoteTransport" + remoting { log-remote-lifecycle-events = off - netty { + transports.tcp { hostname = "127.0.0.1" port = 0 } @@ -14,8 +13,8 @@ akka { cluster { seed-nodes = [ - "akka://ClusterSystem@127.0.0.1:2551", - "akka://ClusterSystem@127.0.0.1:2552"] + "tcp.akka://ClusterSystem@127.0.0.1:2551", + "tcp.akka://ClusterSystem@127.0.0.1:2552"] auto-down = on } diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala index d75dcb96e1..03a655209e 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala +++ 
+    system.actorOf(new Props(StatsFacade.class), "statsFacade");
+
+  }
 }
diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java
index 42fa8e6c80..4973b7a70b 100644
--- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java
+++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackendMain.java
@@ -9,7 +9,7 @@ public class TransformationBackendMain {
     // Override the configuration of the port
     // when specified as program argument
     if (args.length > 0)
-      System.setProperty("akka.remote.netty.port", args[0]);
+      System.setProperty("akka.remoting.transports.tcp.port", args[0]);
 
     ActorSystem system = ActorSystem.create("ClusterSystem");
 
diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java
index 741d9452be..1f5387a942 100644
--- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java
+++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontendMain.java
@@ -18,7 +18,7 @@ public class TransformationFrontendMain {
     // Override the configuration of the port
     // when specified as program argument
     if (args.length > 0)
-      System.setProperty("akka.remote.netty.port", args[0]);
+      System.setProperty("akka.remoting.transports.tcp.port", args[0]);
 
     ActorSystem system = ActorSystem.create("ClusterSystem");
 
diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf
index 507191b79c..552c44fdbc 100644
--- a/akka-samples/akka-sample-cluster/src/main/resources/application.conf
+++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf
@@ -1,12 +1,11 @@
 # //#cluster
-akka { 
+akka {
   actor {
     provider = "akka.cluster.ClusterActorRefProvider"
   }
-  remote {
-    transport = "akka.remote.netty.NettyRemoteTransport"
+  remoting {
     log-remote-lifecycle-events = off
-    netty {
+    transports.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
@@ -14,8 +13,8 @@ akka {
 
  cluster {
    seed-nodes = [
-      "akka://ClusterSystem@127.0.0.1:2551",
-      "akka://ClusterSystem@127.0.0.1:2552"]
+      "tcp.akka://ClusterSystem@127.0.0.1:2551",
+      "tcp.akka://ClusterSystem@127.0.0.1:2552"]
 
    auto-down = on
  }
diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala
index d75dcb96e1..03a655209e 100644
--- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala
+++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala
@@ -60,7 +60,7 @@ object FactorialBackend {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))
 
     val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial"))
     system.actorOf(Props[FactorialBackend], name = "factorialBackend")
diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
index 6bdec7cfba..c0a55a737d 100644
--- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
+++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
@@ -10,7 +10,7 @@ object SimpleClusterApp {
 
    // Override the configuration of the port
    // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))
 
    // Create an Akka system
    val system = ActorSystem("ClusterSystem")
diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala
index de7eff456c..0df220b567 100644
--- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala
+++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala
@@ -10,6 +10,7 @@ import akka.actor.ActorLogging
 import akka.actor.ActorRef
 import akka.actor.ActorSystem
 import akka.actor.Address
+import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.actor.ReceiveTimeout
 import akka.actor.RelativeActorPath
@@ -17,6 +18,7 @@ import akka.actor.RootActorPath
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent._
 import akka.cluster.MemberStatus
+import akka.contrib.pattern.ClusterSingletonManager
 import akka.routing.FromConfig
 import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
 import akka.pattern.ask
@@ -91,8 +93,7 @@ class StatsFacade extends Actor with ActorLogging {
   import context.dispatcher
   val cluster = Cluster(context.system)
 
-  var currentMaster: Option[ActorRef] = None
-  var currentMasterCreatedByMe = false
+  var currentMaster: Option[Address] = None
 
   // subscribe to cluster changes, LeaderChanged
   // re-subscribe when restart
@@ -104,33 +105,15 @@ class StatsFacade extends Actor with ActorLogging {
      sender ! JobFailed("Service unavailable, try again later")
    case job: StatsJob ⇒
      implicit val timeout = Timeout(5.seconds)
-      currentMaster foreach {
-        _ ? job recover {
+      currentMaster foreach { address ⇒
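+        // look up the singleton StatsService via its well-known path on the
+        // leader node: the "statsService" child of the manager named "singleton"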
+        val service = context.actorFor(RootActorPath(address) /
+          "user" / "singleton" / "statsService")
+        service ? job recover {
          case _ ⇒ JobFailed("Service unavailable, try again later")
        } pipeTo sender
      }
-    case state: CurrentClusterState ⇒
-      state.leader foreach updateCurrentMaster
-    case LeaderChanged(Some(leaderAddress)) ⇒
-      updateCurrentMaster(leaderAddress)
-  }
-
-  def updateCurrentMaster(leaderAddress: Address): Unit = {
-    if (leaderAddress == cluster.selfAddress) {
-      if (!currentMasterCreatedByMe) {
-        log.info("Creating new statsService master at [{}]", leaderAddress)
-        currentMaster = Some(context.actorOf(Props[StatsService],
-          name = "statsService"))
-        currentMasterCreatedByMe = true
-      }
-    } else {
-      if (currentMasterCreatedByMe)
-        currentMaster foreach { context.stop(_) }
-      log.info("Using statsService master at [{}]", leaderAddress)
-      currentMaster = Some(context.actorFor(
-        self.path.toStringWithAddress(leaderAddress) + "/statsService"))
-      currentMasterCreatedByMe = false
-    }
+    case state: CurrentClusterState ⇒ currentMaster = state.leader
+    case LeaderChanged(leader) ⇒ currentMaster = leader
  }
 }
 
@@ -140,7 +123,7 @@ object StatsSample {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))
 
     //#start-router-lookup
     val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
@@ -168,12 +151,12 @@ object StatsSampleOneMaster {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))
 
     //#start-router-deploy
     val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
      akka.actor.deployment {
-        /statsFacade/statsService/workerRouter {
+        /singleton/statsService/workerRouter {
          router = consistent-hashing
          nr-of-instances = 100
          cluster {
@@ -186,6 +169,11 @@ object StatsSampleOneMaster {
      """).withFallback(ConfigFactory.load()))
    //#start-router-deploy
 
+    //#create-singleton-manager
+    system.actorOf(Props(new ClusterSingletonManager(
+      singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService",
+      terminationMessage = PoisonPill)), name = "singleton")
+    //#create-singleton-manager
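+    // the singletonProps function receives the hand-over data from the
+    // previous leader; this sample does not use it, hence the `_`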
    system.actorOf(Props[StatsFacade], name = "statsFacade")
  }
 }
@@ -200,6 +188,12 @@ object StatsSampleClient {
 object StatsSampleOneMasterClient {
   def main(args: Array[String]): Unit = {
     val system = ActorSystem("ClusterSystem")
+
+    // the client is also part of the cluster
+    system.actorOf(Props(new ClusterSingletonManager(
+      singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService",
+      terminationMessage = PoisonPill)), name = "singleton")
+
     system.actorOf(Props(new StatsSampleClient("/user/statsFacade")), "client")
   }
 }
diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala
index 4fc802a626..28c19e1bdc 100644
--- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala
+++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala
@@ -30,7 +30,7 @@ object TransformationFrontend {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))
 
     val system = ActorSystem("ClusterSystem")
     val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
@@ -77,7 +77,7 @@ object TransformationBackend {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))
 
     val system = ActorSystem("ClusterSystem")
     system.actorOf(Props[TransformationBackend], name = "backend")
diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
index 5f1c9728a3..e03c631f2e 100644
--- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
+++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
@@ -2,15 +2,14 @@ package sample.cluster.stats
 
 import language.postfixOps
 import scala.concurrent.duration._
-
 import com.typesafe.config.ConfigFactory
-
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.WordSpec
 import org.scalatest.matchers.MustMatchers
-
+import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.actor.RootActorPath
+import akka.contrib.pattern.ClusterSingletonManager
 import akka.cluster.Cluster
 import akka.cluster.Member
 import akka.cluster.MemberStatus
@@ -37,7 +36,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
     akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
     #//#router-deploy-config
     akka.actor.deployment {
-      /statsFacade/statsService/workerRouter {
+      /singleton/statsService/workerRouter {
        router = consistent-hashing
        nr-of-instances = 100
        cluster {
@@ -81,13 +80,17 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
        MemberUp(Member(node(third).address, MemberStatus.Up)))
 
      Cluster(system).unsubscribe(testActor)
-
+
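+      // started on every node of the test; the actual StatsService singleton
+      // will only be running on the leader node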
+      system.actorOf(Props(new ClusterSingletonManager(
+        singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService",
+        terminationMessage = PoisonPill)), name = "singleton")
+
      system.actorOf(Props[StatsFacade], "statsFacade")
 
      testConductor.enter("all-up")
    }
 
-    "show usage of the statsFacade" in within(20 seconds) {
+    "show usage of the statsFacade" in within(40 seconds) {
      val facade = system.actorFor(RootActorPath(node(third).address) /
        "user" / "statsFacade")
 
      // eventually the service should be ok,
diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala
index ca69c1ae6c..3393db1a5f 100644
--- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala
+++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala
@@ -2,13 +2,11 @@ package sample.cluster.stats.japi
 
 import language.postfixOps
 import scala.concurrent.duration._
-
 import com.typesafe.config.ConfigFactory
-
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.WordSpec
 import org.scalatest.matchers.MustMatchers
-
+import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.actor.RootActorPath
 import akka.cluster.Cluster
@@ -16,10 +14,12 @@ import akka.cluster.Member
 import akka.cluster.MemberStatus
 import akka.cluster.ClusterEvent.CurrentClusterState
 import akka.cluster.ClusterEvent.MemberUp
+import akka.contrib.pattern.ClusterSingletonManager
 import akka.remote.testkit.MultiNodeConfig
 import akka.remote.testkit.MultiNodeSpec
 import akka.testkit.ImplicitSender
 import sample.cluster.stats.japi.StatsMessages._
+import akka.contrib.pattern.ClusterSingletonPropsFactory
 
 object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
   // register the named roles (nodes) of the test
@@ -37,7 +37,7 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
     # don't use sigar for tests, native lib not in path
     akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
     akka.actor.deployment {
-      /statsFacade/statsService/workerRouter {
+      /singleton/statsService/workerRouter {
        router = consistent-hashing
        nr-of-instances = 100
        cluster {
@@ -81,12 +81,19 @@ abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSample
 
      Cluster(system).unsubscribe(testActor)
 
+      system.actorOf(Props(new ClusterSingletonManager(
+        singletonName = "statsService",
+        terminationMessage = PoisonPill,
+        singletonPropsFactory = new ClusterSingletonPropsFactory {
+          def create(handOverData: Any) = Props[StatsService]
+        })), name = "singleton")
+
      system.actorOf(Props[StatsFacade], "statsFacade")
 
      testConductor.enter("all-up")
    }
 
-    "show usage of the statsFacade" in within(20 seconds) {
+    "show usage of the statsFacade" in within(40 seconds) {
      val facade = system.actorFor(RootActorPath(node(third).address) /
        "user" / "statsFacade")
 
      // eventually the service should be ok,
diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala
index 23b11e14f8..3859afcece 100644
--- a/project/AkkaBuild.scala
+++ b/project/AkkaBuild.scala
@@ -356,7 +356,7 @@ object AkkaBuild extends Build {
   lazy val clusterSample = Project(
     id = "akka-sample-cluster-experimental",
     base = file("akka-samples/akka-sample-cluster"),
-    dependencies = Seq(cluster, remoteTests % "test", testkit % "test"),
+    dependencies = Seq(cluster, contrib, remoteTests % "test", testkit % "test"),
     settings = sampleSettings ++ multiJvmSettings ++ experimentalSettings ++ Seq(
      // sigar is in Typesafe repo
      resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
@@ -419,7 +419,7 @@ object AkkaBuild extends Build {
   lazy val contrib = Project(
     id = "akka-contrib",
     base = file("akka-contrib"),
-    dependencies = Seq(remote, remoteTests % "compile;test->test"),
+    dependencies = Seq(remote, remoteTests % "compile;test->test", cluster),
     settings = defaultSettings ++ multiJvmSettings ++ Seq(
      libraryDependencies ++= Dependencies.contrib,
      testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),