Renaming link/unlink to startsMonitoring/stopsMonitoring

This commit is contained in:
Viktor Klang 2011-10-14 15:09:46 +02:00
parent 07a7b27386
commit 5788ed4281
17 changed files with 57 additions and 1567 deletions

View file

@ -19,7 +19,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"notify with one Terminated message when an Actor is stopped" in {
val terminal = createActor(Props(context { case _ context.self.stop() }))
testActor link terminal
testActor startsMonitoring terminal
terminal ! "anything"
@ -32,9 +32,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val monitor1, monitor2 = createActor(Props(context { case t: Terminated testActor ! t }))
val terminal = createActor(Props(context { case _ context.self.stop() }))
monitor1 link terminal
monitor2 link terminal
testActor link terminal
monitor1 startsMonitoring terminal
monitor2 startsMonitoring terminal
testActor startsMonitoring terminal
terminal ! "anything"
@ -51,11 +51,11 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val monitor1, monitor2 = createActor(Props(context { case t: Terminated testActor ! t }))
val terminal = createActor(Props(context { case _ context.self.stop() }))
monitor1 link terminal
monitor2 link terminal
testActor link terminal
monitor1 startsMonitoring terminal
monitor2 startsMonitoring terminal
testActor startsMonitoring terminal
monitor2 unlink terminal
monitor2 stopsMonitoring terminal
terminal ! "anything"
@ -72,7 +72,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val supervisor = createActor(Props(context { case _ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2))))
val terminal = createActor(Props(context { case x context.channel ! x }).withSupervisor(supervisor))
testActor link terminal
testActor startsMonitoring terminal
terminal ! Kill
terminal ! Kill

View file

