Merge branch 'wip-improve-docs-rk'

This commit is contained in:
Roland 2011-12-30 00:25:11 +01:00
commit 0fc8ae5d73
33 changed files with 919 additions and 40 deletions

View file

@ -69,11 +69,11 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
within(2 seconds) {
import Logging._
verifyLevel(bus, InfoLevel)
bus.logLevel = WarningLevel
bus.setLogLevel(WarningLevel)
verifyLevel(bus, WarningLevel)
bus.logLevel = DebugLevel
bus.setLogLevel(DebugLevel)
verifyLevel(bus, DebugLevel)
bus.logLevel = ErrorLevel
bus.setLogLevel(ErrorLevel)
verifyLevel(bus, ErrorLevel)
}
}

View file

@ -62,6 +62,8 @@ import java.util.concurrent.atomic.AtomicBoolean
* val msg = ((Request3) o).getMsg();
* getSender().tell(other.ask(msg, 5000)); // reply with Future for holding the others reply (timeout 5 seconds)
*
* } else {
* unhandled(o);
* }
* }
* }

View file

@ -124,8 +124,6 @@ trait SubchannelClassification { this: EventBus ⇒
@volatile
private var cache = Map.empty[Classifier, Set[Subscriber]]
protected def subscribers = cache.values.flatten
/**
* Returns the Classifier associated with the given Event
*/

View file

@ -54,7 +54,7 @@ trait LoggingBus extends ActorEventBus {
* will not participate in the automatic management of log level
* subscriptions!
*/
def logLevel_=(level: LogLevel): Unit = guard.withGuard {
def setLogLevel(level: LogLevel): Unit = guard.withGuard {
for {
l AllLogLevels
// subscribe if previously ignored and now requested

View file

@ -3,7 +3,7 @@ package akka.event.japi
import akka.event._
/**
* See documentation for akka.event.LookupClassification
* See documentation for [[akka.event.LookupClassification]]
* E is the Event type
* S is the Subscriber type
* C is the Classifier type
@ -15,7 +15,19 @@ abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassificatio
}
/**
* See documentation for akka.event.ScanningClassification
* See documentation for [[akka.event.SubchannelClassification]]
* E is the Event type
* S is the Subscriber type
* C is the Classifier type
*/
abstract class SubchannelEventBus[E, S, C] extends EventBus with SubchannelClassification {
type Event = E
type Subscriber = S
type Classifier = C
}
/**
* See documentation for [[akka.event.ScanningClassification]]
* E is the Event type
* S is the Subscriber type
* C is the Classifier type
@ -27,12 +39,11 @@ abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassific
}
/**
* See documentation for akka.event.ActorClassification
* See documentation for [[akka.event.ActorClassification]]
* An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef
* Means that ActorRefs "listen" to other ActorRefs
* E is the Event type
*/
abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier {
}

View file

@ -3,6 +3,10 @@
Actor References, Paths and Addresses
=====================================
.. sidebar:: Contents
.. contents:: :local:
This chapter describes how actors are identified and located within a possibly
distributed actor system. It ties into the central idea that
:ref:`actor-systems` form intrinsic supervision hierarchies as well as that
@ -225,6 +229,23 @@ extracting the sender references, and then watch all discovered concrete
actors. This scheme of resolving a selection may be improved upon in a future
release.
.. _actorOf-vs-actorFor:
Summary: ``actorOf`` vs. ``actorFor``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note::
What the above sections described in some detail can be summarized and
memorized easily as follows:
- ``actorOf`` only ever creates a new actor, and it creates it as a direct
child of the context on which this method is invoked (which may be any
actor or actor system).
- ``actorFor`` only ever looks up an existing actor, i.e. does not create
one.
The Interplay with Remote Deployment
------------------------------------

View file

@ -180,3 +180,40 @@ Logging of Configuration
If the system or config property ``akka.logConfigOnStart`` is set to ``on``, then the
complete configuration at INFO level when the actor system is started. This is useful
when you are uncertain of what configuration is used.
If in doubt, you can also easily and nicely inspect configuration objects
before or after using them to construct an actor system:
.. code-block:: scala
Welcome to Scala version 2.9.1.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_27).
Type in expressions to have them evaluated.
Type :help for more information.
scala> import com.typesafe.config._
import com.typesafe.config._
scala> ConfigFactory.parseString("a.b=12")
res0: com.typesafe.config.Config = Config(SimpleConfigObject({"a" : {"b" : 12}}))
scala> res0.root.render
res1: java.lang.String =
{
# String: 1
"a" : {
# String: 1
"b" : 12
}
}
The comments preceding every item give detailed information about the origin of
the setting (file & line number) plus possible comments which were present,
e.g. in the reference configuration. The settings as merged with the reference
and parsed by the actor system can be displayed like this:
.. code-block:: java
final ActorSystem system = ActorSystem.create();
println(system.settings());
// this is a shortcut for system.settings().config().root().render()

View file

@ -39,6 +39,8 @@ public class FaultHandlingTestBase {
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o));
} else {
unhandled(o);
}
}
}
@ -49,6 +51,8 @@ public class FaultHandlingTestBase {
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o));
} else {
unhandled(o);
}
}
@ -70,6 +74,8 @@ public class FaultHandlingTestBase {
state = (Integer) o;
} else if (o.equals("get")) {
getSender().tell(state);
} else {
unhandled(o);
}
}
}

