Cluster singleton actor pattern, see #2895

* ClusterSingletonManager
* ClusterSingletonManagerSpec multi-node test
* Use in cluster router with single master sample
* Extensive logging to be able to understand what is
  going on
* Java api
* Add cluster dependency to contrib
* Add contrib dependency to sample
* Scaladoc
* rst docs in contrib area, ref from cluster docs
This commit is contained in:
Patrik Nordwall 2013-01-14 14:09:53 +01:00
parent 3031173370
commit 0c38a76c37
24 changed files with 1184 additions and 121 deletions

View file

@ -0,0 +1,80 @@
.. _cluster-singleton:
Cluster Singleton Pattern
=========================
For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.
Some examples:
* single point of responsibility for certain cluster-wide consistent decisions, or
coordination of actions across the cluster system
* single entry point to an external system
* single master, many workers
* centralized naming service, or routing logic
Using a singleton should not be the first design choice. It has several drawbacks,
such as creating a single point of bottleneck. A single point of failure is also a relevant
concern, but for some cases this pattern takes care of that by making sure that another
singleton instance will eventually be started.
The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSingletonManager``,
which is an actor that is supposed to be started on all nodes in the cluster.
The actual singleton actor is started by the ``ClusterSingletonManager`` on the
leader node of the cluster by creating a child actor from supplied ``Props``.
``ClusterSingletonManager`` makes sure that at most one singleton instance is
running at any point in time.
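For orientation, here is a minimal sketch of starting the manager on every node;
``MySingleton`` is an illustrative placeholder, and the complete, tested example follows
further below::

  // started on every node; the manager decides where the actual singleton runs
  system.actorOf(Props(new ClusterSingletonManager(
    // handOverData is an Option[Any] received from the previous singleton, if any
    singletonProps = handOverData ⇒ Props(new MySingleton(handOverData)),
    singletonName = "mySingleton",
    terminationMessage = PoisonPill)),
    name = "singleton")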
The singleton actor is always running on the leader member, which is nothing more than
the address currently sorted first in the member ring. This can change when members are
added or removed. A graceful hand over can normally be performed when a new node joins and
becomes leader, or when the current leader node is removed. Be aware that there is a short
time period when there is no active singleton during the hand over process.
The cluster failure detector will notice when a leader node becomes unreachable due to
things like JVM crash, hard shutdown, or network failure. Then a new leader node will
take over and a new singleton actor is created. For these failure scenarios there will
not be a graceful hand over, but more than one active singleton is prevented by all
reasonable means. Some corner cases are eventually resolved by configurable timeouts.
You access the singleton actor with ``actorFor`` using the names you have specified when
creating the ClusterSingletonManager. You can subscribe to cluster ``LeaderChanged`` events
to keep track of which node it is supposed to be running on. Alternatively the singleton
actor may broadcast its existence when it is started.
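A minimal sketch of such a lookup, assuming the manager was started with the actor name
``singleton`` and the singleton child is named ``consumer`` (the same names are used in
the example below)::

  // inside an actor that did Cluster(context.system).subscribe(self, classOf[LeaderChanged])
  def receive = {
    case LeaderChanged(Some(leaderAddress)) ⇒
      val path = RootActorPath(leaderAddress) / "user" / "singleton" / "consumer"
      val consumer = context.actorFor(path)
      // ... use `consumer` to talk to the singleton
  }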
An Example
----------
Assume that we need one single entry point to an external system: an actor that
receives messages from a JMS queue, with the strict requirement that only one
JMS consumer may exist to make sure that the messages are processed in order.
That is perhaps not how one would like to design things, but a typical real-world
scenario when integrating with external systems.
On each node in the cluster you need to start the ``ClusterSingletonManager`` and
supply the ``Props`` of the singleton actor, in this case the JMS queue consumer.
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#create-singleton-manager
The corresponding Java API for the ``singletonProps`` function is ``akka.contrib.pattern.ClusterSingletonPropsFactory``.
Here we use an application specific ``terminationMessage`` to be able to close the
resources before actually stopping the singleton actor. Note that ``PoisonPill`` is a
perfectly fine ``terminationMessage`` if you only need to stop the actor.
Here is how the singleton actor handles the ``terminationMessage`` in this example.
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#consumer-end
Note that you can send back the current state to the ``ClusterSingletonManager`` before terminating.
This message will be sent over to the ``ClusterSingletonManager`` at the new leader node and it
will be passed to the ``singletonProps`` factory when creating the new singleton instance.
With the names given above the singleton actor can be looked up by subscribing to
``LeaderChanged`` cluster event and using ``actorFor``:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-actorFor
.. note:: The singleton pattern will be simplified, perhaps provided out-of-the-box, when the cluster handles automatic actor partitioning.

View file

@ -32,6 +32,7 @@ The Current List of Modules
throttle throttle
jul jul
peek-mailbox peek-mailbox
cluster-singleton
Suggested Way of Using these Contributions Suggested Way of Using these Contributions
------------------------------------------ ------------------------------------------

View file

@ -0,0 +1,596 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.Actor.Receive
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.FSM
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.AkkaException
object ClusterSingletonManager {
/**
* Internal API
* public due to the `with FSM` type parameters
*/
sealed trait State
/**
* Internal API
* public due to the `with FSM` type parameters
*/
sealed trait Data
/**
* Internal API
*/
private object Internal {
/**
* Sent from new leader to previous leader to initiate the
* hand over process. `HandOverInProgress` and `HandOverDone`
* are expected replies.
*/
case object HandOverToMe
/**
* Confirmation by the previous leader that the hand
* over process, shutdown of the singleton actor, has
* started.
*/
case object HandOverInProgress
/**
* Confirmation by the previous leader that the singleton
* actor has been terminated and the hand over process is
* completed. The `handOverData` holds the message, if any,
* sent from the singleton actor to its parent ClusterSingletonManager
* when shutting down. It is passed to the `singletonProps`
* factory on the new leader node.
*/
case class HandOverDone(handOverData: Option[Any])
/**
* Sent from the previous leader to the new leader to
* initiate the normal hand over process.
* Especially useful when a new node joins and becomes
* leader immediately, without knowing who the previous
* leader was.
*/
case object TakeOverFromMe
case class HandOverRetry(count: Int)
case class TakeOverRetry(leaderPeer: ActorRef, count: Int)
case object Cleanup
case object StartLeaderChangedBuffer
case object Start extends State
case object Leader extends State
case object NonLeader extends State
case object BecomingLeader extends State
case object WasLeader extends State
case object HandingOver extends State
case object TakeOver extends State
case object Uninitialized extends Data
case class NonLeaderData(leaderOption: Option[Address]) extends Data
case class BecomingLeaderData(previousLeaderOption: Option[Address]) extends Data
case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false,
handOverData: Option[Any] = None) extends Data
case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any],
newLeader: Address) extends Data
case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data
val HandOverRetryTimer = "hand-over-retry"
val TakeOverRetryTimer = "take-over-retry"
val CleanupTimer = "cleanup"
object LeaderChangedBuffer {
/**
* Request to deliver one more event.
*/
case object GetNext
/**
* The first event, corresponding to CurrentClusterState.
*/
case class InitialLeaderState(leader: Option[Address], memberCount: Int)
}
/**
* Notifications of [[akka.cluster.ClusterEvent.LeaderChanged]] are tunneled
* via this actor (child of ClusterSingletonManager) to be able to deliver
* one change at a time. Avoiding simultaneous leader changes simplifies
* the process in ClusterSingletonManager. ClusterSingletonManager requests
* next event with `GetNext` when it is ready for it. Only one outstanding
* `GetNext` request is allowed. Incoming events are buffered and delivered
* upon `GetNext` request.
*/
class LeaderChangedBuffer extends Actor {
import LeaderChangedBuffer._
import context.dispatcher
val cluster = Cluster(context.system)
var changes = Vector.empty[AnyRef]
var memberCount = 0
// subscribe to LeaderChanged, re-subscribe when restart
override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case state: CurrentClusterState ⇒
changes :+= InitialLeaderState(state.leader, state.members.size)
case event: LeaderChanged ⇒
changes :+= event
case GetNext if changes.isEmpty ⇒
context.become(deliverNext, discardOld = false)
case GetNext ⇒
val event = changes.head
changes = changes.tail
context.parent ! event
}
// the buffer was empty when GetNext was received, deliver next event immediately
def deliverNext: Actor.Receive = {
case state: CurrentClusterState ⇒
context.parent ! InitialLeaderState(state.leader, state.members.size)
context.unbecome()
case event: LeaderChanged ⇒
context.parent ! event
context.unbecome()
}
}
}
}
/**
* Java API. Factory for the [[akka.actor.Props]] of the singleton
* actor instance. Used in constructor of
* [[akka.contrib.pattern.ClusterSingletonManager]]
*/
@SerialVersionUID(1L)
trait ClusterSingletonPropsFactory extends Serializable {
/**
* Create the `Props` from the `handOverData` sent from
* previous singleton. `handOverData` might be null
* when no hand over took place, or when there is no need
* for sending data to the new singleton.
*/
def create(handOverData: Any): Props
}
/**
* Thrown when a consistent state can't be determined within the
* defined retry limits. Eventually it will reach a stable state and
* can continue, which is simplified by starting over with a clean
* state. The parent supervisor should typically restart the actor, i.e.
* the default supervision decision.
*/
class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(message, null)
/**
* Manages a cluster wide singleton actor instance, i.e.
* at most one singleton instance is running at any point in time.
* The ClusterSingletonManager is supposed to be started on all
* nodes in the cluster with `actorOf`. The actual singleton is
* started on the leader node of the cluster by creating a child
* actor from the supplied `singletonProps`.
*
* The singleton actor is always running on the leader member, which is
* nothing more than the address currently sorted first in the member
* ring. This can change when adding or removing members. A graceful hand
* over can normally be performed when joining a new node that becomes
* leader or removing current leader node. Be aware that there is a
* short time period when there is no active singleton during the
* hand over process.
*
* The singleton actor can at any time send a message to its parent
* ClusterSingletonManager and this message will be passed to the
* `singletonProps` factory on the new leader node when a graceful
* hand over is performed.
*
* The cluster failure detector will notice when a leader node
* becomes unreachable due to things like JVM crash, hard shutdown,
* or network failure. Then a new leader node will take over and a
* new singleton actor is created. For these failure scenarios there
* will not be a graceful hand over, but more than one active singleton
* is prevented by all reasonable means. Some corner cases are eventually
* resolved by configurable timeouts.
*
* You access the singleton actor with `actorFor` using the names you have
* specified when creating the ClusterSingletonManager. You can subscribe to
* [[akka.cluster.ClusterEvent.LeaderChanged]] to keep track of which node
* it is supposed to be running on. Alternatively the singleton actor may
* broadcast its existence when it is started.
*
* ==Arguments==
*
* '''''singletonProps''''' Factory for [[akka.actor.Props]] of the
* singleton actor instance. The `Option` parameter is the
* `handOverData` sent from the previous singleton. `handOverData`
* might be None when no hand over took place, or when there
* is no need for sending data to the new singleton. The `handOverData`
* is typically passed as parameter to the constructor of the
* singleton actor.
*
* '''''singletonName''''' The actor name of the child singleton actor.
*
* '''''terminationMessage''''' When handing over to a new leader node
* this `terminationMessage` is sent to the singleton actor to tell
* it to finish its work, close resources, and stop. It can send
* a message back to the parent ClusterSingletonManager, which will
* be passed to the `singletonProps` factory on the new leader node.
* The hand over to the new leader node is completed when the
* singleton actor is terminated.
* Note that [[akka.actor.PoisonPill]] is a perfectly fine
* `terminationMessage` if you only need to stop the actor.
*
* '''''maxHandOverRetries''''' When a node is becoming leader it sends a
* hand over request to the previous leader. This is retried with the
* `retryInterval` until the previous leader confirms that the hand
* over has started, or this `maxHandOverRetries` limit has been
* reached. If the retry limit is reached it takes the decision to be
* the new leader if the previous leader is unknown (typically removed or
* downed), otherwise it initiates a new round by throwing
* [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting
* restart with fresh state. For a cluster with many members you might
* need to increase this retry limit because it takes longer to
* propagate changes across all nodes.
*
* '''''maxTakeOverRetries''''' When a leader node is no longer leader
* it sends a take over request to the new leader to initiate the normal
* hand over process. This is especially useful when a new node joins and becomes
* leader immediately, without knowing who the previous leader was. This is retried
* with the `retryInterval` until this retry limit has been reached. If the retry
* limit is reached it initiates a new round by throwing
* [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting
* restart with fresh state. This will also cause the singleton actor to be
* stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to
* ensure that the new leader doesn't start the singleton actor before the
* previous one is stopped, for certain corner cases.
*
* '''''loggingEnabled''''' Logging of what is going on at info log level.
*/
class ClusterSingletonManager(
singletonProps: Option[Any] ⇒ Props,
singletonName: String,
terminationMessage: Any,
maxHandOverRetries: Int = 20,
maxTakeOverRetries: Int = 15,
retryInterval: FiniteDuration = 1.second,
loggingEnabled: Boolean = true)
extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] {
// to ensure that new leader doesn't start singleton actor before previous is stopped for certain corner cases
require(maxTakeOverRetries < maxHandOverRetries,
s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]")
/**
* Full Java API constructor.
*/
def this(
singletonName: String,
terminationMessage: Any,
maxHandOverRetries: Int,
maxTakeOverRetries: Int,
retryInterval: FiniteDuration,
loggingEnabled: Boolean,
singletonPropsFactory: ClusterSingletonPropsFactory) =
this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage,
maxHandOverRetries, maxTakeOverRetries, retryInterval, loggingEnabled)
/**
* Java API constructor with default values.
*/
def this(
singletonName: String,
terminationMessage: Any,
singletonPropsFactory: ClusterSingletonPropsFactory) =
this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage)
import ClusterSingletonManager._
import ClusterSingletonManager.Internal._
import ClusterSingletonManager.Internal.LeaderChangedBuffer._
val cluster = Cluster(context.system)
val selfAddressOption = Some(cluster.selfAddress)
// started when self member is Up
var leaderChangedBuffer: ActorRef = _
// the previous GetNext request has delivered its event and a new GetNext may be sent
var leaderChangedReceived = true
// keep track of previously downed members
var downed = Map.empty[Address, Deadline]
// keep track of previously removed members
var removed = Map.empty[Address, Deadline]
def addDowned(address: Address): Unit =
downed += address -> (Deadline.now + 15.minutes)
def addRemoved(address: Address): Unit =
removed += address -> (Deadline.now + 15.minutes)
def cleanupOverdueNotMemberAnyMore(): Unit = {
downed = downed filter { case (address, deadline) ⇒ deadline.hasTimeLeft }
removed = removed filter { case (address, deadline) ⇒ deadline.hasTimeLeft }
}
def logInfo(message: String): Unit =
if (loggingEnabled) log.info(message)
def logInfo(template: String, arg1: Any): Unit =
if (loggingEnabled) log.info(template, arg1)
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
if (loggingEnabled) log.info(template, arg1, arg2)
override def preStart(): Unit = {
super.preStart()
require(!cluster.isTerminated, "Cluster node must not be terminated")
// subscribe to cluster changes, re-subscribe when restart
cluster.subscribe(self, classOf[MemberDowned])
cluster.subscribe(self, classOf[MemberRemoved])
setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)
// defer subscription to LeaderChanged to avoid some jitter when
// starting/joining several nodes at the same time
cluster.registerOnMemberUp(self ! StartLeaderChangedBuffer)
}
override def postStop(): Unit = {
cancelTimer(CleanupTimer)
cluster.unsubscribe(self)
super.postStop()
}
def peer(at: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(at))
def getNextLeaderChanged(): Unit =
if (leaderChangedReceived) {
leaderChangedReceived = false
leaderChangedBuffer ! GetNext
}
startWith(Start, Uninitialized)
when(Start) {
case Event(StartLeaderChangedBuffer, _) ⇒
leaderChangedBuffer = context.actorOf(Props[LeaderChangedBuffer].withDispatcher(context.props.dispatcher))
getNextLeaderChanged()
stay
case Event(InitialLeaderState(leaderOption, memberCount), _) ⇒
leaderChangedReceived = true
if (leaderOption == selfAddressOption && memberCount == 1)
// alone, leader immediately
gotoLeader(None)
else if (leaderOption == selfAddressOption)
goto(BecomingLeader) using BecomingLeaderData(None)
else
goto(NonLeader) using NonLeaderData(leaderOption)
}
when(NonLeader) {
case Event(LeaderChanged(leaderOption), NonLeaderData(previousLeaderOption)) ⇒
leaderChangedReceived = true
if (leaderOption == selfAddressOption) {
logInfo("NonLeader observed LeaderChanged: [{} -> myself]", previousLeaderOption)
previousLeaderOption match {
case None ⇒ gotoLeader(None)
case Some(prev) if downed.contains(prev) ⇒ gotoLeader(None)
case Some(prev) ⇒
peer(prev) ! HandOverToMe
goto(BecomingLeader) using BecomingLeaderData(previousLeaderOption)
}
} else {
logInfo("NonLeader observed LeaderChanged: [{} -> {}]", previousLeaderOption, leaderOption)
getNextLeaderChanged()
stay using NonLeaderData(leaderOption)
}
case Event(MemberDowned(m), NonLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒
logInfo("Previous leader downed [{}]", m.address)
addDowned(m.address)
// transition when LeaderChanged
stay using NonLeaderData(None)
case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
}
when(BecomingLeader) {
case Event(HandOverInProgress, _) ⇒
// confirmation that the hand over process has started
logInfo("Hand over in progress at [{}]", sender.path.address)
cancelTimer(HandOverRetryTimer)
stay
case Event(HandOverDone(handOverData), BecomingLeaderData(Some(previousLeader))) ⇒
if (sender.path.address == previousLeader)
gotoLeader(handOverData)
else {
logInfo("Ignoring HandOverDone in BecomingLeader from [{}]. Expected previous leader [{}]",
sender.path.address, previousLeader)
stay
}
case Event(MemberDowned(m), BecomingLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒
logInfo("Previous leader [{}] downed", previousLeader)
addDowned(m.address)
gotoLeader(None)
case Event(TakeOverFromMe, BecomingLeaderData(None)) ⇒
sender ! HandOverToMe
stay using BecomingLeaderData(Some(sender.path.address))
case Event(TakeOverFromMe, BecomingLeaderData(Some(previousLeader))) ⇒
if (previousLeader == sender.path.address) sender ! HandOverToMe
else logInfo("Ignoring TakeOver request in BecomingLeader from [{}]. Expected previous leader [{}]",
sender.path.address, previousLeader)
stay
case Event(HandOverRetry(count), BecomingLeaderData(previousLeaderOption)) ⇒
if (count <= maxHandOverRetries) {
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousLeaderOption)
previousLeaderOption foreach { peer(_) ! HandOverToMe }
setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false)
} else if (previousLeaderOption.isEmpty) {
// can't send HandOverToMe, previousLeader unknown for new node (or restart)
// previous leader might be down or removed, so no TakeOverFromMe message is received
logInfo("Timeout in BecomingLeader. Previous leader unknown and no TakeOver request.")
gotoLeader(None)
} else
throw new ClusterSingletonManagerIsStuck(
s"Becoming singleton leader was stuck because previous leader [${previousLeaderOption}] is unresponsive")
}
def gotoLeader(handOverData: Option[Any]): State = {
logInfo("Singleton manager [{}] starting singleton actor", cluster.selfAddress)
val singleton = context watch context.actorOf(singletonProps(handOverData), singletonName)
goto(Leader) using LeaderData(singleton)
}
when(Leader) {
case Event(LeaderChanged(leaderOption), LeaderData(singleton, singletonTerminated, handOverData)) ⇒
leaderChangedReceived = true
logInfo("Leader observed LeaderChanged: [{} -> {}]", cluster.selfAddress, leaderOption)
leaderOption match {
case Some(a) if a == cluster.selfAddress ⇒
// already leader
stay
case Some(a) if downed.contains(a) || removed.contains(a) ⇒
gotoHandingOver(singleton, singletonTerminated, handOverData, None)
case Some(a) ⇒
// send TakeOver request in case the new leader doesn't know previous leader
val leaderPeer = peer(a)
leaderPeer ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false)
goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a)
case _ ⇒
// new leader will initiate the hand over
stay
}
case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData)) ⇒
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))
case Event(singletonHandOverMessage, d @ LeaderData(singleton, _, _)) if sender == singleton ⇒
stay using d.copy(handOverData = Some(singletonHandOverMessage))
case Event(Terminated(ref), d @ LeaderData(singleton, _, _)) if ref == singleton ⇒
stay using d.copy(singletonTerminated = true)
}
when(WasLeader) {
case Event(TakeOverRetry(leaderPeer, count), _) ⇒
val newLeader = leaderPeer.path.address
if (count <= maxTakeOverRetries) {
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeader)
leaderPeer ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false)
stay
} else
throw new ClusterSingletonManagerIsStuck(s"Expected hand over to [${newLeader}] never occurred")
case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))
case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, newLeader)) if m.address == newLeader ⇒
addDowned(m.address)
gotoHandingOver(singleton, singletonTerminated, handOverData, None)
case Event(singletonHandOverMessage, d @ WasLeaderData(singleton, _, _, _)) if sender == singleton ⇒
stay using d.copy(handOverData = Some(singletonHandOverMessage))
case Event(Terminated(ref), d @ WasLeaderData(singleton, _, _, _)) if ref == singleton ⇒
stay using d.copy(singletonTerminated = true)
}
def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], handOverTo: Option[ActorRef]): State = {
if (singletonTerminated) {
handOverDone(handOverTo, handOverData)
} else {
handOverTo foreach { _ ! HandOverInProgress }
singleton ! terminationMessage
goto(HandingOver) using HandingOverData(singleton, handOverTo, handOverData)
}
}
when(HandingOver) {
case (Event(Terminated(ref), HandingOverData(singleton, handOverTo, handOverData))) if ref == singleton ⇒
handOverDone(handOverTo, handOverData)
case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo, _)) if handOverTo == Some(sender) ⇒
// retry
sender ! HandOverInProgress
stay
case Event(singletonHandOverMessage, d @ HandingOverData(singleton, _, _)) if sender == singleton ⇒
stay using d.copy(handOverData = Some(singletonHandOverMessage))
}
def handOverDone(handOverTo: Option[ActorRef], handOverData: Option[Any]): State = {
val newLeader = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand over done [{} -> {}]", cluster.selfAddress, newLeader)
handOverTo foreach { _ ! HandOverDone(handOverData) }
goto(NonLeader) using NonLeaderData(newLeader)
}
whenUnhandled {
case Event(_: CurrentClusterState, _) ⇒ stay
case Event(MemberRemoved(m), _) ⇒
logInfo("Member removed [{}]", m.address)
// if self removed, it will be stopped onTransition to NonLeader
addRemoved(m.address)
stay
case Event(MemberDowned(m), _) ⇒
logInfo("Member downed [{}]", m.address)
addDowned(m.address)
stay
case Event(TakeOverFromMe, _) ⇒
logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender.path.address)
stay
case Event(Cleanup, _) ⇒
cleanupOverdueNotMemberAnyMore()
stay
}
onTransition {
case from -> to ⇒ logInfo("ClusterSingletonManager state change [{} -> {}]", from, to)
}
onTransition {
case _ -> BecomingLeader ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false)
}
onTransition {
case BecomingLeader -> _ ⇒ cancelTimer(HandOverRetryTimer)
case WasLeader -> _ ⇒ cancelTimer(TakeOverRetryTimer)
}
onTransition {
case _ -> (NonLeader | Leader) ⇒ getNextLeaderChanged()
}
onTransition {
case _ -> NonLeader if removed.contains(cluster.selfAddress) || downed.contains(cluster.selfAddress) ⇒
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
}
}

View file

@ -0,0 +1,326 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.ImplicitSender
import akka.testkit.TestEvent._
import akka.actor.Terminated
object ClusterSingletonManagerSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
val sixth = role("sixth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
akka.cluster.auto-down = on
"""))
testTransport(on = true)
object PointToPointChannel {
case object RegisterConsumer
case object UnregisterConsumer
case object RegistrationOk
case object UnexpectedRegistration
case object UnregistrationOk
case object UnexpectedUnregistration
case object Reset
case object ResetOk
}
/**
* This channel is extremely strict with regard to
* registration and unregistration of the consumer to
* be able to detect misbehaviour (e.g. two active
* singleton instances).
*/
class PointToPointChannel extends Actor with ActorLogging {
import PointToPointChannel._
def receive = idle
def idle: Receive = {
case RegisterConsumer ⇒
log.info("RegisterConsumer: [{}]", sender.path)
sender ! RegistrationOk
context.become(active(sender))
case UnregisterConsumer ⇒
log.info("UnexpectedUnregistration: [{}]", sender.path)
sender ! UnexpectedUnregistration
context stop self
case Reset ⇒ sender ! ResetOk
case msg ⇒ // no consumer, drop
}
def active(consumer: ActorRef): Receive = {
case UnregisterConsumer if sender == consumer ⇒
log.info("UnregistrationOk: [{}]", sender.path)
sender ! UnregistrationOk
context.become(idle)
case UnregisterConsumer ⇒
log.info("UnexpectedUnregistration: [{}], expected [{}]", sender.path, consumer.path)
sender ! UnexpectedUnregistration
context stop self
case RegisterConsumer ⇒
log.info("Unexpected RegisterConsumer [{}], active consumer [{}]", sender.path, consumer.path)
sender ! UnexpectedRegistration
context stop self
case Reset ⇒
context.become(idle)
sender ! ResetOk
case msg ⇒ consumer ! msg
}
}
object Consumer {
case object End
case object GetCurrent
}
/**
* The Singleton actor
*/
class Consumer(handOverData: Option[Any], queue: ActorRef, delegateTo: ActorRef) extends Actor {
import Consumer._
import PointToPointChannel._
var current: Int = handOverData match {
case Some(x: Int) ⇒ x
case Some(x) ⇒ throw new IllegalArgumentException(s"handOverData must be an Int, got [${x}]")
case None ⇒ 0
}
override def preStart(): Unit = queue ! RegisterConsumer
def receive = {
case n: Int if n <= current ⇒
context.stop(self)
case n: Int ⇒
current = n
delegateTo ! n
case x @ (RegistrationOk | UnexpectedRegistration) ⇒
delegateTo ! x
case GetCurrent ⇒
sender ! current
//#consumer-end
case End ⇒
queue ! UnregisterConsumer
case UnregistrationOk ⇒
// reply to ClusterSingletonManager with hand over data,
// which will be passed as parameter to new leader consumer
context.parent ! current
context stop self
//#consumer-end
}
}
}
class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode2 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode3 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerSpec._
import ClusterSingletonManagerSpec.PointToPointChannel._
import ClusterSingletonManagerSpec.Consumer._
override def initialParticipants = roles.size
// Sort the roles in the order used by the cluster.
lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = {
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
import Member.addressOrdering
def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address)
}
roles.filterNot(_ == controller).toVector.sorted
}
def queue: ActorRef = system.actorFor(node(controller) / "user" / "queue")
def createSingleton(): ActorRef = {
//#create-singleton-manager
system.actorOf(Props(new ClusterSingletonManager(
singletonProps = handOverData ⇒
Props(new Consumer(handOverData, queue, testActor)),
singletonName = "consumer",
terminationMessage = End)),
name = "singleton")
//#create-singleton-manager
}
def consumer(leader: RoleName): ActorRef = {
// the reason for this complicated way of creating the path is to illustrate
// in documentation how it's typically done in user land
LeaderChanged(Some(node(leader).address)) match {
//#singleton-actorFor
case LeaderChanged(Some(leaderAddress)) ⇒
val path = RootActorPath(leaderAddress) / "user" / "singleton" / "consumer"
val consumer = system.actorFor(path)
//#singleton-actorFor
consumer
}
}
def verify(leader: RoleName, msg: Int, expectedCurrent: Int): Unit = {
enterBarrier("before-" + leader.name + "-verified")
runOn(leader) {
expectMsg(RegistrationOk)
consumer(leader) ! GetCurrent
expectMsg(expectedCurrent)
}
enterBarrier(leader.name + "-active")
runOn(controller) {
queue ! msg
// make sure it's not terminated, which would be wrong
expectNoMsg(1 second)
}
runOn(leader) {
expectMsg(msg)
}
runOn(sortedClusterRoles.filterNot(_ == leader): _*) {
expectNoMsg(1 second)
}
enterBarrier(leader.name + "-verified")
}
def crash(roles: RoleName*): Unit = {
runOn(controller) {
queue ! Reset
expectMsg(ResetOk)
roles foreach { r ⇒
log.info("Shutdown [{}]", node(r).address)
testConductor.shutdown(r, 0).await
}
}
}
"A ClusterSingletonManager" must {
"startup in single member cluster" in within(10 seconds) {
log.info("Sorted cluster nodes [{}]", sortedClusterRoles.map(node(_).address).mkString(", "))
runOn(controller) {
// watch that it is not terminated, which would indicate misbehaviour
watch(system.actorOf(Props[PointToPointChannel], "queue"))
}
enterBarrier("queue-started")
runOn(sortedClusterRoles(5)) {
Cluster(system) join node(sortedClusterRoles(5)).address
createSingleton()
}
verify(sortedClusterRoles.last, msg = 1, expectedCurrent = 0)
}
"hand over when new leader joins to 1 node cluster" in within(15 seconds) {
val newLeaderRole = sortedClusterRoles(4)
runOn(newLeaderRole) {
Cluster(system) join node(sortedClusterRoles.last).address
createSingleton()
}
verify(newLeaderRole, msg = 2, expectedCurrent = 1)
}
"hand over when new leader joins to 2 nodes cluster" in within(15 seconds) {
val newLeaderRole = sortedClusterRoles(3)
runOn(newLeaderRole) {
Cluster(system) join node(sortedClusterRoles.last).address
createSingleton()
}
verify(newLeaderRole, msg = 3, expectedCurrent = 2)
}
"hand over when adding three new potential leaders to 3 nodes cluster" in within(30 seconds) {
runOn(sortedClusterRoles(2)) {
Cluster(system) join node(sortedClusterRoles(3)).address
createSingleton()
}
runOn(sortedClusterRoles(1)) {
Cluster(system) join node(sortedClusterRoles(4)).address
createSingleton()
}
runOn(sortedClusterRoles(0)) {
Cluster(system) join node(sortedClusterRoles(5)).address
createSingleton()
}
verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 3)
}
"hand over when leader leaves in 6 nodes cluster " in within(20 seconds) {
val leaveRole = sortedClusterRoles(0)
val newLeaderRole = sortedClusterRoles(1)
runOn(leaveRole) {
Cluster(system) leave node(leaveRole).address
}
verify(newLeaderRole, msg = 5, expectedCurrent = 4)
runOn(leaveRole) {
val singleton = system.actorFor("/user/singleton")
watch(singleton)
expectMsgType[Terminated].actor must be(singleton)
}
enterBarrier("after-leave")
}
"take over when leader crashes in 5 nodes cluster" in within(35 seconds) {
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*")))
system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Disassociated.*")))
system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Association failed.*")))
enterBarrier("logs-muted")
crash(sortedClusterRoles(1))
verify(sortedClusterRoles(2), msg = 6, expectedCurrent = 0)
}
"take over when two leaders crash in 3 nodes cluster" in within(45 seconds) {
crash(sortedClusterRoles(2), sortedClusterRoles(3))
verify(sortedClusterRoles(4), msg = 7, expectedCurrent = 0)
}
"take over when leader crashes in 2 nodes cluster" in within(25 seconds) {
crash(sortedClusterRoles(4))
verify(sortedClusterRoles(5), msg = 6, expectedCurrent = 0)
}
}
}

View file

@ -33,7 +33,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
import ReliableProxySpec._ import ReliableProxySpec._
import ReliableProxy._ import ReliableProxy._
override def initialParticipants = 2 override def initialParticipants = roles.size
override def afterEach { override def afterEach {
runOn(local) { runOn(local) {

View file

@ -258,6 +258,17 @@ has at least the defined number of members.
This callback can be used for other things than starting actors. This callback can be used for other things than starting actors.
Cluster Singleton Pattern
^^^^^^^^^^^^^^^^^^^^^^^^^
For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.
This can be implemented by subscribing to ``LeaderChanged`` events, but there are
several corner cases to consider. Therefore, this specific use case is made easily
accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is,
or adjust to fit your specific needs.
Failure Detector Failure Detector
^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^
@ -351,8 +362,8 @@ The same type of router could also have been defined in code:
See :ref:`cluster_configuration_java` section for further descriptions of the settings. See :ref:`cluster_configuration_java` section for further descriptions of the settings.
Router Example Router Example with Remote Deployed Routees
-------------- -------------------------------------------
Let's take a look at how to use cluster aware routers. Let's take a look at how to use cluster aware routers.
@ -416,25 +427,35 @@ service nodes and 1 client::
-Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" -Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain"
Router Example with Lookup of Routees
-------------------------------------
The above setup is nice for this example, but we will also take a look at how to use The above setup is nice for this example, but we will also take a look at how to use
a single master node that creates and deploys workers. To keep track of a single a single master node that creates and deploys workers. To keep track of a single
master we need one additional actor: master we use the :ref:`cluster-singleton` in the contrib module. The ``ClusterSingletonManager``
is started on each node.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#create-singleton-manager
We also need an actor on each node that keeps track of where the current single master is running and
delegates jobs to the ``StatsService``.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java#facade .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java#facade
The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single
master. It listens to cluster events to create or lookup the ``StatsService`` depending on if master. It listens to cluster events to lookup the ``StatsService`` on the leader node. The master runs
it is on the same same node or on another node. We run the master on the same node as the leader of on the same node as the leader of the cluster members, which is nothing more than the address currently
the cluster members, which is nothing more than the address currently sorted first in the member ring, sorted first in the member ring, i.e. it can change when new nodes join or when current leader leaves.
i.e. it can change when new nodes join or when current leader leaves.
All nodes start ``StatsFacade`` and the router is now configured like this: All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#start-router-deploy .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#start-router-deploy
This example is included in ``akka-samples/akka-sample-cluster`` and you can try it by copying the This example is included in ``akka-samples/akka-sample-cluster`` and you can try it by copying the
`source <@github@/akka-samples/akka-sample-cluster>`_ to your `source <@github@/akka-samples/akka-sample-cluster>`_ to your
maven project, defined as in :ref:`cluster_simple_example_java`. maven project, defined as in :ref:`cluster_simple_example_java`. Also add the `akka-contrib` dependency
to your pom.xml.
Run it by starting nodes in different terminal windows. For example, starting 3 Run it by starting nodes in different terminal windows. For example, starting 3
service nodes and 1 client:: service nodes and 1 client::
@ -453,7 +474,7 @@ service nodes and 1 client::
-Dexec.mainClass="sample.cluster.stats.japi.StatsSampleOneMasterMain" -Dexec.mainClass="sample.cluster.stats.japi.StatsSampleOneMasterMain"
.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning. .. note:: The above example will be simplified when the cluster handles automatic actor partitioning.
Cluster Metrics Cluster Metrics
^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^

View file

@ -231,6 +231,17 @@ has at least the defined number of members.
This callback can be used for other things than starting actors. This callback can be used for other things than starting actors.
Cluster Singleton Pattern
^^^^^^^^^^^^^^^^^^^^^^^^^
For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.
This can be implemented by subscribing to ``LeaderChanged`` events, but there are
several corner cases to consider. Therefore, this specific use case is made easily
accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is,
or adjust to fit your specific needs.
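As a rough sketch, assuming the single master is a ``StatsService`` managed under the
singleton name ``statsService`` (the sample referenced below uses this shape), each node
would start::

  system.actorOf(Props(new ClusterSingletonManager(
    singletonProps = _ ⇒ Props[StatsService],
    singletonName = "statsService",
    terminationMessage = PoisonPill)),
    name = "singleton")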
Failure Detector Failure Detector
^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^
@ -326,8 +337,8 @@ The same type of router could also have been defined in code:
See :ref:`cluster_configuration_scala` section for further descriptions of the settings. See :ref:`cluster_configuration_scala` section for further descriptions of the settings.
Router Example Router Example with Remote Deployed Routees
-------------- -------------------------------------------
Let's take a look at how to use cluster aware routers. Let's take a look at how to use cluster aware routers.
@ -384,19 +395,27 @@ service nodes and 1 client::
run-main sample.cluster.stats.StatsSample run-main sample.cluster.stats.StatsSample
Router Example with Lookup of Routees
-------------------------------------
The above setup is nice for this example, but we will also take a look at how to use The above setup is nice for this example, but we will also take a look at how to use
a single master node that creates and deploys workers. To keep track of a single a single master node that creates and deploys workers. To keep track of a single
master we need one additional actor: master we use the :ref:`cluster-singleton` in the contrib module. The ``ClusterSingletonManager``
is started on each node.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#create-singleton-manager
We also need an actor on each node that keeps track of where the current single master is running and
delegates jobs to the ``StatsService``.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade
The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single
master. It listens to cluster events to create or lookup the ``StatsService`` depending on if master. It listens to cluster events to lookup the ``StatsService`` on the leader node. The master runs
it is on the same same node or on another node. We run the master on the same node as the leader of on the same node as the leader of the cluster members, which is nothing more than the address currently
the cluster members, which is nothing more than the address currently sorted first in the member ring, sorted first in the member ring, i.e. it can change when new nodes join or when current leader leaves.
i.e. it can change when new nodes join or when current leader leaves.
All nodes start ``StatsFacade`` and the router is now configured like this: All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy
@ -413,7 +432,7 @@ service nodes and 1 client::
run-main sample.cluster.stats.StatsSampleOneMaster run-main sample.cluster.stats.StatsSampleOneMaster
.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning. .. note:: The above example will be simplified when the cluster handles automatic actor partitioning.
Cluster Metrics Cluster Metrics

View file

@ -15,8 +15,13 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-experimental_2.10.0-RC1</artifactId> <artifactId>akka-cluster-experimental_2.10.0</artifactId>
<version>2.1-20121016-001042</version> <version>2.2-20130122-001107</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-contrib_2.10.0</artifactId>
<version>2.2-20130122-001107</version>
</dependency> </dependency>
</dependencies> </dependencies>
<repositories> <repositories>

View file

@ -10,7 +10,7 @@ public class FactorialBackendMain {
// Override the configuration of the port // Override the configuration of the port
// when specified as program argument // when specified as program argument
if (args.length > 0) if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]); System.setProperty("akka.remoting.transports.tcp.port", args[0]);
ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial")); ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial"));

View file

@ -12,7 +12,7 @@ public class SimpleClusterApp {
// Override the configuration of the port // Override the configuration of the port
// when specified as program argument // when specified as program argument
if (args.length > 0) if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]); System.setProperty("akka.remoting.transports.tcp.port", args[0]);
// Create an Akka system // Create an Akka system
ActorSystem system = ActorSystem.create("ClusterSystem"); ActorSystem system = ActorSystem.create("ClusterSystem");

View file

@ -5,13 +5,11 @@ import sample.cluster.stats.japi.StatsMessages.JobFailed;
import sample.cluster.stats.japi.StatsMessages.StatsJob; import sample.cluster.stats.japi.StatsMessages.StatsJob;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Address; import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.dispatch.Recover; import akka.dispatch.Recover;
import akka.cluster.Cluster; import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.LeaderChanged; import akka.cluster.ClusterEvent.LeaderChanged;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.event.Logging; import akka.event.Logging;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import akka.util.Timeout; import akka.util.Timeout;
@ -25,8 +23,7 @@ public class StatsFacade extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this); LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system()); Cluster cluster = Cluster.get(getContext().system());
ActorRef currentMaster = null; Address currentMaster = null;
boolean currentMasterCreatedByMe = false;
//subscribe to cluster changes, MemberEvent //subscribe to cluster changes, MemberEvent
@Override @Override
@ -43,54 +40,33 @@ public class StatsFacade extends UntypedActor {
@Override @Override
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof StatsJob && currentMaster == null) { if (message instanceof StatsJob && currentMaster == null) {
getSender() getSender().tell(new JobFailed("Service unavailable, try again later"),
.tell(new JobFailed("Service unavailable, try again later"), getSelf());
getSelf());
} else if (message instanceof StatsJob) { } else if (message instanceof StatsJob) {
StatsJob job = (StatsJob) message; StatsJob job = (StatsJob) message;
Future<Object> f = ask(currentMaster, job, new Timeout(5, SECONDS)). ActorRef service = getContext().actorFor(currentMaster +
recover(new Recover<Object>() { "/user/singleton/statsService");
public Object recover(Throwable t) { Future<Object> f = ask(service, job, new Timeout(5, SECONDS)).recover(
return new JobFailed("Service unavailable, try again later"); new Recover<Object>() {
} public Object recover(Throwable t) {
}, getContext().dispatcher()); return new JobFailed("Service unavailable, try again later");
}
}, getContext().dispatcher());
pipe(f, getContext().dispatcher()).to(getSender()); pipe(f, getContext().dispatcher()).to(getSender());
} else if (message instanceof CurrentClusterState) { } else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message; CurrentClusterState state = (CurrentClusterState) message;
updateCurrentMaster(state.getLeader()); currentMaster = state.getLeader();
} else if (message instanceof LeaderChanged) { } else if (message instanceof LeaderChanged) {
LeaderChanged leaderChanged = (LeaderChanged) message; LeaderChanged leaderChanged = (LeaderChanged) message;
updateCurrentMaster(leaderChanged.getLeader()); currentMaster = leaderChanged.getLeader();
} else { } else {
unhandled(message); unhandled(message);
} }
} }
void updateCurrentMaster(Address leaderAddress) {
if (leaderAddress == null)
return;
if (leaderAddress.equals(cluster.selfAddress())) {
if (!currentMasterCreatedByMe) {
log.info("Creating new statsService master at [{}]", leaderAddress);
currentMaster = getContext().actorOf(
new Props(StatsService.class), "statsService");
currentMasterCreatedByMe = true;
}
} else {
if (currentMasterCreatedByMe) {
getContext().stop(currentMaster);
}
log.info("Using statsService master at [{}]", leaderAddress);
currentMaster = getContext().actorFor(
getSelf().path().toStringWithAddress(leaderAddress)
+ "/statsService");
currentMasterCreatedByMe = false;
}
}
} }
//#facade //#facade

View file

@ -11,7 +11,7 @@ public class StatsSampleMain {
// Override the configuration of the port // Override the configuration of the port
// when specified as program argument // when specified as program argument
if (args.length > 0) if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]); System.setProperty("akka.remoting.transports.tcp.port", args[0]);
//#start-router-lookup //#start-router-lookup
ActorSystem system = ActorSystem.create("ClusterSystem", ActorSystem system = ActorSystem.create("ClusterSystem",

View file

@ -1,14 +1,32 @@
package sample.cluster.stats.japi; package sample.cluster.stats.japi;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.ClusterSingletonPropsFactory;
public class StatsSampleOneMasterClientMain { public class StatsSampleOneMasterClientMain {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create("ClusterSystem"); ActorSystem system = ActorSystem.create("ClusterSystem");
// the client is also part of the cluster
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public ClusterSingletonManager create() {
return new ClusterSingletonManager("statsService", PoisonPill.getInstance(),
new ClusterSingletonPropsFactory() {
@Override
public Props create(Object handOverData) {
return new Props(StatsService.class);
}
});
}
}), "singleton");
system.actorOf(new Props(new UntypedActorFactory() { system.actorOf(new Props(new UntypedActorFactory() {
@Override @Override
public UntypedActor create() { public UntypedActor create() {

View file

@ -1,7 +1,11 @@
package sample.cluster.stats.japi; package sample.cluster.stats.japi;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActorFactory;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.ClusterSingletonPropsFactory;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
@ -11,13 +15,13 @@ public class StatsSampleOneMasterMain {
// Override the configuration of the port // Override the configuration of the port
// when specified as program argument // when specified as program argument
if (args.length > 0) if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]); System.setProperty("akka.remoting.transports.tcp.port", args[0]);
//#start-router-deploy //#start-router-deploy
ActorSystem system = ActorSystem.create("ClusterSystem", ActorSystem system = ActorSystem.create("ClusterSystem",
ConfigFactory.parseString( ConfigFactory.parseString(
"akka.actor.deployment { \n" + "akka.actor.deployment { \n" +
" /statsFacade/statsService/workerRouter { \n" + " /singleton/statsService/workerRouter { \n" +
" router = consistent-hashing \n" + " router = consistent-hashing \n" +
" nr-of-instances = 100 \n" + " nr-of-instances = 100 \n" +
" cluster { \n" + " cluster { \n" +
@ -28,10 +32,24 @@ public class StatsSampleOneMasterMain {
" } \n" + " } \n" +
"} \n") "} \n")
.withFallback(ConfigFactory.load())); .withFallback(ConfigFactory.load()));
system.actorOf(new Props(StatsFacade.class), "statsFacade");
//#start-router-deploy //#start-router-deploy
} //#create-singleton-manager
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public ClusterSingletonManager create() {
return new ClusterSingletonManager("statsService", PoisonPill.getInstance(),
new ClusterSingletonPropsFactory() {
@Override
public Props create(Object handOverData) {
return new Props(StatsService.class);
}
});
}
}), "singleton");
//#create-singleton-manager
system.actorOf(new Props(StatsFacade.class), "statsFacade");
}
} }

View file

@ -9,7 +9,7 @@ public class TransformationBackendMain {
// Override the configuration of the port // Override the configuration of the port
// when specified as program argument // when specified as program argument
if (args.length > 0) if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]); System.setProperty("akka.remoting.transports.tcp.port", args[0]);
ActorSystem system = ActorSystem.create("ClusterSystem"); ActorSystem system = ActorSystem.create("ClusterSystem");

View file

@ -18,7 +18,7 @@ public class TransformationFrontendMain {
// Override the configuration of the port // Override the configuration of the port
// when specified as program argument // when specified as program argument
if (args.length > 0) if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]); System.setProperty("akka.remoting.transports.tcp.port", args[0]);
ActorSystem system = ActorSystem.create("ClusterSystem"); ActorSystem system = ActorSystem.create("ClusterSystem");

View file

@@ -3,10 +3,9 @@ akka {
   actor {
     provider = "akka.cluster.ClusterActorRefProvider"
   }
-  remote {
-    transport = "akka.remote.netty.NettyRemoteTransport"
+  remoting {
     log-remote-lifecycle-events = off
-    netty {
+    transports.tcp {
       hostname = "127.0.0.1"
       port = 0
     }
@@ -14,8 +13,8 @@ akka {
   cluster {
     seed-nodes = [
-      "akka://ClusterSystem@127.0.0.1:2551",
-      "akka://ClusterSystem@127.0.0.1:2552"]
+      "tcp.akka://ClusterSystem@127.0.0.1:2551",
+      "tcp.akka://ClusterSystem@127.0.0.1:2552"]

     auto-down = on
   }
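
Note: the renamed remoting section above also renames the system property that the sample main classes set (akka.remote.netty.port becomes akka.remoting.transports.tcp.port) and prefixes remote addresses with the transport name, as the seed-nodes show. A minimal sketch of starting one node on a fixed port, assuming only the keys visible in this diff; the object name is made up for illustration:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // sketch: fix the port under the renamed remoting keys instead of akka.remote.netty.port
    object FixedPortNode extends App {
      val config = ConfigFactory.parseString("""
        akka.remoting.transports.tcp.hostname = "127.0.0.1"
        akka.remoting.transports.tcp.port = 2551
        """).withFallback(ConfigFactory.load())
      val system = ActorSystem("ClusterSystem", config)
    }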

View file

@@ -60,7 +60,7 @@ object FactorialBackend {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))

     val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial"))
     system.actorOf(Props[FactorialBackend], name = "factorialBackend")

View file

@@ -10,7 +10,7 @@ object SimpleClusterApp {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))

     // Create an Akka system
     val system = ActorSystem("ClusterSystem")

View file

@@ -10,6 +10,7 @@ import akka.actor.ActorLogging
 import akka.actor.ActorRef
 import akka.actor.ActorSystem
 import akka.actor.Address
+import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.actor.ReceiveTimeout
 import akka.actor.RelativeActorPath
@@ -17,6 +18,7 @@ import akka.actor.RootActorPath
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent._
 import akka.cluster.MemberStatus
+import akka.contrib.pattern.ClusterSingletonManager
 import akka.routing.FromConfig
 import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
 import akka.pattern.ask
@@ -91,8 +93,7 @@ class StatsFacade extends Actor with ActorLogging {
   import context.dispatcher
   val cluster = Cluster(context.system)

-  var currentMaster: Option[ActorRef] = None
-  var currentMasterCreatedByMe = false
+  var currentMaster: Option[Address] = None

   // subscribe to cluster changes, LeaderChanged
   // re-subscribe when restart
@@ -104,33 +105,15 @@ class StatsFacade extends Actor with ActorLogging {
       sender ! JobFailed("Service unavailable, try again later")
     case job: StatsJob ⇒
       implicit val timeout = Timeout(5.seconds)
-      currentMaster foreach {
-        _ ? job recover {
+      currentMaster foreach { address ⇒
+        val service = context.actorFor(RootActorPath(address) /
+          "user" / "singleton" / "statsService")
+        service ? job recover {
           case _ ⇒ JobFailed("Service unavailable, try again later")
         } pipeTo sender
       }
-    case state: CurrentClusterState ⇒
-      state.leader foreach updateCurrentMaster
-    case LeaderChanged(Some(leaderAddress)) ⇒
-      updateCurrentMaster(leaderAddress)
+    case state: CurrentClusterState ⇒ currentMaster = state.leader
+    case LeaderChanged(leader) ⇒ currentMaster = leader
   }
-
-  def updateCurrentMaster(leaderAddress: Address): Unit = {
-    if (leaderAddress == cluster.selfAddress) {
-      if (!currentMasterCreatedByMe) {
-        log.info("Creating new statsService master at [{}]", leaderAddress)
-        currentMaster = Some(context.actorOf(Props[StatsService],
-          name = "statsService"))
-        currentMasterCreatedByMe = true
-      }
-    } else {
-      if (currentMasterCreatedByMe)
-        currentMaster foreach { context.stop(_) }
-      log.info("Using statsService master at [{}]", leaderAddress)
-      currentMaster = Some(context.actorFor(
-        self.path.toStringWithAddress(leaderAddress) + "/statsService"))
-      currentMasterCreatedByMe = false
-    }
-  }
 }
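
Note: the rewritten StatsFacade above no longer creates or stops the master itself; it only remembers the current leader address and resolves the singleton by its well-known path. A stand-alone sketch of that resolution step, assuming the names used in this sample (/user/singleton is the ClusterSingletonManager, its child statsService is the actual singleton); the SingletonLocator actor is made up for illustration:

    import akka.actor.{ Actor, Address, RootActorPath }
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.{ CurrentClusterState, LeaderChanged }

    // sketch: forward any message to the statsService singleton on the current leader node
    class SingletonLocator extends Actor {
      val cluster = Cluster(context.system)
      var leader: Option[Address] = None

      // keep track of which node is leader, i.e. where the singleton is running
      override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged])
      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case state: CurrentClusterState ⇒ leader = state.leader
        case LeaderChanged(newLeader)   ⇒ leader = newLeader
        case msg ⇒ leader foreach { address ⇒
          context.actorFor(RootActorPath(address) / "user" / "singleton" / "statsService") forward msg
        }
      }
    }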
@@ -140,7 +123,7 @@ object StatsSample {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))

     //#start-router-lookup
     val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
@@ -168,12 +151,12 @@ object StatsSampleOneMaster {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))

     //#start-router-deploy
     val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
       akka.actor.deployment {
-        /statsFacade/statsService/workerRouter {
+        /singleton/statsService/workerRouter {
           router = consistent-hashing
           nr-of-instances = 100
           cluster {
@@ -186,6 +169,11 @@ object StatsSampleOneMaster {
       """).withFallback(ConfigFactory.load()))
     //#start-router-deploy
+    //#create-singleton-manager
+    system.actorOf(Props(new ClusterSingletonManager(
+      singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService",
+      terminationMessage = PoisonPill)), name = "singleton")
+    //#create-singleton-manager
     system.actorOf(Props[StatsFacade], name = "statsFacade")
   }
 }
@@ -200,6 +188,12 @@ object StatsSampleClient {

 object StatsSampleOneMasterClient {
   def main(args: Array[String]): Unit = {
     val system = ActorSystem("ClusterSystem")
+    // the client is also part of the cluster
+    system.actorOf(Props(new ClusterSingletonManager(
+      singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService",
+      terminationMessage = PoisonPill)), name = "singleton")
     system.actorOf(Props(new StatsSampleClient("/user/statsFacade")), "client")
   }
 }
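
Note: every deployment section in this commit moves from /statsFacade/statsService/workerRouter to /singleton/statsService/workerRouter because statsService is now created as a child of the ClusterSingletonManager named "singleton" instead of as a child of the facade. A sketch of the assumed StatsService side, which this diff does not touch, to show where that path comes from (StatsWorker is the sample's existing worker actor):

    import akka.actor.{ Actor, Props }
    import akka.routing.FromConfig

    // sketch: the router is created from configuration inside StatsService, so its
    // deployment is looked up at <path of StatsService>/workerRouter -- now
    // /singleton/statsService/workerRouter
    class StatsServiceSketch extends Actor {
      val workerRouter = context.actorOf(
        Props[StatsWorker].withRouter(FromConfig), name = "workerRouter")

      def receive = {
        case text: String ⇒ workerRouter forward text
      }
    }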

View file

@@ -30,7 +30,7 @@ object TransformationFrontend {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))

     val system = ActorSystem("ClusterSystem")
     val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
@@ -77,7 +77,7 @@ object TransformationBackend {
   def main(args: Array[String]): Unit = {
     // Override the configuration of the port
     // when specified as program argument
-    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
+    if (args.nonEmpty) System.setProperty("akka.remoting.transports.tcp.port", args(0))

     val system = ActorSystem("ClusterSystem")
     system.actorOf(Props[TransformationBackend], name = "backend")

View file

@@ -2,15 +2,14 @@ package sample.cluster.stats
 import language.postfixOps
 import scala.concurrent.duration._
 import com.typesafe.config.ConfigFactory
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.WordSpec
 import org.scalatest.matchers.MustMatchers
+import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.actor.RootActorPath
+import akka.contrib.pattern.ClusterSingletonManager
 import akka.cluster.Cluster
 import akka.cluster.Member
 import akka.cluster.MemberStatus
@@ -37,7 +36,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
     akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
     #//#router-deploy-config
     akka.actor.deployment {
-      /statsFacade/statsService/workerRouter {
+      /singleton/statsService/workerRouter {
        router = consistent-hashing
        nr-of-instances = 100
        cluster {
@@ -82,12 +81,16 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSingleMasterSpecConfig)
       Cluster(system).unsubscribe(testActor)

+      system.actorOf(Props(new ClusterSingletonManager(
+        singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService",
+        terminationMessage = PoisonPill)), name = "singleton")
       system.actorOf(Props[StatsFacade], "statsFacade")

       testConductor.enter("all-up")
     }

-    "show usage of the statsFacade" in within(20 seconds) {
+    "show usage of the statsFacade" in within(40 seconds) {
       val facade = system.actorFor(RootActorPath(node(third).address) / "user" / "statsFacade")

       // eventually the service should be ok,

View file

@@ -2,13 +2,11 @@ package sample.cluster.stats.japi
 import language.postfixOps
 import scala.concurrent.duration._
 import com.typesafe.config.ConfigFactory
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.WordSpec
 import org.scalatest.matchers.MustMatchers
+import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.actor.RootActorPath
 import akka.cluster.Cluster
@@ -16,10 +14,12 @@ import akka.cluster.Member
 import akka.cluster.MemberStatus
 import akka.cluster.ClusterEvent.CurrentClusterState
 import akka.cluster.ClusterEvent.MemberUp
+import akka.contrib.pattern.ClusterSingletonManager
 import akka.remote.testkit.MultiNodeConfig
 import akka.remote.testkit.MultiNodeSpec
 import akka.testkit.ImplicitSender
 import sample.cluster.stats.japi.StatsMessages._
+import akka.contrib.pattern.ClusterSingletonPropsFactory

 object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
   // register the named roles (nodes) of the test
@@ -37,7 +37,7 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
     # don't use sigar for tests, native lib not in path
     akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
     akka.actor.deployment {
-      /statsFacade/statsService/workerRouter {
+      /singleton/statsService/workerRouter {
        router = consistent-hashing
        nr-of-instances = 100
        cluster {
@@ -81,12 +81,19 @@ abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSampleSingleMasterJapiSpecConfig)
       Cluster(system).unsubscribe(testActor)

+      system.actorOf(Props(new ClusterSingletonManager(
+        singletonName = "statsService",
+        terminationMessage = PoisonPill,
+        singletonPropsFactory = new ClusterSingletonPropsFactory {
+          def create(handOverData: Any) = Props[StatsService]
+        })), name = "singleton")
       system.actorOf(Props[StatsFacade], "statsFacade")

       testConductor.enter("all-up")
     }

-    "show usage of the statsFacade" in within(20 seconds) {
+    "show usage of the statsFacade" in within(40 seconds) {
       val facade = system.actorFor(RootActorPath(node(third).address) / "user" / "statsFacade")

       // eventually the service should be ok,

View file

@@ -356,7 +356,7 @@ object AkkaBuild extends Build {
   lazy val clusterSample = Project(
     id = "akka-sample-cluster-experimental",
     base = file("akka-samples/akka-sample-cluster"),
-    dependencies = Seq(cluster, remoteTests % "test", testkit % "test"),
+    dependencies = Seq(cluster, contrib, remoteTests % "test", testkit % "test"),
     settings = sampleSettings ++ multiJvmSettings ++ experimentalSettings ++ Seq(
       // sigar is in Typesafe repo
       resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
@@ -419,7 +419,7 @@ object AkkaBuild extends Build {
   lazy val contrib = Project(
     id = "akka-contrib",
     base = file("akka-contrib"),
-    dependencies = Seq(remote, remoteTests % "compile;test->test"),
+    dependencies = Seq(remote, remoteTests % "compile;test->test", cluster),
     settings = defaultSettings ++ multiJvmSettings ++ Seq(
       libraryDependencies ++= Dependencies.contrib,
       testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),