@ -55,9 +55,8 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
"not fail when listener goes away" in {
val forward = createActor(new Forwarder(testActor))
val fsm = createActor(new MyFSM(testActor))
val sup = createActor(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
sup link fsm
val fsm = sup startsMonitoring createActor(new MyFSM(testActor))
within(300 millis) {
fsm ! SubscribeTransitionCallBack(forward)
expectMsg(CurrentState(fsm, 0))

View file

@ -137,13 +137,13 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
expectMsg(EventHandler.Debug(actor, "started"))
supervisor link actor
supervisor startsMonitoring actor
expectMsgPF(hint = "now monitoring") {
case EventHandler.Debug(ref, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("now monitoring")
}
supervisor unlink actor
supervisor stopsMonitoring actor
expectMsgPF(hint = "stopped monitoring") {
case EventHandler.Debug(ref, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring")

View file

@ -231,7 +231,7 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll {
}
}).withSupervisor(boss))
boss.link(slave)
boss startsMonitoring slave
slave ! Ping
slave ! Crash

View file

@ -48,7 +48,7 @@ class SupervisorHierarchySpec extends AkkaSpec {
val countDownMessages = new CountDownLatch(1)
val countDownMax = new CountDownLatch(1)
val boss = createActor(Props(new Actor {
val crasher = self.link(createActor(Props(new CountDownActor(countDownMessages)).withSupervisor(self)))
val crasher = self startsMonitoring createActor(Props(new CountDownActor(countDownMessages)).withSupervisor(self))
protected def receive = {
case "killCrasher" crasher ! Kill

View file

@ -262,12 +262,12 @@ private[akka] class ActorCell(
private[akka] def stop(): Unit =
dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel))
def link(subject: ActorRef): ActorRef = {
def startsMonitoring(subject: ActorRef): ActorRef = {
dispatcher.systemDispatch(SystemEnvelope(this, Link(subject), NullChannel))
subject
}
def unlink(subject: ActorRef): ActorRef = {
def stopsMonitoring(subject: ActorRef): ActorRef = {
dispatcher.systemDispatch(SystemEnvelope(this, Unlink(subject), NullChannel))
subject
}

View file

@ -109,19 +109,22 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
def isShutdown: Boolean
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
* receive a notification if the linked actor has crashed.
* <p/>
* If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
* Registers this actor to be a death monitor of the provided ActorRef
* This means that this actor will get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @returns the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def link(actorRef: ActorRef): ActorRef
def startsMonitoring(subject: ActorRef): ActorRef
/**
* Unlink the actor.
* Deregisters this actor from being a death monitor of the provided ActorRef
* This means that this actor will not get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @returns the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def unlink(actorRef: ActorRef): ActorRef
def stopsMonitoring(subject: ActorRef): ActorRef
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit
@ -194,25 +197,22 @@ class LocalActorRef private[akka] (
def stop(): Unit = actorCell.stop()
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
* receive a notification if the linked actor has crashed.
* <p/>
* If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
* <p/>
* To be invoked from within the actor itself.
* Returns the ref that was passed into it
* Registers this actor to be a death monitor of the provided ActorRef
* This means that this actor will get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @returns the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def link(subject: ActorRef): ActorRef = actorCell.link(subject)
def startsMonitoring(subject: ActorRef): ActorRef = actorCell.startsMonitoring(subject)
/**
* Unlink the actor.
* <p/>
* To be invoked from within the actor itself.
* Returns the ref that was passed into it
* Deregisters this actor from being a death monitor of the provided ActorRef
* This means that this actor will not get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @returns the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def unlink(subject: ActorRef): ActorRef = actorCell.unlink(subject)
def stopsMonitoring(subject: ActorRef): ActorRef = actorCell.stopsMonitoring(subject)
// ========= AKKA PROTECTED FUNCTIONS =========
@ -343,9 +343,9 @@ case class SerializedActorRef(uuid: Uuid, address: String, hostname: String, por
*/
trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
def link(actorRef: ActorRef): ActorRef = unsupported
def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported
def unlink(actorRef: ActorRef): ActorRef = unsupported
def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported
def suspend(): Unit = unsupported
@ -360,9 +360,9 @@ class DeadLetterActorRef(app: AkkaApplication) extends UnsupportedActorRef {
val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher)
val address: String = "akka:internal:DeadLetterActorRef"
override def link(actorRef: ActorRef): ActorRef = actorRef
override def startsMonitoring(actorRef: ActorRef): ActorRef = actorRef
override def unlink(actorRef: ActorRef): ActorRef = actorRef
override def stopsMonitoring(actorRef: ActorRef): ActorRef = actorRef
def isShutdown(): Boolean = true

View file

@ -116,7 +116,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
val requestedCapacity = capacity(_delegates)
val newDelegates = requestedCapacity match {
case qty if qty > 0
_delegates ++ Vector.fill(requestedCapacity)(self link instance(defaultProps))
_delegates ++ Vector.fill(requestedCapacity)(self startsMonitoring instance(defaultProps))
case qty if qty < 0
_delegates.splitAt(_delegates.length + requestedCapacity) match {

View file

@ -295,9 +295,8 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
callback.done(false)
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef): ActorRef = unsupported
def unlink(actorRef: ActorRef): ActorRef = unsupported
def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported
def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Timeout, channel: UntypedChannel) = unsupported
def restart(reason: Throwable): Unit = unsupported

View file

@ -7,494 +7,4 @@ Fault Tolerance Through Supervisor Hierarchies (Java)
.. contents:: :local:
Module stability: **SOLID**
The "let it crash" approach to fault/error handling, implemented by linking actors, is very different to what Java and most non-concurrency oriented languages/frameworks have adopted. Its a way of dealing with failure that is designed for concurrent and distributed systems.
Concurrency
-----------
Throwing an exception in concurrent code (lets assume we are using non-linked actors), will just simply blow up the thread that currently executes the actor.
- There is no way to find out that things went wrong (apart from inspecting the stack trace).
- There is nothing you can do about it.
Here actors provide a clean way of getting notification of the error and do something about it.
Linking actors also allow you to create sets of actors where you can be sure that either:
- All are dead
- None are dead
This is very useful when you have thousands of concurrent actors. Some actors might have implicit dependencies and together implement a service, computation, user session etc.
It encourages non-defensive programming. Dont try to prevent things from go wrong, because they will, whether you want it or not. Instead; expect failure as a natural state in the life-cycle of your app, crash early and let someone else (that sees the whole picture), deal with it.
Distributed actors
------------------
You cant build a fault-tolerant system with just one single box - you need at least two. Also, you (usually) need to know if one box is down and/or the service you are talking to on the other box is down. Here actor supervision/linking is a critical tool for not only monitoring the health of remote services, but to actually manage the service, do something about the problem if the actor or node is down. Such as restarting actors on the same node or on another node.
In short, it is a very different way of thinking, but a way that is very useful (if not critical) to building fault-tolerant highly concurrent and distributed applications, which is as valid if you are writing applications for the JVM or the Erlang VM (the origin of the idea of "let-it-crash" and actor supervision).
Supervision
-----------
Supervisor hierarchies originate from `Erlangs OTP framework <http://www.erlang.org/doc/design_principles/sup_princ.html#5>`_.
A supervisor is responsible for starting, stopping and monitoring its child processes. The basic idea of a supervisor is that it should keep its child processes alive by restarting them when necessary. This makes for a completely different view on how to write fault-tolerant servers. Instead of trying all things possible to prevent an error from happening, this approach embraces failure. It shifts the view to look at errors as something natural and something that **will** happen, instead of trying to prevent it; embraces it. Just Let It Crash™, since the components will be reset to a stable state and restarted upon failure.
Akka has two different restart strategies; All-For-One and One-For-One. Best explained using some pictures (referenced from `erlang.org <http://erlang.org>`_ ):
OneForOne
^^^^^^^^^
The OneForOne fault handler will restart only the component that has crashed.
`<image:http://www.erlang.org/doc/design_principles/sup4.gif>`_
AllForOne
^^^^^^^^^
The AllForOne fault handler will restart all the components that the supervisor is managing, including the one that have crashed. This strategy should be used when you have a certain set of components that are coupled in some way that if one is crashing they all need to be reset to a stable state before continuing.
`<image:http://www.erlang.org/doc/design_principles/sup5.gif>`_
Restart callbacks
^^^^^^^^^^^^^^^^^
There are two different callbacks that an UntypedActor or TypedActor can hook in to:
- Pre restart
- Post restart
These are called prior to and after the restart upon failure and can be used to clean up and reset/reinitialize state upon restart. This is important in order to reset the component failure and leave the component in a fresh and stable state before consuming further messages.
Defining a supervisor's restart strategy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Both the Typed Actor supervisor configuration and the Actor supervisor configuration take a FaultHandlingStrategy instance which defines the fault management. The different strategies are:
- AllForOne
- OneForOne
These have the semantics outlined in the section above.
Here is an example of how to define a restart strategy:
.. code-block:: java
new AllForOneStrategy( //Or OneForOneStrategy
new Class[]{ Exception.class }, //List of Exceptions/Throwables to handle
3, // maximum number of restart retries
5000 // within time in millis
)
Defining actor life-cycle
^^^^^^^^^^^^^^^^^^^^^^^^^
The other common configuration element is the LifeCycle which defines the life-cycle. The supervised actor can define one of two different life-cycle configurations:
- Permanent: which means that the actor will always be restarted.
- Temporary: which means that the actor will **not** be restarted, but it will be shut down through the regular shutdown process so the 'postStop' callback function will called.
Here is an example of how to define the life-cycle:
.. code-block:: java
import static akka.config.Supervision.*;
getContext().setLifeCycle(permanent()); //permanent() means that the component will always be restarted
getContext().setLifeCycle(temporary()); //temporary() means that the component will not be restarted, but rather shut down normally
Supervising Untyped Actor
-------------------------
Declarative supervisor configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The Actors supervision can be declaratively defined by creating a Supervisor factory object. Here is an example:
.. code-block:: java
import static akka.config.Supervision.*;
import static akka.actor.Actors.*;
Supervisor supervisor = Supervisor.apply(
new SupervisorConfig(
new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
new Supervise[] {
new Supervise(
actorOf(MyActor1.class),
permanent()),
new Supervise(
actorOf(MyActor2.class),
permanent())
}));
Supervisors created like this are implicitly instantiated and started.
To configure a handler function for when the actor underlying the supervisor receives a MaximumNumberOfRestartsWithinTimeRangeReached message, you can specify
a Procedure2<ActorRef,MaximumNumberOfRestartsWithinTimeRangeReached> when creating the SupervisorConfig. This handler will be called with the ActorRef of the supervisor and the
MaximumNumberOfRestartsWithinTimeRangeReached message.
.. code-block:: java
import static akka.config.Supervision.*;
import static akka.actor.Actors.*;
import akka.event.EventHandler;
import akka.japi.Procedure2;
Procedure2<ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached> handler = new Procedure2<ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached>() {
public void apply(ActorRef ref, MaximumNumberOfRestartsWithinTimeRangeReached max) {
EventHandler.error(ref, max);
}
};
Supervisor supervisor = Supervisor.apply(
new SupervisorConfig(
new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
new Supervise[] {
new Supervise(
actorOf(MyActor1.class),
permanent()),
new Supervise(
actorOf(MyActor2.class),
permanent())
}, handler));
You can link and unlink actors from a declaratively defined supervisor using the 'link' and 'unlink' methods:
.. code-block:: java
Supervisor supervisor = Supervisor.apply(...);
supervisor.link(..);
supervisor.unlink(..);
You can also create declarative supervisors through the 'SupervisorFactory' factory object. Use this factory instead of the 'Supervisor' factory object if you want to control instantiation and starting of the Supervisor, if not then it is easier and better to use the 'Supervisor' factory object.
Example usage:
.. code-block:: java
import static akka.config.Supervision.*;
import static akka.actor.Actors.*;
SupervisorFactory factory = new SupervisorFactory(
new SupervisorConfig(
new OneForOneStrategy(new Class[]{Exception.class}, 3, 5000),
new Supervise[] {
new Supervise(
actorOf(MyActor1.class),
permanent()),
new Supervise(
actorOf(MyActor2.class),
temporary())
}));
Then create a new instance our Supervisor and start it up explicitly.
.. code-block:: java
SupervisorFactory supervisor = factory.newInstance();
supervisor.start(); // start up all managed servers
Declaratively define actors as remote services
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can expose your actors as remote services by specifying the registerAsRemote to **true** in Supervise.
Here is an example:
.. code-block:: java
import static akka.config.Supervision.*;
import static akka.actor.Actors.*;
Supervisor supervisor = Supervisor.apply(
new SupervisorConfig(
new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
new Supervise[] {
new Supervise(
actorOf(MyActor1.class),
permanent(),
true)
}));
Programmatic linking and supervision of Untyped Actors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Untyped Actors can at runtime create, spawn, link and supervise other actors. Linking and unlinking is done using one of the 'link' and 'unlink' methods available in the 'ActorRef' (therefore prefixed with getContext() in these examples).
Here is the API and how to use it from within an 'Actor':
.. code-block:: java
// link and unlink actors
getContext().link(actorRef);
getContext().unlink(actorRef);
// starts and links Actors atomically
getContext().link(actorRef);
// spawns (creates and starts) actors
getContext().spawn(MyActor.class);
// spawns and links Actors atomically
getContext().spawnLink(MyActor.class);
A child actor can tell the supervising actor to unlink him by sending him the 'Unlink(this)' message. When the supervisor receives the message he will unlink and shut down the child. The supervisor for an actor is available in the 'supervisor: Option[Actor]' method in the 'ActorRef' class. Here is how it can be used.
.. code-block:: java
ActorRef supervisor = getContext().getSupervisor();
if (supervisor != null) supervisor.tell(new Unlink(getContext()))
The supervising actor's side of things
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If a linked Actor is failing and throws an exception then an new Exit(deadActor, cause) message will be sent to the supervisor (however you should never try to catch this message in your own message handler, it is managed by the runtime).
The supervising Actor also needs to define a fault handler that defines the restart strategy the Actor should accommodate when it traps an Exit message. This is done by setting the setFaultHandler method.
The different options are:
- AllForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
- trapExit is an Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
- OneForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
- trapExit is an Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
Here is an example:
.. code-block:: java
getContext().setFaultHandler(new AllForOneStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000));
Putting all this together it can look something like this:
.. code-block:: java
class MySupervisor extends UntypedActor {
public MySupervisor() {
getContext().setFaultHandler(new AllForOneStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000));
}
public void onReceive(Object message) throws Exception {
if (message instanceof Register) {
Register event = (Register)message;
UntypedActorRef actor = event.getActor();
context.link(actor);
} else throw new IllegalArgumentException("Unknown message: " + message);
}
}
You can also link an actor from outside the supervisor like this:
.. code-block:: java
UntypedActor supervisor = Actors.registry().actorsFor(MySupervisor.class])[0];
supervisor.link(actorRef);
The supervised actor's side of things
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The supervised actor needs to define a life-cycle. This is done by setting the lifeCycle field as follows:
.. code-block:: java
import static akka.config.Supervision.*;
getContext().setLifeCycle(permanent()); // Permanent or Temporary
Default is 'Permanent' so if you don't set the life-cycle then that is what you get.
In the supervised Actor you can override the preRestart and postRestart callback methods to add hooks into the restart process. These methods take the reason for the failure, e.g. the exception that caused termination and restart of the actor as argument. It is in these methods that **you** have to add code to do cleanup before termination and initialization after restart. Here is an example:
.. code-block:: java
class FaultTolerantService extends UntypedActor {
@Override
public void preRestart(Throwable reason) {
... // clean up before restart
}
@Override
public void postRestart(Throwable reason) {
... // reinit stable state after restart
}
}
Reply to initial senders
^^^^^^^^^^^^^^^^^^^^^^^^
Supervised actors have the option to reply to the initial sender within preRestart, postRestart and postStop. A reply within these methods is possible after receive has thrown an exception. When receive returns normally it is expected that any necessary reply has already been done within receive. Here's an example.
.. code-block:: java
public class FaultTolerantService extends UntypedActor {
public void onReceive(Object msg) {
// do something that may throw an exception
// ...
getContext().tryReply("ok");
}
@Override
public void preRestart(Throwable reason) {
getContext().tryReply(reason.getMessage());
}
@Override
public void postStop() {
getContext().tryReply("stopped by supervisor");
}
}
- A reply within preRestart or postRestart must be a safe reply via getContext().tryReply() because a getContext().reply() will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies.
- A reply within postStop must be a safe reply via getContext().tryReply() because a getContext().reply() will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all).
Handling too many actor restarts within a specific time limit
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you remember, when you define the 'RestartStrategy' you also defined maximum number of restart retries within time in millis.
.. code-block:: java
new AllForOneStrategy( // FaultHandlingStrategy policy (AllForOneStrategy or OneForOneStrategy)
new Class[]{MyException.class, IOException.class}, //What types of errors will be handled
3, // maximum number of restart retries
5000 // within time in millis
);
Now, what happens if this limit is reached?
What will happen is that the failing actor will send a system message to its supervisor called 'MaximumNumberOfRestartsWithinTimeRangeReached' with the following these properties:
- victim: ActorRef
- maxNrOfRetries: int
- withinTimeRange: int
- lastExceptionCausingRestart: Throwable
If you want to be able to take action upon this event (highly recommended) then you have to create a message handle for it in the supervisor.
Here is an example:
.. code-block:: java
public class SampleUntypedActorSupervisor extends UntypedActor {
...
public void onReceive(Object message) throws Exception {
if (message instanceof MaximumNumberOfRestartsWithinTimeRangeReached) {
MaximumNumberOfRestartsWithinTimeRangeReached event = (MaximumNumberOfRestartsWithinTimeRangeReached)message;
... = event.getVictim();
... = event.getMaxNrOfRetries();
... = event.getWithinTimeRange();
... = event.getLastExceptionCausingRestart();
} else throw new IllegalArgumentException("Unknown message: " + message);
}
}
You will also get this log warning similar to this:
.. code-block:: console
WAR [20100715-14:05:25.821] actor: Maximum number of restarts [5] within time range [5000] reached.
WAR [20100715-14:05:25.821] actor: Will *not* restart actor [Actor[akka.actor.SupervisorHierarchySpec$CountDownActor:1279195525812]] anymore.
WAR [20100715-14:05:25.821] actor: Last exception causing restart was [akka.actor.SupervisorHierarchySpec$FireWorkerException: Fire the worker!].
If you don't define a message handler for this message then you don't get an error but the message is simply not sent to the supervisor. Instead you will get a log warning.
Supervising Typed Actors
------------------------
Declarative supervisor configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To configure Typed Actors for supervision you have to consult the TypedActorConfigurator and its configure method. This method takes a RestartStrategy and an array of Component definitions defining the Typed Actors and their LifeCycle. Finally you call the supervise method to start everything up. The Java configuration elements reside in the akka.config.JavaConfig class and need to be imported statically.
Here is an example:
.. code-block:: java
import static akka.config.Supervision.*;
import static akka.config.SupervisorConfig.*;
TypedActorConfigurator manager = new TypedActorConfigurator();
manager.configure(
new AllForOneStrategy(new Class[]{Exception.class}, 3, 1000),
new SuperviseTypedActor[] {
new SuperviseTypedActor(
Foo.class,
FooImpl.class,
temporary(),
1000),
new SuperviseTypedActor(
Bar.class,
BarImpl.class,
permanent(),
1000)
}).supervise();
Then you can retrieve the Typed Actor as follows:
.. code-block:: java
Foo foo = (Foo) manager.getInstance(Foo.class);
Restart callbacks
^^^^^^^^^^^^^^^^^
In the supervised TypedActor you can override the preRestart and postRestart callback methods to add hooks into the restart process. These methods take the reason for the failure, e.g. the exception that caused termination and restart of the actor as argument. It is in these methods that **you** have to add code to do cleanup before termination and initialization after restart. Here is an example:
.. code-block:: java
class FaultTolerantService extends TypedActor {
@Override
public void preRestart(Throwable reason) {
... // clean up before restart
}
@Override
public void postRestart(Throwable reason) {
... // reinit stable state after restart
}
}
Programatic linking and supervision of TypedActors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypedActors can be linked and unlinked just like UntypedActors:
.. code-block:: java
TypedActor.link(supervisor, supervised);
TypedActor.unlink(supervisor, supervised);
If the parent TypedActor (supervisor) wants to be able to do handle failing child TypedActors, e.g. be able restart the linked TypedActor according to a given fault handling scheme then it has to set its trapExit flag to an array of Exceptions that it wants to be able to trap:
.. code-block:: java
TypedActor.faultHandler(supervisor, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000));
For convenience there is an overloaded link that takes trapExit and faultHandler for the supervisor as arguments. Here is an example:
.. code-block:: java
import static akka.actor.TypedActor.*;
import static akka.config.Supervision.*;
foo = newInstance(Foo.class, FooImpl.class, 1000);
bar = newInstance(Bar.class, BarImpl.class, 1000);
link(foo, bar, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000));
// alternative: chaining
bar = faultHandler(foo, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000)).newInstance(Bar.class, 1000);
link(foo, bar);
REWRITE ME

View file

@ -180,9 +180,8 @@ The messages that it prevents are all that extends 'LifeCycleMessage':
It also prevents the client from invoking any life-cycle and side-effecting methods, such as:
* start
* stop
* link
* unlink
* spawnLink
* startsMonitoring
* stopsMonitoring
* etc.
Using secure cookie for remote client authentication

View file

@ -7,449 +7,4 @@ Fault Tolerance Through Supervisor Hierarchies (Scala)
.. contents:: :local:
Module stability: **SOLID**
The "let it crash" approach to fault/error handling, implemented by linking actors, is very different to what Java and most non-concurrency oriented languages/frameworks have adopted. It's a way of dealing with failure that is designed for concurrent and distributed systems.
Concurrency
-----------
Throwing an exception in concurrent code (let's assume we are using non-linked actors), will just simply blow up the thread that currently executes the actor.
- There is no way to find out that things went wrong (apart from inspecting the stack trace).
- There is nothing you can do about it.
Here actors provide a clean way of getting notification of the error and do something about it.
Linking actors also allow you to create sets of actors where you can be sure that either:
- All are dead
- None are dead
This is very useful when you have thousands of concurrent actors. Some actors might have implicit dependencies and together implement a service, computation, user session etc.
It encourages non-defensive programming. Don't try to prevent things from go wrong, because they will, whether you want it or not. Instead; expect failure as a natural state in the life-cycle of your app, crash early and let someone else (that sees the whole picture), deal with it.
Distributed actors
------------------
You can't build a fault-tolerant system with just one single box - you need at least two. Also, you (usually) need to know if one box is down and/or the service you are talking to on the other box is down. Here actor supervision/linking is a critical tool for not only monitoring the health of remote services, but to actually manage the service, do something about the problem if the actor or node is down. Such as restarting actors on the same node or on another node.
In short, it is a very different way of thinking, but a way that is very useful (if not critical) to building fault-tolerant highly concurrent and distributed applications, which is as valid if you are writing applications for the JVM or the Erlang VM (the origin of the idea of "let-it-crash" and actor supervision).
Supervision
-----------
Supervisor hierarchies originate from `Erlang's OTP framework <http://www.erlang.org/doc/design_principles/sup_princ.html#5>`_.
A supervisor is responsible for starting, stopping and monitoring its child processes. The basic idea of a supervisor is that it should keep its child processes alive by restarting them when necessary. This makes for a completely different view on how to write fault-tolerant servers. Instead of trying all things possible to prevent an error from happening, this approach embraces failure. It shifts the view to look at errors as something natural and something that **will** happen, instead of trying to prevent it; embraces it. Just "Let It Crash", since the components will be reset to a stable state and restarted upon failure.
Akka has two different restart strategies; All-For-One and One-For-One. Best explained using some pictures (referenced from `erlang.org <http://erlang.org>`_ ):
OneForOne
^^^^^^^^^
The OneForOne fault handler will restart only the component that has crashed.
`<image:http://www.erlang.org/doc/design_principles/sup4.gif>`_
AllForOne
^^^^^^^^^
The AllForOne fault handler will restart all the components that the supervisor is managing, including the one that have crashed. This strategy should be used when you have a certain set of components that are coupled in some way that if one is crashing they all need to be reset to a stable state before continuing.
`<image:http://www.erlang.org/doc/design_principles/sup5.gif>`_
Restart callbacks
^^^^^^^^^^^^^^^^^
There are two different callbacks that the Typed Actor and Actor can hook in to:
* Pre restart
* Post restart
These are called prior to and after the restart upon failure and can be used to clean up and reset/reinitialize state upon restart. This is important in order to reset the component failure and leave the component in a fresh and stable state before consuming further messages.
Defining a supervisor's restart strategy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Both the Typed Actor supervisor configuration and the Actor supervisor configuration take a 'FaultHandlingStrategy' instance which defines the fault management. The different strategies are:
* AllForOne
* OneForOne
These have the semantics outlined in the section above.
Here is an example of how to define a restart strategy:
.. code-block:: scala
AllForOneStrategy( //FaultHandlingStrategy; AllForOneStrategy or OneForOneStrategy
List(classOf[Exception]), //What exceptions will be handled
3, // maximum number of restart retries
5000 // within time in millis
)
Defining actor life-cycle
^^^^^^^^^^^^^^^^^^^^^^^^^
The other common configuration element is the "LifeCycle' which defines the life-cycle. The supervised actor can define one of two different life-cycle configurations:
* Permanent: which means that the actor will always be restarted.
* Temporary: which means that the actor will **not** be restarted, but it will be shut down through the regular shutdown process so the 'postStop' callback function will called.
Here is an example of how to define the life-cycle:
.. code-block:: scala
Permanent // means that the component will always be restarted
Temporary // means that it will not be restarted, but it will be shut
// down through the regular shutdown process so the 'postStop' hook will called
Supervising Actors
------------------
Declarative supervisor configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The Actor's supervision can be declaratively defined by creating a "Supervisor' factory object. Here is an example:
.. code-block:: scala
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
Supervise(
actorOf[MyActor1],
Permanent) ::
Supervise(
actorOf[MyActor2],
Permanent) ::
Nil))
Supervisors created like this are implicitly instantiated and started.
To configure a handler function for when the actor underlying the supervisor receives a MaximumNumberOfRestartsWithinTimeRangeReached message, you can specify a function of type
(ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) => Unit when creating the SupervisorConfig. This handler will be called with the ActorRef of the supervisor and the
MaximumNumberOfRestartsWithinTimeRangeReached message.
.. code-block:: scala
val handler = {
(supervisor:ActorRef,max:MaximumNumberOfRestartsWithinTimeRangeReached) => EventHandler.notify(supervisor,max)
}
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
Supervise(
actorOf[MyActor1],
Permanent) ::
Supervise(
actorOf[MyActor2],
Permanent) ::
Nil), handler)
You can link and unlink actors from a declaratively defined supervisor using the 'link' and 'unlink' methods:
.. code-block:: scala
val supervisor = Supervisor(...)
supervisor.link(..)
supervisor.unlink(..)
You can also create declarative supervisors through the 'SupervisorFactory' factory object. Use this factory instead of the 'Supervisor' factory object if you want to control instantiation and starting of the Supervisor, if not then it is easier and better to use the 'Supervisor' factory object.
Example usage:
.. code-block:: scala
val factory = SupervisorFactory(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), 3, 10),
Supervise(
myFirstActor,
Permanent) ::
Supervise(
mySecondActor,
Permanent) ::
Nil))
Then create a new instance our Supervisor and start it up explicitly.
.. code-block:: scala
val supervisor = factory.newInstance
supervisor.start // start up all managed servers
Declaratively define actors as remote services
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can declaratively define an actor to be available as a remote actor by specifying **true** for registerAsRemoteService.
Here is an example:
.. code-block:: scala
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
Supervise(
actorOf[MyActor1],
Permanent,
**true**)
:: Nil))
Programmatic linking and supervision of Actors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Actors can at runtime create, spawn, link and supervise other actors. Linking and unlinking is done using one of the 'link' and 'unlink' methods available in the 'ActorRef' (therefore prefixed with 'self' in these examples).
Here is the API and how to use it from within an 'Actor':
.. code-block:: scala
// link and unlink actors
self.link(actorRef)
self.unlink(actorRef)
// link first, then start actor
self.link(actorRef)
// spawns (creates and starts) actors
self.spawn[MyActor]
self.spawnRemote[MyActor]
// spawns and links Actors atomically
self.spawnLink[MyActor]
self.spawnLinkRemote[MyActor]
A child actor can tell the supervising actor to unlink him by sending him the 'Unlink(this)' message. When the supervisor receives the message he will unlink and shut down the child. The supervisor for an actor is available in the 'supervisor: Option[Actor]' method in the 'ActorRef' class. Here is how it can be used.
.. code-block:: scala
if (supervisor.isDefined) supervisor.get ! Unlink(self)
// Or shorter using 'foreach':
supervisor.foreach(_ ! Unlink(self))
The supervising actor's side of things
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If a linked Actor is failing and throws an exception then an "Exit(deadActor, cause)' message will be sent to the supervisor (however you should never try to catch this message in your own message handler, it is managed by the runtime).
The supervising Actor also needs to define a fault handler that defines the restart strategy the Actor should accommodate when it traps an "Exit' message. This is done by setting the "faultHandler' field.
.. code-block:: scala
protected var faultHandler: FaultHandlingStrategy
The different options are:
- AllForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
- trapExit is a List or Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
- OneForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
- trapExit is a List or Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
Here is an example:
.. code-block:: scala
self.faultHandler = AllForOneStrategy(List(classOf[Throwable]), 3, 1000)
Putting all this together it can look something like this:
.. code-block:: scala
class MySupervisor extends Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
def receive = {
case Register(actor) =>
self.link(actor)
}
}
You can also link an actor from outside the supervisor like this:
.. code-block:: scala
val supervisor = Actor.registry.actorsFor(classOf[MySupervisor]).head
supervisor.link(actor)
The supervised actor's side of things
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The supervised actor needs to define a life-cycle. This is done by setting the lifeCycle field as follows:
.. code-block:: scala
self.lifeCycle = Permanent // Permanent or Temporary or UndefinedLifeCycle
In the supervised Actor you can override the "preRestart' and "postRestart' callback methods to add hooks into the restart process. These methods take the reason for the failure, e.g. the exception that caused termination and restart of the actor as argument. It is in these methods that **you** have to add code to do cleanup before termination and initialization after restart. Here is an example:
.. code-block:: scala
class FaultTolerantService extends Actor {
override def preRestart(reason: Throwable) {
... // clean up before restart
}
override def postRestart(reason: Throwable) {
... // reinit stable state after restart
}
}
Reply to initial senders
^^^^^^^^^^^^^^^^^^^^^^^^
Supervised actors have the option to reply to the initial sender within preRestart, postRestart and postStop. A reply within these methods is possible after receive has thrown an exception. When receive returns normally it is expected that any necessary reply has already been done within receive. Here's an example.
.. code-block:: scala
class FaultTolerantService extends Actor {
def receive = {
case msg => {
// do something that may throw an exception
// ...
self.reply("ok")
}
}
override def preRestart(reason: scala.Throwable) {
self.tryReply(reason.getMessage)
}
override def postStop() {
self.tryReply("stopped by supervisor")
}
}
- A reply within preRestart or postRestart must be a safe reply via `self.tryReply` because an unsafe self.reply will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies.
- A reply within postStop must be a safe reply via `self.tryReply` because an unsafe self.reply will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all).
Handling too many actor restarts within a specific time limit
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you remember, when you define the 'RestartStrategy' you also defined maximum number of restart retries within time in millis.
.. code-block:: scala
AllForOneStrategy( //Restart policy, AllForOneStrategy or OneForOneStrategy
List(classOf[Exception]), //What kinds of exception it will handle
3, // maximum number of restart retries
5000 // within time in millis
)
Now, what happens if this limit is reached?
What will happen is that the failing actor will send a system message to its supervisor called 'MaximumNumberOfRestartsWithinTimeRangeReached' with the following signature:
.. code-block:: scala
case class MaximumNumberOfRestartsWithinTimeRangeReached(
victim: ActorRef, maxNrOfRetries: Int, withinTimeRange: Int, lastExceptionCausingRestart: Throwable)
If you want to be able to take action upon this event (highly recommended) then you have to create a message handle for it in the supervisor.
Here is an example:
.. code-block:: scala
val supervisor = actorOf(new Actor{
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
protected def receive = {
case MaximumNumberOfRestartsWithinTimeRangeReached(
victimActorRef, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) =>
... // handle the error situation
}
})
You will also get this log warning similar to this:
.. code-block:: console
WAR [20100715-14:05:25.821] actor: Maximum number of restarts [5] within time range [5000] reached.
WAR [20100715-14:05:25.821] actor: Will *not* restart actor [Actor[akka.actor.SupervisorHierarchySpec$CountDownActor:1279195525812]] anymore.
WAR [20100715-14:05:25.821] actor: Last exception causing restart was [akka.actor.SupervisorHierarchySpec$FireWorkerException: Fire the worker!].
If you don't define a message handler for this message then you don't get an error but the message is simply not sent to the supervisor. Instead you will get a log warning.
Supervising Typed Actors
------------------------
Declarative supervisor configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To configure Typed Actors for supervision you have to consult the "TypedActorConfigurator' and its "configure' method. This method takes a "RestartStrategy' and an array of "Component' definitions defining the Typed Actors and their "LifeCycle'. Finally you call the "supervise' method to start everything up. The configuration elements reside in the "akka.config.JavaConfig' class and need to be imported statically.
Here is an example:
.. code-block:: scala
import akka.config.Supervision._
val manager = new TypedActorConfigurator
manager.configure(
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
List(
SuperviseTypedActor(
Foo.class,
FooImpl.class,
Permanent,
1000),
new SuperviseTypedActor(
Bar.class,
BarImpl.class,
Permanent,
1000)
)).supervise
Then you can retrieve the Typed Actor as follows:
.. code-block:: java
Foo foo = manager.getInstance(classOf[Foo])
Restart callbacks
^^^^^^^^^^^^^^^^^
Programatic linking and supervision of TypedActors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypedActors can be linked and unlinked just like actors - in fact the linking is done on the underlying actor:
.. code-block:: scala
TypedActor.link(supervisor, supervised)
TypedActor.unlink(supervisor, supervised)
If the parent TypedActor (supervisor) wants to be able to do handle failing child TypedActors, e.g. be able restart the linked TypedActor according to a given fault handling scheme then it has to set its 'trapExit' flag to an array of Exceptions that it wants to be able to trap:
.. code-block:: scala
TypedActor.faultHandler(supervisor, AllForOneStrategy(Array(classOf[IOException]), 3, 2000))
For convenience there is an overloaded link that takes trapExit and faultHandler for the supervisor as arguments. Here is an example:
.. code-block:: scala
import akka.actor.TypedActor._
val foo = newInstance(classOf[Foo], 1000)
val bar = newInstance(classOf[Bar], 1000)
link(foo, bar, new AllForOneStrategy(Array(classOf[IOException]), 3, 2000))
// alternative: chaining
bar = faultHandler(foo, new AllForOneStrategy(Array(classOf[IOException]), 3, 2000))
.newInstance(Bar.class, 1000)
link(foo, bar
REWRITE ME

View file

@ -182,9 +182,8 @@ The messages that it prevents are all that extends 'LifeCycleMessage':
It also prevents the client from invoking any life-cycle and side-effecting methods, such as:
* start
* stop
* link
* unlink
* spawnLink
* startsMonitoring
* stopsMonitoring
* etc.
Using secure cookie for remote client authentication

View file

@ -742,7 +742,7 @@ options:
* *Logging of the actor lifecycle*
Actor creation, start, restart, link, unlink and stop may be traced by
Actor creation, start, restart, monitor start, monitor stop and stop may be traced by
enabling the setting ``akka.actor.debug.lifecycle``; this, too, is enabled
uniformly on all actors.

View file

@ -5,574 +5,4 @@ Tutorial: write a scalable, fault-tolerant, network chat server and client (Scal
.. contents:: :local:
Introduction
------------
`Tutorial source code <https://github.com/jboner/akka/blob/master/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala>`_.
Writing correct concurrent, fault-tolerant and scalable applications is too hard. Most of the time it's because we are using the wrong tools and the wrong level of abstraction.
`Akka <http://akka.io>`_ is an attempt to change that.
Akka uses the Actor Model together with Software Transactional Memory to raise the abstraction level and provide a better platform to build correct concurrent and scalable applications.
For fault-tolerance Akka adopts the "Let it crash", also called "Embrace failure", model which has been used with great success in the telecom industry to build applications that self-heal, systems that never stop.
Actors also provides the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.
Akka is Open Source and available under the Apache 2 License.
In this article we will introduce you to Akka and see how we can utilize it to build a highly concurrent, scalable and fault-tolerant network server.
But first let's take a step back and discuss what Actors really are and what they are useful for.
Actors
------
`The Actor Model <http://en.wikipedia.org/wiki/Actor_model>`_ provides a higher level of abstraction for writing concurrent and distributed systems. It alleviates the developer from having to deal with explicit locking and thread management. It makes it easier to write correct concurrent and parallel systems. Actors are really nothing new, they were defined in the 1963 paper by Carl Hewitt and have been popularized by the Erlang language which emerged in the mid 80s. It has been used by for example at Ericsson with great success to build highly concurrent and extremely reliable (99.9999999 % availability - 31 ms/year downtime) telecom systems.
Actors encapsulate state and behavior into a lightweight process/thread. In a sense they are like OO objects but with a major semantic difference; they *do not* share state with any other Actor. Each Actor has its own view of the world and can only have impact on other Actors by sending messages to them. Messages are sent asynchronously and non-blocking in a so-called "fire-and-forget" manner where the Actor sends off a message to some other Actor and then do not wait for a reply but goes off doing other things or are suspended by the runtime. Each Actor has a mailbox (ordered message queue) in which incoming messages are processed one by one. Since all processing is done asynchronously and Actors do not block and consume any resources while waiting for messages, Actors tend to give very good concurrency and scalability characteristics and are excellent for building event-based systems.
Creating Actors
---------------
Akka has both a Scala API (:ref:`actors-scala`) and a Java API (:ref:`untyped-actors-java`). In this article we will only look at the Scala API since that is the most expressive one. The article assumes some basic Scala knowledge, but even if you don't know Scala I don't think it will not be too hard to follow along anyway.
Akka has adopted the same style of writing Actors as Erlang in which each Actor has an explicit message handler which does pattern matching to match on the incoming messages.
Actors can be created either by:
* Extending the 'Actor' class and implementing the 'receive' method.
* Create an anonymous Actor using one of the 'actor' methods.
Here is a little example before we dive into a more interesting one.
.. code-block:: scala
import akka.actor.Actor
class MyActor extends Actor {
def receive = {
case "test" => println("received test")
case _ => println("received unknown message")
}
}
val myActor = Actor.actorOf[MyActor]
From this call we get a handle to the 'Actor' called 'ActorRef', which we can use to interact with the Actor
The 'actorOf' factory method can be imported like this:
.. code-block:: scala
import akka.actor.Actor.actorOf
val a = actorOf[MyActor]
From now on we will assume that it is imported like this and can use it directly.
Akka Actors are extremely lightweight. Each Actor consume ~600 bytes, which means that you can create 6.5 million on 4 GB RAM.
Messages are sent using the '!' operator:
.. code-block:: scala
myActor ! "test"
Sample application
------------------
We will try to write a simple chat/IM system. It is client-server based and uses remote Actors to implement remote clients. Even if it is not likely that you will ever write a chat system I think that it can be a useful exercise since it uses patterns and idioms found in many other use-cases and domains.
We will use many of the features of Akka along the way. In particular; Actors, fault-tolerance using Actor supervision, remote Actors, Software Transactional Memory (STM) and persistence.
Creating an Akka SBT project
----------------------------
First we need to create an SBT project for our tutorial. You do that by stepping into the directory you want to create your project in and invoking the ``sbt`` command answering the questions for setting up your project::
$ sbt
Project does not exist, create new project? (y/N/s) y
Name: Chat
Organization: Hakkers Inc
Version [1.0]:
Scala version [2.9.0]:
sbt version [0.7.6.RC0]:
Add the Akka SBT plugin definition to your SBT project by creating a ``Plugins.scala`` file in the ``project/plugins`` directory containing::
import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val akkaRepo = "Akka Repo" at "http://akka.io/repository"
val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT"
}
Create a project definition ``project/build/Project.scala`` file containing::
import sbt._
class ChatProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject {
val akkaRepo = "Akka Repo" at "http://akka.io/repository"
val akkaSTM = akkaModule("stm")
val akkaRemote = akkaModule("remote")
}
Make SBT download the dependencies it needs. That is done by invoking::
> reload
> update
From the SBT project you can generate files for your IDE:
- `SbtEclipsify <https://github.com/musk/SbtEclipsify>`_ to generate Eclipse project. Detailed instructions are available in :ref:`getting-started-first-scala-eclipse`.
- `sbt-idea <https://github.com/mpeltonen/sbt-idea>`_ to generate IntelliJ IDEA project.
Creating messages
-----------------
Let's start by defining the messages that will flow in our system. It is very important that all messages that will be sent around in the system are immutable. The Actor model relies on the simple fact that no state is shared between Actors and the only way to guarantee that is to make sure we don't pass mutable state around as part of the messages.
In Scala we have something called `case classes <http://www.scala-lang.org/node/107>`_. These make excellent messages since they are both immutable and great to pattern match on.
Let's now start by creating the messages that will flow in our system.
.. code-block:: scala
sealed trait Event
case class Login(user: String) extends Event
case class Logout(user: String) extends Event
case class GetChatLog(from: String) extends Event
case class ChatLog(log: List[String]) extends Event
case class ChatMessage(from: String, message: String) extends Event
As you can see with these messages we can log in and out, send a chat message and ask for and get a reply with all the messages in the chat log so far.
Client: Sending messages
------------------------
Our client wraps each message send in a function, making it a bit easier to use. Here we assume that we have a reference to the chat service so we can communicate with it by sending messages. Messages are sent with the '!' operator (pronounced "bang"). This sends a message of asynchronously and do not wait for a reply.
Sometimes however, there is a need for sequential logic, sending a message and
wait for the reply before doing anything else. In Akka we can achieve that
using the '?' operator. When sending a message with '?' we get back a `Future
<http://en.wikipedia.org/wiki/Futures_and_promises>`_. A 'Future' is a promise
that we will get a result later but with the difference from regular method
dispatch that the OS thread we are running on is put to sleep while waiting and
that we can set a time-out for how long we wait before bailing out, retrying or
doing something else. This waiting is achieved with the :meth:`Future.as[T]`
method, which returns a `scala.Option
<http://www.codecommit.com/blog/scala/the-option-pattern>`_ which implements
the `Null Object pattern <http://en.wikipedia.org/wiki/Null_Object_pattern>`_.
It has two subclasses; 'None' which means no result and 'Some(value)' which
means that we got a reply. The 'Option' class has a lot of great methods to
work with the case of not getting a defined result. F.e. as you can see below
we are using the 'getOrElse' method which will try to return the result and if
there is no result defined invoke the "...OrElse" statement.
.. code-block:: scala
class ChatClient(val name: String) {
val chat = Actor.remote.actorFor("chat:service", "localhost", 2552)
def login = chat ! Login(name)
def logout = chat ! Logout(name)
def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
def chatLog = (chat ? GetChatLog(name)).as[ChatLog]
.getOrElse(throw new Exception("Couldn't get the chat log from ChatServer"))
}
As you can see, we are using the 'Actor.remote.actorFor' to lookup the chat server on the remote node. From this call we will get a handle to the remote instance and can use it as it is local.
Session: Receiving messages
---------------------------
Now we are done with the client side and let's dig into the server code. We start by creating a user session. The session is an Actor and is defined by extending the 'Actor' trait. This trait has one abstract method that we have to define; 'receive' which implements the message handler for the Actor.
In our example the session has state in the form of a 'List' with all the messages sent by the user during the session. In takes two parameters in its constructor; the user name and a reference to an Actor implementing the persistent message storage. For both of the messages it responds to, 'ChatMessage' and 'GetChatLog', it passes them on to the storage Actor.
If you look closely (in the code below) you will see that when passing on the 'GetChatLog' message we are not using '!' but 'forward'. This is similar to '!' but with the important difference that it passes the original sender reference, in this case to the storage Actor. This means that the storage can use this reference to reply to the original sender (our client) directly.
.. code-block:: scala
class Session(user: String, storage: ActorRef) extends Actor {
private val loginTime = System.currentTimeMillis
private var userLog: List[String] = Nil
EventHandler.info(this, "New session for user [%s] has been created at [%s]".format(user, loginTime))
def receive = {
case msg @ ChatMessage(from, message) =>
userLog ::= message
storage ! msg
case msg @ GetChatLog(_) =>
storage forward msg
}
}
Let it crash: Implementing fault-tolerance
------------------------------------------
Akka's `approach to fault-tolerance <fault-tolerance>`_; the "let it crash" model, is implemented by linking Actors. It is very different to what Java and most non-concurrency oriented languages/frameworks have adopted. Its a way of dealing with failure that is designed for concurrent and distributed systems.
If we look at concurrency first. Now lets assume we are using non-linked Actors. Throwing an exception in concurrent code, will just simply blow up the thread that currently executes the Actor. There is no way to find out that things went wrong (apart from see the stack trace in the log). There is nothing you can do about it. Here linked Actors provide a clean way of both getting notification of the error so you know what happened, as well as the Actor that crashed, so you can do something about it.
Linking Actors allow you to create sets of Actors where you can be sure that either:
* All are dead
* All are alive
This is very useful when you have hundreds of thousands of concurrent Actors. Some Actors might have implicit dependencies and together implement a service, computation, user session etc. for these being able to group them is very nice.
Akka encourages non-defensive programming. Dont try to prevent things from go wrong, because they will, whether you want it or not. Instead; expect failure as a natural state in the life-cycle of your app, crash early and let someone else (that sees the whole picture), deal with it.
Now lets look at distributed Actors. As you probably know, you cant build a fault-tolerant system with just one single node, but you need at least two. Also, you (usually) need to know if one node is down and/or the service you are talking to on the other node is down. Here Actor supervision/linking is a critical tool for not only monitoring the health of remote services, but to actually manage the service, do something about the problem if the Actor or node is down. This could be restarting him on the same node or on another node.
To sum things up, it is a very different way of thinking but a way that is very useful (if not critical) to building fault-tolerant highly concurrent and distributed applications.
Supervisor hierarchies
----------------------
A supervisor is a regular Actor that is responsible for starting, stopping and monitoring its child Actors. The basic idea of a supervisor is that it should keep its child Actors alive by restarting them when necessary. This makes for a completely different view on how to write fault-tolerant servers. Instead of trying all things possible to prevent an error from happening, this approach embraces failure. It shifts the view to look at errors as something natural and something that will happen and instead of trying to prevent it; embrace it. Just "let it crash" and reset the service to a stable state through restart.
Akka has two different restart strategies; All-For-One and One-For-One.
* OneForOne: Restart only the component that has crashed.
* AllForOne: Restart all the components that the supervisor is managing, including the one that have crashed.
The latter strategy should be used when you have a certain set of components that are coupled in some way that if one is crashing they all need to be reset to a stable state before continuing.
Chat server: Supervision, Traits and more
-----------------------------------------
There are two ways you can define an Actor to be a supervisor; declaratively and dynamically. In this example we use the dynamic approach. There are two things we have to do:
* Define the fault handler by setting the 'faultHandler' member field to the strategy we want.
* Define the exceptions we want to "trap", e.g. which exceptions should be handled according to the fault handling strategy we have defined. This in done by setting the 'trapExit' member field to a 'List' with all exceptions we want to trap.
The last thing we have to do to supervise Actors (in our example the storage Actor) is to 'link' the Actor. Invoking 'link(actor)' will create a link between the Actor passed as argument into 'link' and ourselves. This means that we will now get a notification if the linked Actor is crashing and if the cause of the crash, the exception, matches one of the exceptions in our 'trapExit' list then the crashed Actor is restarted according the the fault handling strategy defined in our 'faultHandler'. We also have the 'unlink(actor)' function which disconnects the linked Actor from the supervisor.
In our example we are using a method called 'spawnLink(actor)' which creates, starts and links the Actor in an atomic operation. The linking and unlinking is done in 'preStart' and 'postStop' callback methods which are invoked by the runtime when the Actor is started and shut down (shutting down is done by invoking 'actor.stop()'). In these methods we initialize our Actor, by starting and linking the storage Actor and clean up after ourselves by shutting down all the user session Actors and the storage Actor.
That is it. Now we have implemented the supervising part of the fault-tolerance for the storage Actor. But before we dive into the 'ChatServer' code there are some more things worth mentioning about its implementation.
It defines an abstract member field holding the 'ChatStorage' implementation the server wants to use. We do not define that in the 'ChatServer' directly since we want to decouple it from the actual storage implementation.
The 'ChatServer' is a 'trait', which is Scala's version of mixins. A mixin can be seen as an interface with an implementation and is a very powerful tool in Object-Oriented design that makes it possible to design the system into small, reusable, highly cohesive, loosely coupled parts that can be composed into larger object and components structures.
I'll try to show you how we can make use Scala's mixins to decouple the Actor implementation from the business logic of managing the user sessions, routing the chat messages and storing them in the persistent storage. Each of these separate parts of the server logic will be represented by its own trait; giving us four different isolated mixins; 'Actor', 'SessionManagement', 'ChatManagement' and 'ChatStorageFactory' This will give us as loosely coupled system with high cohesion and reusability. At the end of the article I'll show you how you can compose these mixins into a the complete runtime component we like.
.. code-block:: scala
/**
* Chat server. Manages sessions and redirects all other messages to the Session for the client.
*/
trait ChatServer extends Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000)
val storage: ActorRef
EventHandler.info(this, "Chat server is starting up...")
// actor message handler
def receive: Receive = sessionManagement orElse chatManagement
// abstract methods to be defined somewhere else
protected def chatManagement: Receive
protected def sessionManagement: Receive
protected def shutdownSessions(): Unit
override def postStop() = {
EventHandler.info(this, "Chat server is shutting down...")
shutdownSessions
self.unlink(storage)
storage.stop()
}
}
If you look at the 'receive' message handler function you can see that we have defined it but instead of adding our logic there we are delegating to two different functions; 'sessionManagement' and 'chatManagement', chaining them with 'orElse'. These two functions are defined as abstract in our 'ChatServer' which means that they have to be provided by some another mixin or class when we instantiate our 'ChatServer'. Naturally we will put the 'sessionManagement' implementation in the 'SessionManagement' trait and the 'chatManagement' implementation in the 'ChatManagement' trait. First let's create the 'SessionManagement' trait.
Chaining partial functions like this is a great way of composing functionality in Actors. You can for example put define one default message handle handling generic messages in the base Actor and then let deriving Actors extend that functionality by defining additional message handlers. There is a section on how that is done `here <actors>`_.
Session management
------------------
The session management is defined in the 'SessionManagement' trait in which we implement the two abstract methods in the 'ChatServer'; 'sessionManagement' and 'shutdownSessions'.
The 'SessionManagement' trait holds a 'HashMap' with all the session Actors mapped by user name as well as a reference to the storage (to be able to pass it in to each newly created 'Session').
The 'sessionManagement' function performs session management by responding to the 'Login' and 'Logout' messages. For each 'Login' message it creates a new 'Session' Actor, starts it and puts it in the 'sessions' Map and for each 'Logout' message it does the opposite; shuts down the user's session and removes it from the 'sessions' Map.
The 'shutdownSessions' function simply shuts all the sessions Actors down. That completes the user session management.
.. code-block:: scala
/**
* Implements user session management.
* <p/>
* Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor.
*/
trait SessionManagement { this: Actor =>
val storage: ActorRef // needs someone to provide the ChatStorage
val sessions = new HashMap[String, ActorRef]
protected def sessionManagement: Receive = {
case Login(username) =>
EventHandler.info(this, "User [%s] has logged in".format(username))
val session = actorOf(new Session(username, storage))
sessions += (username -> session)
case Logout(username) =>
EventHandler.info(this, "User [%s] has logged out".format(username))
val session = sessions(username)
session.stop()
sessions -= username
}
protected def shutdownSessions =
sessions.foreach { case (_, session) => session.stop() }
}
Chat message management
-----------------------
Chat message management is implemented by the 'ChatManagement' trait. It has an abstract 'HashMap' session member field with all the sessions. Since it is abstract it needs to be mixed in with someone that can provide this reference. If this dependency is not resolved when composing the final component, you will get a compilation error.
It implements the 'chatManagement' function which responds to two different messages; 'ChatMessage' and 'GetChatLog'. It simply gets the session for the user (the sender of the message) and routes the message to this session. Here we also use the 'forward' function to make sure the original sender reference is passed along to allow the end receiver to reply back directly.
.. code-block:: scala
/**
* Implements chat management, e.g. chat message dispatch.
* <p/>
* Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor.
*/
trait ChatManagement { this: Actor =>
val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map
protected def chatManagement: Receive = {
case msg @ ChatMessage(from, _) => getSession(from).foreach(_ ! msg)
case msg @ GetChatLog(from) => getSession(from).foreach(_ forward msg)
}
private def getSession(from: String) : Option[ActorRef] = {
if (sessions.contains(from))
Some(sessions(from))
else {
EventHandler.info(this, "Session expired for %s".format(from))
None
}
}
}
Using an Actor as a message broker, as in this example, is a very common pattern with many variations; load-balancing, master/worker, map/reduce, replication, logging etc. It becomes even more useful with remote Actors when we can use it to route messages to different nodes.
STM and Transactors
-------------------
Actors are excellent for solving problems where you have many independent processes that can work in isolation and only interact with other Actors through message passing. This model fits many problems. But the Actor model is unfortunately a terrible model for implementing truly shared state. E.g. when you need to have consensus and a stable view of state across many components. The classic example is the bank account where clients can deposit and withdraw, in which each operation needs to be atomic. For detailed discussion on the topic see this `presentation <http://www.slideshare.net/jboner/state-youre-doing-it-wrong-javaone-2009>`_.
`Software Transactional Memory <http://en.wikipedia.org/wiki/Software_transactional_memory>`_ (STM) on the other hand is excellent for problems where you need consensus and a stable view of the state by providing compositional transactional shared state. Some of the really nice traits of STM are that transactions compose and that it raises the abstraction level from lock-based concurrency.
Akka has a `STM implementation <stm>`_ that is based on the same ideas as found in the `Clojure language <http://clojure.org/>`_; Managed References working with immutable data.
Akka allows you to combine Actors and STM into what we call `Transactors <transactors>`_ (short for Transactional Actors), these allow you to optionally combine Actors and STM provides IMHO the best of the Actor model (simple concurrency and asynchronous event-based programming) and STM (compositional transactional shared state) by providing transactional, compositional, asynchronous, event-based message flows. You don't need Transactors all the time but when you do need them then you *really need* them.
Akka currently provides three different transactional abstractions; 'Map', 'Vector' and 'Ref'. They can be shared between multiple Actors and they are managed by the STM. You are not allowed to modify them outside a transaction, if you do so, an exception will be thrown.
What you get is transactional memory in which multiple Actors are allowed to read and write to the same memory concurrently and if there is a clash between two transactions then both of them are aborted and retried. Aborting a transaction means that the memory is rolled back to the state it were in when the transaction was started.
In database terms STM gives you 'ACI' semantics; 'Atomicity', 'Consistency' and 'Isolation'. The 'D' in 'ACID'; 'Durability', you can't get with an STM since it is in memory.
It possible to implement durable persistence for the transactional data structures, but in this sample we keep them in memory.
Chat storage: Backed with simple in-memory
------------------------------------------
To keep it simple we implement the persistent storage, with a in-memory Vector, i.e. it will not be persistent. We start by creating a 'ChatStorage' trait allowing us to have multiple different storage backend. For example one in-memory and one persistent.
.. code-block:: scala
/**
* Abstraction of chat storage holding the chat log.
*/
trait ChatStorage extends Actor
Our 'MemoryChatStorage' extends the 'ChatStorage' trait. The only state it holds is the 'chatLog' which is a transactional 'Vector'.
It responds to two different messages; 'ChatMessage' and 'GetChatLog'. The 'ChatMessage' message handler takes the 'message' attribute and appends it to the 'chatLog' vector. Here you can see that we are using the 'atomic { ... }' block to run the vector operation in a transaction. For this in-memory storage it is not important to use a transactional Vector, since it is not shared between actors, but it illustrates the concept.
The 'GetChatLog' message handler retrieves all the messages in the chat log storage inside an atomic block, iterates over them using the 'map' combinator transforming them from 'Array[Byte] to 'String'. Then it invokes the 'reply(message)' function that will send the chat log to the original sender; the 'ChatClient'.
You might remember that the 'ChatServer' was supervising the 'ChatStorage' actor. When we discussed that we showed you the supervising Actor's view. Now is the time for the supervised Actor's side of things. First, a supervised Actor need to define a life-cycle in which it declares if it should be seen as a:
* 'Permanent': which means that the actor will always be restarted.
* 'Temporary': which means that the actor will not be restarted, but it will be shut down through the regular shutdown process so the 'postStop' callback function will called.
We define the 'MemoryChatStorage' as 'Permanent' by setting the 'lifeCycle' member field to 'Permanent'.
The idea with this crash early style of designing your system is that the services should just crash and then they should be restarted and reset into a stable state and continue from there. The definition of "stable state" is domain specific and up to the application developer to define. Akka provides two callback functions; 'preRestart' and 'postRestart' that are called right *before* and right *after* the Actor is restarted. Both of these functions take a 'Throwable', the reason for the crash, as argument. In our case we just need to implement the 'postRestart' hook and there re-initialize the 'chatLog' member field with a fresh 'Vector'.
.. code-block:: scala
/**
* Memory-backed chat storage implementation.
*/
class MemoryChatStorage extends ChatStorage {
self.lifeCycle = Permanent
private var chatLog = TransactionalVector[Array[Byte]]()
EventHandler.info(this, "Memory-based chat storage is starting up...")
def receive = {
case msg @ ChatMessage(from, message) =>
EventHandler.debug(this, "New chat message [%s]".format(message))
atomic { chatLog + message.getBytes("UTF-8") }
case GetChatLog(_) =>
val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList }
self.reply(ChatLog(messageList))
}
override def postRestart(reason: Throwable) = chatLog = TransactionalVector()
}
The last thing we need to do in terms of persistence is to create a 'MemoryChatStorageFactory' that will take care of instantiating and resolving the 'val storage: ChatStorage' field in the 'ChatServer' with a concrete implementation of our persistence Actor.
.. code-block:: scala
/**
* Creates and links a MemoryChatStorage.
*/
trait MemoryChatStorageFactory { this: Actor =>
val storage = this.self.spawnLink[MemoryChatStorage] // starts and links ChatStorage
}
Composing the full Chat Service
-------------------------------
We have now created the full functionality for the chat server, all nicely decoupled into isolated and well-defined traits. Now let's bring all these traits together and compose the complete concrete 'ChatService'.
.. code-block:: scala
/**
* Class encapsulating the full Chat Service.
* Start service by invoking:
* <pre>
* val chatService = Actor.actorOf[ChatService]
* </pre>
*/
class ChatService extends
ChatServer with
SessionManagement with
ChatManagement with
MemoryChatStorageFactory {
override def preStart() = {
remote.start("localhost", 2552);
remote.register("chat:service", self) //Register the actor with the specified service id
}
}
Creating a remote server service
--------------------------------
As you can see in the section above, we are overriding the Actor's 'start' method and are starting up a remote server node by invoking 'remote.start("localhost", 2552)'. This starts up the remote node on address "localhost" and port 2552 which means that it accepts incoming messages on this address. Then we register the ChatService actor in the remote node by invoking 'remote.register("chat:service", self)'. This means that the ChatService will be available to other actors on this specific id, address and port.
That's it. Were done. Now we have a, very simple, but scalable, fault-tolerant, event-driven, persistent chat server that can without problem serve a million concurrent users on a regular workstation.
Let's use it.
Sample client chat session
--------------------------
Now let's create a simple test runner that logs in posts some messages and logs out.
.. code-block:: scala
/**
* Test runner emulating a chat session.
*/
object ClientRunner {
def run = {
val client1 = new ChatClient("jonas")
client1.login
val client2 = new ChatClient("patrik")
client2.login
client1.post("Hi there")
println("CHAT LOG:\n\t" + client1.chatLog.log.mkString("\n\t"))
client2.post("Hello")
println("CHAT LOG:\n\t" + client2.chatLog.log.mkString("\n\t"))
client1.post("Hi again")
println("CHAT LOG:\n\t" + client1.chatLog.log.mkString("\n\t"))
client1.logout
client2.logout
}
}
Sample code
-----------
All this code is available as part of the Akka distribution. It resides in the './akka-samples/akka-sample-chat' module and have a 'README' file explaining how to run it.
Or if you rather browse it `online <https://github.com/jboner/akka/blob/master/akka-samples/akka-sample-chat/>`_.
Run it
------
Download and build Akka
#. Check out Akka from `<http://github.com/jboner/akka>`_
#. Set 'AKKA_HOME' environment variable to the root of the Akka distribution.
#. Open up a shell and step into the Akka distribution root folder.
#. Build Akka by invoking:
::
% sbt update
% sbt dist
Run a sample chat session
1. Fire up two shells. For each of them:
- Step down into to the root of the Akka distribution.
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
2. In the first REPL you get execute:
.. code-block:: scala
import sample.chat._
import akka.actor.Actor._
val chatService = actorOf[ChatService]
3. In the second REPL you get execute:
.. code-block:: scala
import sample.chat._
ClientRunner.run
4. See the chat simulation run.
5. Run it again to see full speed after first initialization.
6. In the client REPL, or in a new REPL, you can also create your own client
.. code-block:: scala
import sample.chat._
val myClient = new ChatClient("<your name>")
myClient.login
myClient.post("Can I join?")
println("CHAT LOG:\n\t" + myClient.chatLog.log.mkString("\n\t"))
That's it. Have fun.
REWRITE ME

View file

@ -281,9 +281,9 @@ private[akka] case class RemoteActorRef private[akka] (
SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)
}
def link(actorRef: ActorRef): ActorRef = unsupported
def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported
def unlink(actorRef: ActorRef): ActorRef = unsupported
def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported
protected[akka] def restart(cause: Throwable): Unit = unsupported

View file

@ -189,7 +189,6 @@
override def postStop() {
EventHandler.info(this, "Chat server is shutting down...")
shutdownSessions()
self.unlink(storage)
storage.stop()
}
}