View file

@ -69,6 +69,8 @@ public class SchedulerDocTestBase {
public void onReceive(Object message) {
if (message.equals("Tick")) {
// Do someting
} else {
unhandled(message);
}
}
};

View file

@ -24,6 +24,10 @@ import static akka.actor.Actors.*;
import akka.japi.Procedure;
//#import-procedure
//#import-watch
import akka.actor.Terminated;
//#import-watch
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
@ -161,6 +165,15 @@ public class UntypedActorDocTestBase {
system.shutdown();
}
@Test
public void useWatch() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(WatchActor.class));
Future<Object> future = myActor.ask("kill", 1000);
assert Await.result(future, Duration.parse("1 second")).equals("finished");
system.shutdown();
}
public static class MyActor extends UntypedActor {
public MyActor(String s) {
@ -246,9 +259,36 @@ public class UntypedActorDocTestBase {
getContext().become(angry);
} else if (message.equals("foo")) {
getContext().become(happy);
} else {
unhandled(message);
}
}
}
//#hot-swap-actor
//#watch
public static class WatchActor extends UntypedActor {
final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
{
this.getContext().watch(child); // <-- this is the only call needed for registration
}
ActorRef lastSender = getContext().system().deadLetters();
@Override
public void onReceive(Object message) {
if (message.equals("kill")) {
getContext().stop(child);
lastSender = getSender();
} else if (message instanceof Terminated) {
final Terminated t = (Terminated) message;
if (t.getActor() == child) {
lastSender.tell("finished");
}
} else {
unhandled(message);
}
}
}
//#watch
}

View file

@ -23,11 +23,14 @@ import org.junit.Test;
import scala.Option;
import static org.junit.Assert.*;
import akka.actor.UntypedActorFactory;
//#imports-deadletter
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.actor.DeadLetter;
//#imports-deadletter
public class LoggingDocTestBase {
@ -43,6 +46,16 @@ public class LoggingDocTestBase {
system.shutdown();
}
@Test
public void subscribeToDeadLetters() {
//#deadletters
final ActorSystem system = ActorSystem.create("DeadLetters");
final ActorRef actor = system.actorOf(new Props(DeadLetterActor.class));
system.eventStream().subscribe(actor, DeadLetter.class);
//#deadletters
system.shutdown();
}
//#my-actor
class MyActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@ -87,4 +100,14 @@ public class LoggingDocTestBase {
}
//#my-event-listener
//#deadletter-actor
public static class DeadLetterActor extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof DeadLetter) {
System.out.println(message);
}
}
}
//#deadletter-actor
}

View file

@ -290,6 +290,8 @@ public class FutureDocTestBase {
} else {
getSender().tell(i);
}
} else {
unhandled(message);
}
}
}

View file

@ -34,6 +34,8 @@ public class CoordinatedCounter extends UntypedActor {
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.single().get());
} else {
unhandled(incoming);
}
}
}

View file

@ -22,6 +22,8 @@ public class Coordinator extends UntypedActor {
});
//#coordinated-atomic
}
} else {
unhandled(incoming);
}
}
}

View file

@ -0,0 +1,211 @@
.. _event-bus-java:
################
Event Bus (Java)
################
.. sidebar:: Contents
.. contents:: :local:
Originally conceived as a way to send messages to groups of actors, the
:class:`EventBus` has been generalized into a set of composable traits
implementing a simple interface:
- :meth:`public boolean subscribe(S subscriber, C classifier)` subscribes the
given subscriber to events with the given classifier
- :meth:`public boolean unsubscribe(S subscriber, C classifier)` undoes a
specific subscription
- :meth:`public void unsubscribe(S subscriber)` undoes all subscriptions for
the given subscriber
- :meth:`public void publish(E event)` publishes an event, which first is classified
according to the specific bus (see `Classifiers`_) and then published to all
subscribers for the obtained classifier
This mechanism is used in different places within Akka, e.g. the
:ref:`DeathWatch <deathwatch-java>` and the `Event Stream`_. Implementations
can make use of the specific building blocks presented below.
An event bus must define the following three abstract types:
- :class:`E` is the type of all events published on that bus
- :class:`S` is the type of subscribers allowed to register on that event bus
- :class:`C` defines the classifier to be used in selecting subscribers for
dispatching events
The traits below are still generic in these types, but they need to be defined
for any concrete implementation.
Classifiers
===========
The classifiers presented here are part of the Akka distribution, but rolling
your own in case you do not find a perfect match is not difficult, check the
implementation of the existing ones on `github`_.
.. _github: https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/event/EventBus.scala
Lookup Classification
---------------------
The simplest classification is just to extract an arbitrary classifier from
each event and maintaining a set of subscribers for each possible classifier.
This can be compared to tuning in on a radio station. The abstract class
:class:`LookupEventBus` is still generic in that it abstracts over how to
compare subscribers and how exactly to classify. The necessary methods to be
implemented are the following:
- :meth:`public C classify(E event)` is used for extracting the
classifier from the incoming events.
- :meth:`public int compareSubscribers(S a, S b)` must define a
partial order over the subscribers, expressed as expected from
:meth:`java.lang.Comparable.compare`.
- :meth:`public void publish(E event, S subscriber)` will be invoked for
each event for all subscribers which registered themselves for the events
classifier.
- :meth:`public int mapSize()` determines the initial size of the index data structure
used internally (i.e. the expected number of different classifiers).
This classifier is efficient in case no subscribers exist for a particular event.
Subchannel Classification
-------------------------
If classifiers form a hierarchy and it is desired that subscription be possible
not only at the leaf nodes, this classification may be just the right one. It
can be compared to tuning in on (possibly multiple) radio channels by genre.
This classification has been developed for the case where the classifier is
just the JVM class of the event and subscribers may be interested in
subscribing to all subclasses of a certain class, but it may be used with any
classifier hierarchy. The abstract members needed by this classifier are
- :meth:`public Subclassification[C] subclassification()` provides an object
providing :meth:`isEqual(a: Classifier, b: Classifier)` and
:meth:`isSubclass(a: Classifier, b: Classifier)` to be consumed by the other
methods of this classifier; this method is called on various occasions, it
should be implemented so that it always returns the same object for
performance reasons.
- :meth:`public C classify(E event)` is used for extracting the classifier from
the incoming events.
- :meth:`public void publish(E event, S subscriber)` will be invoked for
each event for all subscribers which registered themselves for the events
classifier.
This classifier is also efficient in case no subscribers are found for an
event, but it uses conventional locking to synchronize an internal classifier
cache, hence it is not well-suited to use cases in which subscriptions change
with very high frequency (keep in mind that “opening” a classifier by sending
the first message will also have to re-check all previous subscriptions).
Scanning Classification
-----------------------
The previous classifier was built for multi-classifier subscriptions which are
strictly hierarchical, this classifier is useful if there are overlapping
classifiers which cover various parts of the event space without forming a
hierarchy. It can be compared to tuning in on (possibly multiple) radio
stations by geographical reachability (for old-school radio-wave transmission).
The abstract members for this classifier are:
- :meth:`public int compareClassifiers(C a, C b)` is needed for
determining matching classifiers and storing them in an ordered collection.
- :meth:`public int compareSubscribers(S a, S b)` is needed for
storing subscribers in an ordered collection.
- :meth:`public boolean matches(C classifier, E event)` determines
whether a given classifier shall match a given event; it is invoked for each
subscription for all received events, hence the name of the classifier.
- :meth:`public void publish(E event, S subscriber)` will be invoked for
each event for all subscribers which registered themselves for a classifier
matching this event.
This classifier takes always a time which is proportional to the number of
subscriptions, independent of how many actually match.
Actor Classification
--------------------
This classification has been developed specifically for implementing
:ref:`DeathWatch <deathwatch-java>`: subscribers as well as classifiers are of
type :class:`ActorRef`. The abstract members are
- :meth:`public ActorRef classify(E event)` is used for extracting the
classifier from the incoming events.
- :meth:`public int mapSize()` determines the initial size of the index data structure
used internally (i.e. the expected number of different classifiers).
This classifier is still is generic in the event type, and it is efficient for
all use cases.
.. _event-stream-java:
Event Stream
============
The event stream is the main event bus of each actor system: it is used for
carrying :ref:`log messages <logging-java>` and `Dead Letters`_ and may be
used by the user code for other purposes as well. It uses `Subchannel
Classification`_ which enables registering to related sets of channels (as is
used for :class:`RemoteLifeCycleMessage`). The following example demonstrates
how a simple subscription works. Given a simple actor:
.. includecode:: code/akka/docs/event/LoggingDocTestBase.java#imports-deadletter
.. includecode:: code/akka/docs/event/LoggingDocTestBase.java#deadletter-actor
it can be subscribed like this:
.. includecode:: code/akka/docs/event/LoggingDocTestBase.java#deadletters
Default Handlers
----------------
Upon start-up the actor system creates and subscribes actors to the event
stream for logging: these are the handlers which are configured for example in
``application.conf``:
.. code-block:: text
akka {
event-handlers = ["akka.event.Logging$DefaultLogger"]
}
The handlers listed here by fully-qualified class name will be subscribed to
all log event classes with priority higher than or equal to the configured
log-level and their subscriptions are kept in sync when changing the log-level
at runtime::
system.eventStream.setLogLevel(Logging.DebugLevel());
This means that log events for a level which will not be logged are not
typically not dispatched at all (unless manual subscriptions to the respective
event class have been done)
Dead Letters
------------
As described at :ref:`stopping-actors-java`, messages queued when an actor
terminates or sent after its death are re-routed to the dead letter mailbox,
which by default will publish the messages wrapped in :class:`DeadLetter`. This
wrapper holds the original sender, receiver and message of the envelope which
was redirected.
Other Uses
----------
The event stream is always there and ready to be used, just publish your own
events (it accepts ``Object``) and subscribe listeners to the corresponding JVM
classes.

View file

@ -9,6 +9,7 @@ Java API
untyped-actors
typed-actors
logging
event-bus
scheduler
futures
dataflow

View file

@ -54,6 +54,8 @@ As you can see from the example above the following pattern is used to find an `
akka://<actorsystemname>@<hostname>:<port>/<actor path>
For more details on how actor addresses and paths are formed and used, please refer to :ref:`addressing`.
Creating Actors Remotely
^^^^^^^^^^^^^^^^^^^^^^^^
@ -113,3 +115,108 @@ This is also done via configuration::
This configuration setting will clone the actor “aggregation” 10 times and deploy it evenly distributed across
the two given target nodes.
Description of the Remoting Sample
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The sample application included with the Akka sources demonstrates both, remote
deployment and look-up of remote actors. First, let us have a look at the
common setup for both scenarios (this is ``common.conf``):
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/common.conf
This enables the remoting by installing the :class:`RemoteActorRefProvider` and
chooses the default remote transport. All other options will be set
specifically for each show case.
.. note::
Be sure to replace the default IP 127.0.0.1 with the real address the system
is reachable by if you deploy onto multiple machines!
.. _remote-lookup-sample-java:
Remote Lookup
-------------
In order to look up a remote actor, that one must be created first. For this
purpose, we configure an actor system to listen on port 2552 (this is a snippet
from ``application.conf``):
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/application.conf
:include: calculator
Then the actor must be created. For all code which follows, assume these imports:
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java
:include: imports
The actor doing the work will be this one:
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java
:include: actor
and we start it within an actor system using the above configuration
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java
:include: setup
With the service actor up and running, we may look it up from another actor
system, which will be configured to use port 2553 (this is a snippet from
``application.conf``).
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/application.conf
:include: remotelookup
The actor which will query the calculator is a quite simple one for demonstration purposes
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java
:include: actor
and it is created from an actor system using the aforementioned clients config.
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java
:include: setup
Requests which come in via ``doSomething`` will be sent to the client actor
along with the reference which was looked up earlier. Observe how the actor
system name using in ``actorFor`` matches the remote systems name, as do IP
and port number. Top-level actors are always created below the ``"/user"``
guardian, which supervises them.
Remote Deployment
-----------------
Creating remote actors instead of looking them up is not visible in the source
code, only in the configuration file. This section is used in this scenario
(this is a snippet from ``application.conf``):
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/application.conf
:include: remotecreation
For all code which follows, assume these imports:
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java
:include: imports
The server actor can multiply or divide numbers:
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java
:include: actor
The client actor looks like in the previous example
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java
:include: actor
but the setup uses only ``actorOf``:
.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java
:include: setup
Observe how the name of the server actor matches the deployment given in the
configuration file, which will transparently delegate the actor creation to the
remote node.

View file

@ -28,6 +28,10 @@ its syntax from Erlang.
Creating Actors
===============
Since Akka enforces parental supervision every actor is supervised and
(potentially) the supervisor of its children; it is advisable that you
familiarize yourself with :ref:`actor-systems` and :ref:`supervision` and it
may also help to read :ref:`actorOf-vs-actorFor`.
Defining an Actor class
-----------------------
@ -131,6 +135,7 @@ In addition, it offers:
* system that the actor belongs to
* parent supervisor
* supervised children
* lifecycle monitoring
* hotswap behavior stack as described in :ref:`UntypedActor.HotSwap`
The remaining visible methods are user-overridable life-cycle hooks which are
@ -141,6 +146,36 @@ described in the following:
The implementations shown above are the defaults provided by the :class:`UntypedActor`
class.
.. _deathwatch-java:
Lifecycle Monitoring aka DeathWatch
-----------------------------------
In order to be notified when another actor terminates (i.e. stops permanently,
not temporary failure and restart), an actor may register itself for reception
of the :class:`Terminated` message dispatched by the other actor upon
termination (see `Stopping Actors`_). This service is provided by the
:class:`DeathWatch` component of the actor system.
Registering a monitor is easy (see fourth line, the rest is for demonstrating
the whole functionality):
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#watch
It should be noted that the :class:`Terminated` message is generated
independent of the order in which registration and termination occur.
Registering multiple times does not necessarily lead to multiple messages being
generated, but there is no guarantee that only exactly one such message is
received: if termination of the watched actor has generated and queued the
message, and another registration is done before this message has been
processed, then a second message will be queued, because registering for
monitoring of an already terminated actor leads to the immediate generation of
the :class:`Terminated` message.
It is also possible to deregister from watching another actors liveliness
using ``context.unwatch(target)``, but obviously this cannot guarantee
non-reception of the :class:`Terminated` message because that may already have
been queued.
Start Hook
----------
@ -225,8 +260,8 @@ Remote actor addresses may also be looked up, if remoting is enabled::
These look-ups return a (possibly remote) actor reference immediately, so you
will have to send to it and await a reply in order to verify that ``serviceB``
is actually reachable and running.
is actually reachable and running. An example demonstrating actor look-up is
given in :ref:`remote-lookup-sample-java`.
Messages and immutability
=========================
@ -383,6 +418,8 @@ message.
.. includecode:: code/akka/docs/actor/MyReceivedTimeoutUntypedActor.java#receive-timeout
.. _stopping-actors-java:
Stopping actors
===============
@ -397,21 +434,44 @@ but additional messages in the mailbox will not be processed. By default these
messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that
depends on the mailbox implementation.
When stop is called then a call to the ``def postStop`` callback method will
take place. The ``Actor`` can use this callback to implement shutdown behavior.
Termination of an actor proceeds in two steps: first the actor suspends its
mailbox processing and sends a stop command to all its children, then it keeps
processing the termination messages from its children until the last one is
gone, finally terminating itself (invoking :meth:`postStop`, dumping mailbox,
publishing :class:`Terminated` on the :ref:`DeathWatch <deathwatch-java>`, telling
its supervisor). This procedure ensures that actor system sub-trees terminate
in an orderly fashion, propagating the stop command to the leaves and
collecting their confirmation back to the stopped supervisor. If one of the
actors does not respond (i.e. processing a message for extended periods of time
and therefore not receiving the stop command), this whole process will be
stuck.
It is possible to disregard specific children with respect to shutdown
confirmation by stopping them explicitly before issuing the
``context.stop(self)``::
context.stop(someChild);
context.stop(self);
In this case ``someChild`` will be stopped asynchronously and re-parented to
the :class:`Locker`, where :class:`DavyJones` will keep tabs and dispose of it
eventually.
Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be
stopped, and the aforementioned process will ensure proper termination of the
whole system.
The :meth:`postStop()` hook is invoked after an actor is fully stopped. This
enables cleaning up of resources:
.. code-block:: java
@Override
public void postStop() {
... // clean up resources
// close some file or database connection
}
All Actors are stopped when the ``ActorSystem`` is stopped.
Supervised actors are stopped when the supervisor is stopped, i.e. children are stopped
when parent is stopped.
PoisonPill
----------
@ -420,9 +480,6 @@ stop the actor when the message is processed. ``PoisonPill`` is enqueued as
ordinary messages and will be handled after messages that were already queued
in the mailbox.
If the ``PoisonPill`` was sent with ``ask``, the ``Future`` will be completed with an
``akka.actor.ActorKilledException("PoisonPill")``.
Use it like this:
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java

View file

@ -12,3 +12,18 @@ changes in client code. This API cleanup is planned to be the last one for a
significant amount of time.
Detailed migration guide will be written.
Unordered Collection of Migration Items
=======================================
``ActorRef.ask()``
------------------
The mechanism for collecting an actors reply in a :class:`Future` has been
reworked for better location transparency: it uses an actor under the hood.
This actor needs to be disposable by the garbage collector in case no reply is
ever received, and the decision is based upon a timeout. This timeout
determines when the actor will stop itself and hence closes the window for a
reply to be received; it is independent of the timeout applied when awaiting
completion of the :class:`Future`, however, the actor will complete the
:class:`Future` with an :class:`AskTimeoutException` when it stops itself.

View file

@ -28,6 +28,10 @@ its syntax from Erlang.
Creating Actors
===============
Since Akka enforces parental supervision every actor is supervised and
(potentially) the supervisor of its children; it is advisable that you
familiarize yourself with :ref:`actor-systems` and :ref:`supervision` and it
may also help to read :ref:`actorOf-vs-actorFor`.
Defining an Actor class
-----------------------
@ -156,6 +160,7 @@ In addition, it offers:
* system that the actor belongs to
* parent supervisor
* supervised children
* lifecycle monitoring
* hotswap behavior stack as described in :ref:`Actor.HotSwap`
You can import the members in the :obj:`context` to avoid prefixing access with ``context.``
@ -176,6 +181,35 @@ described in the following::
The implementations shown above are the defaults provided by the :class:`Actor`
trait.
.. _deathwatch-scala:
Lifecycle Monitoring aka DeathWatch
-----------------------------------
In order to be notified when another actor terminates (i.e. stops permanently,
not temporary failure and restart), an actor may register itself for reception
of the :class:`Terminated` message dispatched by the other actor upon
termination (see `Stopping Actors`_). This service is provided by the
:class:`DeathWatch` component of the actor system.
Registering a monitor is easy:
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#watch
It should be noted that the :class:`Terminated` message is generated
independent of the order in which registration and termination occur.
Registering multiple times does not necessarily lead to multiple messages being
generated, but there is no guarantee that only exactly one such message is
received: if termination of the watched actor has generated and queued the
message, and another registration is done before this message has been
processed, then a second message will be queued, because registering for
monitoring of an already terminated actor leads to the immediate generation of
the :class:`Terminated` message.
It is also possible to deregister from watching another actors liveliness
using ``context.unwatch(target)``, but obviously this cannot guarantee
non-reception of the :class:`Terminated` message because that may already have
been queued.
Start Hook
----------
@ -258,7 +292,8 @@ Remote actor addresses may also be looked up, if remoting is enabled::
These look-ups return a (possibly remote) actor reference immediately, so you
will have to send to it and await a reply in order to verify that ``serviceB``
is actually reachable and running.
is actually reachable and running. An example demonstrating actor look-up is
given in :ref:`remote-lookup-sample-scala`.
Messages and immutability
=========================
@ -442,6 +477,7 @@ object.
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-timeout
.. _stopping-actors-scala:
Stopping actors
===============
@ -457,19 +493,42 @@ but additional messages in the mailbox will not be processed. By default these
messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that
depends on the mailbox implementation.
When stop is called then a call to the ``def postStop`` callback method will
take place. The ``Actor`` can use this callback to implement shutdown behavior.
Termination of an actor proceeds in two steps: first the actor suspends its
mailbox processing and sends a stop command to all its children, then it keeps
processing the termination messages from its children until the last one is
gone, finally terminating itself (invoking :meth:`postStop`, dumping mailbox,
publishing :class:`Terminated` on the :ref:`DeathWatch <deathwatch-scala>`, telling
its supervisor). This procedure ensures that actor system sub-trees terminate
in an orderly fashion, propagating the stop command to the leaves and
collecting their confirmation back to the stopped supervisor. If one of the
actors does not respond (i.e. processing a message for extended periods of time
and therefore not receiving the stop command), this whole process will be
stuck.
It is possible to disregard specific children with respect to shutdown
confirmation by stopping them explicitly before issuing the
``context.stop(self)``::
context.stop(someChild)
context.stop(self)
In this case ``someChild`` will be stopped asynchronously and re-parented to
the :class:`Locker`, where :class:`DavyJones` will keep tabs and dispose of it
eventually.
Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be
stopped, and the aforementioned process will ensure proper termination of the
whole system.
The :meth:`postStop()` hook is invoked after an actor is fully stopped. This
enables cleaning up of resources:
.. code-block:: scala
override def postStop() = {
... // clean up resources
// close some file or database connection
}
All Actors are stopped when the ``ActorSystem`` is stopped.
Supervised actors are stopped when the supervisor is stopped, i.e. children are stopped
when parent is stopped.
PoisonPill
----------
@ -479,10 +538,6 @@ stop the actor when the message is processed. ``PoisonPill`` is enqueued as
ordinary messages and will be handled after messages that were already queued
in the mailbox.
If the ``PoisonPill`` was sent with ``?``, the ``Future`` will be completed with an
``akka.actor.ActorKilledException("PoisonPill")``.
.. _Actor.HotSwap:
Become/Unbecome

View file

@ -296,4 +296,25 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val actor = system.actorOf(Props(new HotSwapActor), name = "hot")
}
"using watch" in {
//#watch
import akka.actor.{ Actor, Props, Terminated }
class WatchActor extends Actor {
val child = context.actorOf(Props.empty, "child")
context.watch(child) // <-- this is the only call needed for registration
var lastSender = system.deadLetters
def receive = {
case "kill" context.stop(child); lastSender = sender
case Terminated(`child`) lastSender ! "finished"
}
}
//#watch
val a = system.actorOf(Props(new WatchActor))
implicit val sender = testActor
a ! "kill"
expectMsg("finished")
}
}

View file

@ -58,4 +58,17 @@ class LoggingDocSpec extends AkkaSpec {
myActor ! "test"
}
"allow registration to dead letters" in {
//#deadletters
import akka.actor.{ Actor, DeadLetter, Props }
val listener = system.actorOf(Props(new Actor {
def receive = {
case d: DeadLetter println(d)
}
}))
system.eventStream.subscribe(listener, classOf[DeadLetter])
//#deadletters
}
}

View file

@ -0,0 +1,205 @@
.. _event-bus-scala:
#################
Event Bus (Scala)
#################
.. sidebar:: Contents
.. contents:: :local:
Originally conceived as a way to send messages to groups of actors, the
:class:`EventBus` has been generalized into a set of composable traits
implementing a simple interface:
- :meth:`subscribe(subscriber: Subscriber, classifier: Classifier): Boolean`
subscribes the given subscriber to events with the given classifier
- :meth:`unsubscribe(subscriber: Subscriber, classifier: Classifier): Boolean`
undoes a specific subscription
- :meth:`unsubscribe(subscriber: Subscriber)` undoes all subscriptions for the
given subscriber
- :meth:`publish(event: Event)` publishes an event, which first is classified
according to the specific bus (see `Classifiers`_) and then published to all
subscribers for the obtained classifier
This mechanism is used in different places within Akka, e.g. the
:ref:`DeathWatch <deathwatch-scala>` and the `Event Stream`_. Implementations
can make use of the specific building blocks presented below.
An event bus must define the following three abstract types:
- :class:`Event` is the type of all events published on that bus
- :class:`Subscriber` is the type of subscribers allowed to register on that
event bus
- :class:`Classifier` defines the classifier to be used in selecting
subscribers for dispatching events
The traits below are still generic in these types, but they need to be defined
for any concrete implementation.
Classifiers
===========
The classifiers presented here are part of the Akka distribution, but rolling
your own in case you do not find a perfect match is not difficult, check the
implementation of the existing ones on `github`_.
.. _github: https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/event/EventBus.scala
Lookup Classification
---------------------
The simplest classification is just to extract an arbitrary classifier from
each event and maintaining a set of subscribers for each possible classifier.
This can be compared to tuning in on a radio station. The trait
:class:`LookupClassification` is still generic in that it abstracts over how to
compare subscribers and how exactly to classify. The necessary methods to be
implemented are the following:
- :meth:`classify(event: Event): Classifier` is used for extracting the
classifier from the incoming events.
- :meth:`compareSubscribers(a: Subscriber, b: Subscriber): Int` must define a
partial order over the subscribers, expressed as expected from
:meth:`java.lang.Comparable.compare`.
- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for
each event for all subscribers which registered themselves for the events
classifier.
- :meth:`mapSize: Int` determines the initial size of the index data structure
used internally (i.e. the expected number of different classifiers).
This classifier is efficient in case no subscribers exist for a particular event.
Subchannel Classification
-------------------------
If classifiers form a hierarchy and it is desired that subscription be possible
not only at the leaf nodes, this classification may be just the right one. It
can be compared to tuning in on (possibly multiple) radio channels by genre.
This classification has been developed for the case where the classifier is
just the JVM class of the event and subscribers may be interested in
subscribing to all subclasses of a certain class, but it may be used with any
classifier hierarchy. The abstract members needed by this classifier are
- :obj:`subclassification: Subclassification[Classifier]` is an object
providing :meth:`isEqual(a: Classifier, b: Classifier)` and
:meth:`isSubclass(a: Classifier, b: Classifier)` to be consumed by the other
methods of this classifier.
- :meth:`classify(event: Event): Classifier` is used for extracting the
classifier from the incoming events.
- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for
each event for all subscribers which registered themselves for the events
classifier.
This classifier is also efficient in case no subscribers are found for an
event, but it uses conventional locking to synchronize an internal classifier
cache, hence it is not well-suited to use cases in which subscriptions change
with very high frequency (keep in mind that “opening” a classifier by sending
the first message will also have to re-check all previous subscriptions).
Scanning Classification
-----------------------
The previous classifier was built for multi-classifier subscriptions which are
strictly hierarchical, this classifier is useful if there are overlapping
classifiers which cover various parts of the event space without forming a
hierarchy. It can be compared to tuning in on (possibly multiple) radio
stations by geographical reachability (for old-school radio-wave transmission).
The abstract members for this classifier are:
- :meth:`compareClassifiers(a: Classifier, b: Classifier): Int` is needed for
determining matching classifiers and storing them in an ordered collection.
- :meth:`compareSubscribers(a: Subscriber, b: Subscriber): Int` is needed for
storing subscribers in an ordered collection.
- :meth:`matches(classifier: Classifier, event: Event): Boolean` determines
whether a given classifier shall match a given event; it is invoked for each
subscription for all received events, hence the name of the classifier.
- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for
each event for all subscribers which registered themselves for a classifier
matching this event.
This classifier takes always a time which is proportional to the number of
subscriptions, independent of how many actually match.
Actor Classification
--------------------
This classification has been developed specifically for implementing
:ref:`DeathWatch <deathwatch-scala>`: subscribers as well as classifiers are of
type :class:`ActorRef`. The abstract members are
- :meth:`classify(event: Event): ActorRef` is used for extracting the
classifier from the incoming events.
- :meth:`mapSize: Int` determines the initial size of the index data structure
used internally (i.e. the expected number of different classifiers).
This classifier is still is generic in the event type, and it is efficient for
all use cases.
.. _event-stream-scala:
Event Stream
============
The event stream is the main event bus of each actor system: it is used for
carrying :ref:`log messages <logging-scala>` and `Dead Letters`_ and may be
used by the user code for other purposes as well. It uses `Subchannel
Classification`_ which enables registering to related sets of channels (as is
used for :class:`RemoteLifeCycleMessage`). The following example demonstrates
how a simple subscription works:
.. includecode:: code/akka/docs/event/LoggingDocSpec.scala#deadletters
Default Handlers
----------------
Upon start-up the actor system creates and subscribes actors to the event
stream for logging: these are the handlers which are configured for example in
``application.conf``:
.. code-block:: text
akka {
event-handlers = ["akka.event.Logging$DefaultLogger"]
}
The handlers listed here by fully-qualified class name will be subscribed to
all log event classes with priority higher than or equal to the configured
log-level and their subscriptions are kept in sync when changing the log-level
at runtime::
system.eventStream.setLogLevel(Logging.DebugLevel)
This means that log events for a level which will not be logged are not
typically not dispatched at all (unless manual subscriptions to the respective
event class have been done)
Dead Letters
------------
As described at :ref:`stopping-actors-scala`, messages queued when an actor
terminates or sent after its death are re-routed to the dead letter mailbox,
which by default will publish the messages wrapped in :class:`DeadLetter`. This
wrapper holds the original sender, receiver and message of the envelope which
was redirected.
Other Uses
----------
The event stream is always there and ready to be used, just publish your own
events (it accepts ``AnyRef``) and subscribe listeners to the corresponding JVM
classes.

View file

@ -9,6 +9,7 @@ Scala API
actors
typed-actors
logging
event-bus
scheduler
futures
dataflow

View file

@ -70,6 +70,8 @@ Once you a reference to the actor you can interact with it they same way you wou
actor ! "Pretty awesome feature"
For more details on how actor addresses and paths are formed and used, please refer to :ref:`addressing`.
Creating Actors Remotely
^^^^^^^^^^^^^^^^^^^^^^^^
@ -151,6 +153,13 @@ This enables the remoting by installing the :class:`RemoteActorRefProvider` and
chooses the default remote transport. All other options will be set
specifically for each show case.
.. note::
Be sure to replace the default IP 127.0.0.1 with the real address the system
is reachable by if you deploy onto multiple machines!
.. _remote-lookup-sample-scala:
Remote Lookup
-------------

View file

@ -5,19 +5,24 @@ package sample.remote.calculator.java;
import akka.actor.UntypedActor;
//#actor
public class JAdvancedCalculatorActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Op.Multiply) {
Op.Multiply multiply = (Op.Multiply) message;
System.out.println("Calculating " + multiply.getN1() + " * " + multiply.getN2());
getSender().tell(new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2()));
} else if (message instanceof Op.Divide) {
Op.Divide divide = (Op.Divide) message;
System.out.println("Calculating " + divide.getN1() + " / " + divide.getN2());
getSender().tell(new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2()));
} else {
unhandled(message);
}
}
}
//#actor

View file

@ -9,6 +9,7 @@ import akka.actor.Props;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;
//#setup
public class JCalculatorApplication implements Bootable {
private ActorSystem system;
@ -26,3 +27,4 @@ public class JCalculatorApplication implements Bootable {
system.shutdown();
}
}
//#setup

View file

@ -8,24 +8,35 @@ import akka.actor.UntypedActor;
import java.text.DecimalFormat;
import java.text.NumberFormat;
//#actor
public class JCreationActor extends UntypedActor {
private static final NumberFormat formatter = new DecimalFormat("#0.00");
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof InternalMsg.MathOpMsg) {
// forward math op to server actor
InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message;
msg.getActor().tell(msg.getMathOp(), getSelf());
} else if (message instanceof Op.MathResult) {
// receive reply from server actor
if (message instanceof Op.MultiplicationResult) {
Op.MultiplicationResult result = (Op.MultiplicationResult) message;
System.out.println("Mul result: " + result.getN1() + " * " +
result.getN2() + " = " + result.getResult());
} else if (message instanceof Op.DivisionResult) {
Op.DivisionResult result = (Op.DivisionResult) message;
System.out.println("Div result: " + result.getN1() + " / " +
result.getN2() + " = " + formatter.format(result.getResult()));
}
} else {
unhandled(message);
}
}
}
//#actor

View file

@ -9,6 +9,7 @@ import akka.actor.Props;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;
//#setup
public class JCreationApplication implements Bootable {
private ActorSystem system;
private ActorRef actor;
@ -33,3 +34,4 @@ public class JCreationApplication implements Bootable {
system.shutdown();
}
}
//#setup

View file

@ -5,23 +5,35 @@ package sample.remote.calculator.java;
import akka.actor.UntypedActor;
//#actor
public class JLookupActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof InternalMsg.MathOpMsg) {
// send message to server actor
InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message;
msg.getActor().tell(msg.getMathOp(), getSelf());
} else if (message instanceof Op.MathResult) {
// receive reply from server actor
if (message instanceof Op.AddResult) {
Op.AddResult result = (Op.AddResult) message;
System.out.println("Add result: " + result.getN1() + " + " +
result.getN2() + " = " + result.getResult());
} else if (message instanceof Op.SubtractResult) {
Op.SubtractResult result = (Op.SubtractResult) message;
System.out.println("Sub result: " + result.getN1() + " - " +
result.getN2() + " = " + result.getResult());
}
} else {
unhandled(message);
}
}
}
//#actor

View file

@ -3,12 +3,16 @@
*/
package sample.remote.calculator.java;
//#imports
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;
//#imports
//#setup
public class JLookupApplication implements Bootable {
private ActorSystem system;
private ActorRef actor;
@ -33,3 +37,4 @@ public class JLookupApplication implements Bootable {
system.shutdown();
}
}
//#setup

View file

@ -5,19 +5,24 @@ package sample.remote.calculator.java;
import akka.actor.UntypedActor;
//#actor
public class JSimpleCalculatorActor extends UntypedActor {
@Override
public void onReceive(Object message) {
if (message instanceof Op.Add) {
Op.Add add = (Op.Add) message;
System.out.println("Calculating " + add.getN1() + " + " + add.getN2());
getSender().tell(new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2()));
} else if (message instanceof Op.Subtract) {
Op.Subtract subtract = (Op.Subtract) message;
System.out.println("Calculating " + subtract.getN1() + " - " + subtract.getN2());
getSender().tell(new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2()));
} else {
unhandled(message);
}
}
}
//#actor

View file

@ -314,10 +314,6 @@ object AkkaBuild extends Build {
if (true || (System getProperty "java.runtime.version" startsWith "1.7")) Seq() else Seq("-optimize")), // -optimize fails with jdk7
javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
// add config dir to classpaths
unmanagedClasspath in Runtime <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") },
unmanagedClasspath in Test <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") },
parallelExecution in Test := System.getProperty("akka.parallelExecution", "true").toBoolean,
// for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)