Merging in the hotswap docs into master
This commit is contained in:
commit
9cc8b67cf4
13 changed files with 420 additions and 254 deletions
|
|
@ -248,14 +248,17 @@ trait Actor {
|
|||
/**
|
||||
* User overridable callback.
|
||||
* <p/>
|
||||
* Is called when an Actor is started by invoking 'actor'.
|
||||
* Is called when an Actor is started.
|
||||
* Actors are automatically started asynchronously when created.
|
||||
* Empty default implementation.
|
||||
*/
|
||||
def preStart() {}
|
||||
|
||||
/**
|
||||
* User overridable callback.
|
||||
* <p/>
|
||||
* Is called when 'actor.stop()' is invoked.
|
||||
* Is called asynchronously after 'actor.stop()' is invoked.
|
||||
* Empty default implementation.
|
||||
*/
|
||||
def postStop() {}
|
||||
|
||||
|
|
|
|||
|
|
@ -196,7 +196,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
|||
/**
|
||||
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down,
|
||||
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or default specified in
|
||||
* akka-actor-reference.conf
|
||||
* reference.conf
|
||||
*/
|
||||
protected[akka] def shutdownTimeout: Duration
|
||||
|
||||
|
|
|
|||
|
|
@ -5,9 +5,9 @@ General
|
|||
:maxdepth: 2
|
||||
|
||||
jmm
|
||||
message-send-semantics
|
||||
configuration
|
||||
event-handler
|
||||
slf4j
|
||||
addressing
|
||||
supervision
|
||||
guaranteed-delivery
|
||||
|
|
|
|||
|
|
@ -1,13 +1,14 @@
|
|||
|
||||
.. _guaranteed-delivery:
|
||||
.. _message-send-semantics:
|
||||
|
||||
#####################
|
||||
Guaranteed Delivery
|
||||
#####################
|
||||
#######################
|
||||
Message send semantics
|
||||
#######################
|
||||
|
||||
|
||||
Guaranteed Delivery
|
||||
===================
|
||||
|
||||
Guaranteed Delivery?
|
||||
====================
|
||||
|
||||
Akka does *not* support guaranteed delivery.
|
||||
|
||||
|
|
@ -34,9 +35,35 @@ in Erlang and requires the user to model his application around. You can
|
|||
read more about this approach in the `Erlang documentation`_ (section
|
||||
10.9 and 10.10), Akka follows it closely.
|
||||
|
||||
Bottom line; you as a developer knows what guarantees you need in your
|
||||
Bottom line: you as a developer know what guarantees you need in your
|
||||
application and can solve it fastest and most reliable by explicit ``ACK`` and
|
||||
``RETRY`` (if you really need it, most often you don't). Using Akka's Durable
|
||||
Mailboxes could help with this.
|
||||
|
||||
Delivery semantics
|
||||
==================
|
||||
|
||||
At-most-once
|
||||
------------
|
||||
|
||||
Actual transports may provide stronger semantics,
|
||||
but at-most-once is the semantics you should expect.
|
||||
The alternatives would be once-and-only-once, which is extremely costly,
|
||||
or at-least-once which essentially requires idempotency of message processing,
|
||||
which is a user-level concern.
|
||||
|
||||
Ordering is preserved on a per-sender basis
|
||||
-------------------------------------------
|
||||
|
||||
Actor ``A1` sends messages ``M1``, ``M2``, ``M3`` to ``A2``
|
||||
Actor ``A3`` sends messages ``M4``, ``M5``, ``M6`` to ``A2``
|
||||
|
||||
This means that:
|
||||
1) If ``M1`` is delivered it must be delivered before ``M2`` and ``M3``
|
||||
2) If ``M2`` is delivered it must be delivered before ``M3``
|
||||
3) If ``M4`` is delivered it must be delivered before ``M5`` and ``M6``
|
||||
4) If ``M5`` is delivered it must be delivered before ``M6``
|
||||
5) ``A2`` can see messages from ``A1`` interleaved with messages from ``A3``
|
||||
6) Since there is no guaranteed delivery, none, some or all of the messages may arrive to ``A2``
|
||||
|
||||
.. _Erlang documentation: http://www.erlang.org/faq/academic.html
|
||||
|
|
@ -28,17 +28,12 @@ its syntax from Erlang.
|
|||
Creating Actors
|
||||
===============
|
||||
|
||||
Actors can be created either by:
|
||||
|
||||
* Extending the Actor class and implementing the receive method.
|
||||
* Create an anonymous actor using one of the actor methods.
|
||||
|
||||
|
||||
Defining an Actor class
|
||||
-----------------------
|
||||
|
||||
Actor classes are implemented by extending the Actor class and implementing the
|
||||
``receive`` method. The ``receive`` method should define a series of case
|
||||
:meth:`receive` method. The :meth:`receive` method should define a series of case
|
||||
statements (which has the type ``PartialFunction[Any, Unit]``) that defines
|
||||
which messages your Actor can handle, using standard Scala pattern matching,
|
||||
along with the implementation of how the messages should be processed.
|
||||
|
|
@ -46,21 +41,22 @@ along with the implementation of how the messages should be processed.
|
|||
Here is an example:
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala
|
||||
:include: imports,my-actor
|
||||
:include: imports1,my-actor
|
||||
|
||||
Please note that the Akka Actor ``receive`` message loop is exhaustive, which is
|
||||
different compared to Erlang and Scala Actors. This means that you need to
|
||||
provide a pattern match for all messages that it can accept and if you want to
|
||||
be able to handle unknown messages then you need to have a default case as in
|
||||
the example above.
|
||||
|
||||
the example above. Otherwise an ``UnhandledMessageException`` will be
|
||||
thrown and the actor is restarted when an unknown message is received.
|
||||
|
||||
Creating Actors
|
||||
---------------
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#creating-actorOf
|
||||
.. includecode:: code/ActorDocSpec.scala
|
||||
:include: imports2,system-actorOf
|
||||
|
||||
The call to ``actorOf`` returns an instance of ``ActorRef``. This is a handle to
|
||||
The call to :meth:`actorOf` returns an instance of ``ActorRef``. This is a handle to
|
||||
the ``Actor`` instance which you can use to interact with the ``Actor``. The
|
||||
``ActorRef`` is immutable and has a one to one relationship with the Actor it
|
||||
represents. The ``ActorRef`` is also serializable and network-aware. This means
|
||||
|
|
@ -68,6 +64,15 @@ that you can serialize it, send it over the wire and use it on a remote host and
|
|||
it will still be representing the same Actor on the original node, across the
|
||||
network.
|
||||
|
||||
In the above example the actor was created from the system. It is also possible
|
||||
to create actors from other actors with the actor ``context``. The difference is
|
||||
how the supervisor hierarchy is arranged. When using the context the current actor
|
||||
will be supervisor of the created child actor. When using the system it will be
|
||||
a top level actor, that is supervised by the system (internal guardian actor).
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#context-actorOf
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
|
||||
Creating Actors with non-default constructor
|
||||
--------------------------------------------
|
||||
|
|
@ -81,23 +86,21 @@ Here is an example:
|
|||
.. includecode:: code/ActorDocSpec.scala#creating-constructor
|
||||
|
||||
|
||||
Creating Actors with Props
|
||||
--------------------------
|
||||
|
||||
``Props`` is a configuration object to specify additional things for the actor to
|
||||
be created, such as the ``MessageDispatcher``.
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#creating-props
|
||||
|
||||
|
||||
Creating Actors using anonymous classes
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
---------------------------------------
|
||||
|
||||
When spawning actors for specific sub-tasks from within an actor, it may be convenient to include the code to be executed directly in place, using an anonymous class::
|
||||
When spawning actors for specific sub-tasks from within an actor, it may be convenient to include the code to be executed directly in place, using an anonymous class.
|
||||
|
||||
def receive = {
|
||||
case m: DoIt =>
|
||||
actorOf(new Actor {
|
||||
def receive = {
|
||||
case DoIt(msg) =>
|
||||
val replyMsg = doSomeDangerousWork(msg)
|
||||
self.reply(replyMsg)
|
||||
self.stop()
|
||||
}
|
||||
def doSomeDangerousWork(msg: Message) = { ... }
|
||||
}).start() ! m
|
||||
}
|
||||
.. includecode:: code/ActorDocSpec.scala#anonymous-actor
|
||||
|
||||
.. warning::
|
||||
|
||||
|
|
@ -108,41 +111,38 @@ When spawning actors for specific sub-tasks from within an actor, it may be conv
|
|||
code will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
there is not yet a way to detect these illegal accesses at compile time.
|
||||
|
||||
Running a block of code asynchronously
|
||||
--------------------------------------
|
||||
|
||||
Here we create a light-weight actor-based thread, that can be used to spawn off
|
||||
a task. Code blocks spawned up like this are always implicitly started, shut
|
||||
down and made eligible for garbage collection. The actor that is created "under
|
||||
the hood" is not reachable from the outside and there is no way of sending
|
||||
messages to it. It being an actor is only an implementation detail. It will only
|
||||
run the block in an event-based thread and exit once the block has run to
|
||||
completion.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
spawn {
|
||||
... // do stuff
|
||||
}
|
||||
|
||||
|
||||
Actor Internal API
|
||||
==================
|
||||
Actor API
|
||||
=========
|
||||
|
||||
The :class:`Actor` trait defines only one abstract method, the above mentioned
|
||||
:meth:`receive`. In addition, it offers two convenience methods
|
||||
:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as
|
||||
described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s
|
||||
:class:`ActorRef` object. If the current actor behavior does not match a
|
||||
received message, :meth:`unhandled` is called, which by default throws an
|
||||
:class:`UnhandledMessageException`.
|
||||
:meth:`receive`, which implements the behavior of the actor.
|
||||
|
||||
If the current actor behavior does not match a received message, :meth:`unhandled`
|
||||
is called, which by default throws an :class:`UnhandledMessageException`.
|
||||
|
||||
In addition, it offers:
|
||||
|
||||
* :obj:`self` reference to the :class:`ActorRef` of the actor
|
||||
* :obj:`sender` reference sender Actor of the last received message, typically used as described in :ref:`Actor.Reply`
|
||||
* :obj:`context` exposes contextual information for the actor and the current message, such as:
|
||||
|
||||
* factory method to create child actors (:meth:`actorOf`)
|
||||
* system that the actor belongs to
|
||||
* parent supervisor
|
||||
* supervised children
|
||||
* hotswap behavior stack as described in :ref:`Actor.HotSwap`
|
||||
|
||||
You can import the members in the :obj:`context` to avoid prefixing access with ``context.``
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#import-context
|
||||
|
||||
The remaining visible methods are user-overridable life-cycle hooks which are
|
||||
described in the following::
|
||||
|
||||
def preStart() {}
|
||||
def preRestart(cause: Throwable, message: Option[Any]) {}
|
||||
def postRestart(cause: Throwable) {}
|
||||
def preRestart(reason: Throwable, message: Option[Any]) { postStop() }
|
||||
def postRestart(reason: Throwable) { preStart() }
|
||||
def postStop() {}
|
||||
|
||||
The implementations shown above are the defaults provided by the :class:`Actor`
|
||||
|
|
@ -156,7 +156,7 @@ Right after starting the actor, its :meth:`preStart` method is invoked.
|
|||
|
||||
::
|
||||
|
||||
override def preStart {
|
||||
override def preStart() {
|
||||
// registering with other actors
|
||||
someService ! Register(self)
|
||||
}
|
||||
|
|
@ -165,9 +165,9 @@ Right after starting the actor, its :meth:`preStart` method is invoked.
|
|||
Restart Hooks
|
||||
-------------
|
||||
|
||||
A supervised actor, i.e. one which is linked to another actor with a fault
|
||||
handling strategy, will be restarted in case an exception is thrown while
|
||||
processing a message. This restart involves four of the hooks mentioned above:
|
||||
All actors are supervised, i.e. linked to another actor with a fault
|
||||
handling strategy. Actors will be restarted in case an exception is thrown while
|
||||
processing a message. This restart involves the hooks mentioned above:
|
||||
|
||||
1. The old actor is informed by calling :meth:`preRestart` with the exception
|
||||
which caused the restart and the message which triggered that exception; the
|
||||
|
|
@ -175,17 +175,18 @@ processing a message. This restart involves four of the hooks mentioned above:
|
|||
message, e.g. when a supervisor does not trap the exception and is restarted
|
||||
in turn by its supervisor. This method is the best place for cleaning up,
|
||||
preparing hand-over to the fresh actor instance, etc.
|
||||
2. The initial factory from the ``Actor.actorOf`` call is used
|
||||
By default it calls :meth:`postStop`.
|
||||
2. The initial factory from the ``actorOf`` call is used
|
||||
to produce the fresh instance.
|
||||
3. The new actor’s :meth:`preStart` method is invoked, just as in the normal
|
||||
start-up case.
|
||||
4. The new actor’s :meth:`postRestart` method is called with the exception
|
||||
which caused the restart.
|
||||
3. The new actor’s :meth:`postRestart` method is invoked with the exception
|
||||
which caused the restart. By default the :meth:`preStart`
|
||||
is called, just as in the normal start-up case.
|
||||
|
||||
|
||||
An actor restart replaces only the actual actor object; the contents of the
|
||||
mailbox and the hotswap stack are unaffected by the restart, so processing of
|
||||
messages will resume after the :meth:`postRestart` hook returns. Any message
|
||||
messages will resume after the :meth:`postRestart` hook returns. The message
|
||||
that triggered the exception will not be received again. Any message
|
||||
sent to an actor while it is being restarted will be queued to its mailbox as
|
||||
usual.
|
||||
|
||||
|
|
@ -194,16 +195,15 @@ Stop Hook
|
|||
|
||||
After stopping an actor, its :meth:`postStop` hook is called, which may be used
|
||||
e.g. for deregistering this actor from other services. This hook is guaranteed
|
||||
to run after message queuing has been disabled for this actor, i.e. sending
|
||||
messages would fail with an :class:`IllegalActorStateException`.
|
||||
to run after message queuing has been disabled for this actor, i.e. messages
|
||||
sent to a stopped actor will be redirected to the :obj:`deadLetters` of the
|
||||
:obj:`ActorSystem`.
|
||||
|
||||
|
||||
Identifying Actors
|
||||
==================
|
||||
|
||||
An actor is identified by its address. If no address is associated with an actor
|
||||
then a unique identifier is used instead. The address of an actor can be
|
||||
accessed using ``self.address``.
|
||||
FIXME Actor Path documentation
|
||||
|
||||
|
||||
Messages and immutability
|
||||
|
|
@ -236,20 +236,9 @@ Send messages
|
|||
Messages are sent to an Actor through one of the following methods.
|
||||
|
||||
* ``!`` means “fire-and-forget”, e.g. send a message asynchronously and return
|
||||
immediately.
|
||||
immediately. Also know as ``tell``.
|
||||
* ``?`` sends a message asynchronously and returns a :class:`Future`
|
||||
representing a possible reply.
|
||||
|
||||
.. note::
|
||||
|
||||
There used to be two more “bang” methods, which are now removed in Akka 2.0:
|
||||
|
||||
* ``!!`` was similar to the current ``(actor ? msg).as[T]``; deprecation
|
||||
followed from the change of timeout handling described below.
|
||||
* ``!!![T]`` was similar to the current ``(actor ? msg).mapTo[T]``, with the
|
||||
same change in the handling of :class:`Future`’s timeout as for ``!!``, but
|
||||
additionally the old method could defer possible type cast problems into
|
||||
seemingly unrelated parts of the code base.
|
||||
representing a possible reply. Also know as ``ask``.
|
||||
|
||||
Message ordering is guaranteed on a per-sender basis.
|
||||
|
||||
|
|
@ -265,13 +254,11 @@ message. This gives the best concurrency and scalability characteristics.
|
|||
|
||||
If invoked from within an Actor, then the sending actor reference will be
|
||||
implicitly passed along with the message and available to the receiving Actor
|
||||
in its ``channel: UntypedChannel`` member field. The target actor can use this
|
||||
to reply to the original sender, e.g. by using the ``self.reply(message: Any)``
|
||||
method.
|
||||
in its ``sender: ActorRef`` member field. The target actor can use this
|
||||
to reply to the original sender, by using ``sender ! replyMsg``.
|
||||
|
||||
If invoked from an instance that is **not** an Actor there will be no implicit
|
||||
sender passed along with the message and you will get an
|
||||
IllegalActorStateException when calling ``self.reply(...)``.
|
||||
If invoked from an instance that is **not** an Actor the sender will be
|
||||
:obj:`deadLetters` actor reference by default.
|
||||
|
||||
Send-And-Receive-Future
|
||||
-----------------------
|
||||
|
|
@ -284,25 +271,49 @@ will return a :class:`Future`:
|
|||
val future = actor ? "hello"
|
||||
|
||||
The receiving actor should reply to this message, which will complete the
|
||||
future with the reply message as value; if the actor throws an exception while
|
||||
processing the invocation, this exception will also complete the future. If the
|
||||
actor does not complete the future, it will expire after the timeout period,
|
||||
which is taken from one of the following three locations in order of
|
||||
precedence:
|
||||
future with the reply message as value; ``sender ! result``.
|
||||
|
||||
To complete the future with an exception you need send a Failure message to the sender.
|
||||
This is not done automatically when an actor throws an exception while processing a
|
||||
message.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
try {
|
||||
operation()
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
sender ! akka.actor.Status.Failure(e)
|
||||
throw e
|
||||
}
|
||||
|
||||
If the actor does not complete the future, it will expire after the timeout period,
|
||||
which is taken from one of the following locations in order of precedence:
|
||||
|
||||
#. explicitly given timeout as in ``actor.?("hello")(timeout = 12 millis)``
|
||||
#. implicit argument of type :class:`Actor.Timeout`, e.g.
|
||||
#. implicit argument of type :class:`akka.actor.Timeout`, e.g.
|
||||
|
||||
::
|
||||
|
||||
implicit val timeout = Actor.Timeout(12 millis)
|
||||
val future = actor ? "hello"
|
||||
import akka.actor.Timeout
|
||||
import akka.util.duration._
|
||||
|
||||
#. default timeout from ``akka.conf``
|
||||
implicit val timeout = Timeout(12 millis)
|
||||
val future = actor ? "hello"
|
||||
|
||||
See :ref:`futures-scala` for more information on how to await or query a
|
||||
future.
|
||||
|
||||
.. warning::
|
||||
|
||||
When using future callbacks, such as ``onComplete``, ``onResult``, and ``onTimeout``,
|
||||
inside actors you need to carefully avoid closing over the containing actor’s
|
||||
reference, i.e. do not call methods or access mutable state on the enclosing actor
|
||||
from within the callback. This would break the actor encapsulation and may
|
||||
introduce synchronization bugs and race conditions because the callback
|
||||
will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
there is not yet a way to detect these illegal accesses at compile time.
|
||||
|
||||
Send-And-Receive-Eventually
|
||||
---------------------------
|
||||
|
||||
|
|
@ -321,17 +332,7 @@ type, it will throw the exception or a :class:`ClassCastException` (if you want
|
|||
to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In
|
||||
case of a timeout, :obj:`None` is returned.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
(actor ? msg).as[String] match {
|
||||
case Some(answer) => ...
|
||||
case None => ...
|
||||
}
|
||||
|
||||
val resultOption = (actor ? msg).as[String]
|
||||
if (resultOption.isDefined) ... else ...
|
||||
|
||||
for (x <- (actor ? msg).as[Int]) yield { 2 * x }
|
||||
.. includecode:: code/ActorDocSpec.scala#using-ask
|
||||
|
||||
Forward message
|
||||
---------------
|
||||
|
|
@ -363,25 +364,15 @@ This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause
|
|||
which the message can be matched against the different case clauses using Scala
|
||||
pattern matching. Here is an example:
|
||||
|
||||
.. code-block:: scala
|
||||
.. includecode:: code/ActorDocSpec.scala
|
||||
:include: imports1,my-actor
|
||||
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case "Hello" =>
|
||||
log.info("Received 'Hello'")
|
||||
|
||||
case _ =>
|
||||
throw new RuntimeException("unknown message")
|
||||
}
|
||||
}
|
||||
|
||||
.. _Actor.Reply:
|
||||
|
||||
Reply to messages
|
||||
=================
|
||||
|
||||
Reply using the sender
|
||||
----------------------
|
||||
|
||||
If you want to have a handle for replying to a message, you can use
|
||||
``sender``, which gives you an ActorRef. You can reply by sending to
|
||||
that ActorRef with ``sender ! Message``. You can also store the ActorRef
|
||||
|
|
@ -403,16 +394,7 @@ received within a certain time. To receive this timeout you have to set the
|
|||
``receiveTimeout`` property and declare a case handing the ReceiveTimeout
|
||||
object.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
context.receiveTimeout = Some(30000L) // 30 seconds
|
||||
|
||||
def receive = {
|
||||
case "Hello" =>
|
||||
log.info("Received 'Hello'")
|
||||
case ReceiveTimeout =>
|
||||
throw new RuntimeException("received timeout")
|
||||
}
|
||||
.. includecode:: code/ActorDocSpec.scala#receive-timeout
|
||||
|
||||
Starting actors
|
||||
===============
|
||||
|
|
@ -438,12 +420,19 @@ add initialization code for the actor.
|
|||
Stopping actors
|
||||
===============
|
||||
|
||||
Actors are stopped by invoking the ``stop`` method.
|
||||
Actors are stopped by invoking the ``stop`` method of the ``ActorRef``.
|
||||
The actual termination of the actor is performed asynchronously, i.e.
|
||||
``stop`` may return before the actor is stopped.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
actor.stop()
|
||||
|
||||
Processing of the current message, if any, will continue before the actor is stopped,
|
||||
but additional messages in the mailbox will not be processed. By default these
|
||||
messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that
|
||||
depends on the mailbox implementation.
|
||||
|
||||
When stop is called then a call to the ``def postStop`` callback method will
|
||||
take place. The ``Actor`` can use this callback to implement shutdown behavior.
|
||||
|
||||
|
|
@ -458,7 +447,9 @@ PoisonPill
|
|||
==========
|
||||
|
||||
You can also send an actor the ``akka.actor.PoisonPill`` message, which will
|
||||
stop the actor when the message is processed.
|
||||
stop the actor when the message is processed. ``PoisonPill`` is enqueued as
|
||||
ordinary messages and will be handled after messages that were already queued
|
||||
in the mailbox.
|
||||
|
||||
If the sender is a ``Future`` (e.g. the message is sent with ``?``), the
|
||||
``Future`` will be completed with an
|
||||
|
|
@ -486,22 +477,7 @@ pushed and popped.
|
|||
|
||||
To hotswap the Actor behavior using ``become``:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
def angry: Receive = {
|
||||
case "foo" => context reply "I am already angry?"
|
||||
case "bar" => become(happy)
|
||||
}
|
||||
|
||||
def happy: Receive = {
|
||||
case "bar" => context reply "I am already happy :-)"
|
||||
case "foo" => become(angry)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "foo" => become(angry)
|
||||
case "bar" => become(happy)
|
||||
}
|
||||
.. includecode:: code/ActorDocSpec.scala#hot-swap-actor
|
||||
|
||||
The ``become`` method is useful for many different things, but a particular nice
|
||||
example of it is in example where it is used to implement a Finite State Machine
|
||||
|
|
@ -511,35 +487,12 @@ example of it is in example where it is used to implement a Finite State Machine
|
|||
|
||||
Here is another little cute example of ``become`` and ``unbecome`` in action:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
case object Swap
|
||||
class Swapper extends Actor {
|
||||
def receive = {
|
||||
case Swap =>
|
||||
println("Hi")
|
||||
become {
|
||||
case Swap =>
|
||||
println("Ho")
|
||||
unbecome() // resets the latest 'become' (just for fun)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val swap = actorOf[Swapper]
|
||||
|
||||
swap ! Swap // prints Hi
|
||||
swap ! Swap // prints Ho
|
||||
swap ! Swap // prints Hi
|
||||
swap ! Swap // prints Ho
|
||||
swap ! Swap // prints Hi
|
||||
swap ! Swap // prints Ho
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#swapper
|
||||
|
||||
Encoding Scala Actors nested receives without accidentally leaking memory
|
||||
-------------------------------------------------------------------------
|
||||
|
||||
See this `Unnested receive example <https://gist.github.com/797035>`_.
|
||||
See this `Unnested receive example <http://github.com/jboner/akka/blob/master/akka/akka-docs/scala/code/UnnestedReceives.scala>`_.
|
||||
|
||||
|
||||
Downgrade
|
||||
|
|
@ -555,7 +508,7 @@ Here's how you use the ``unbecome`` method:
|
|||
|
||||
.. code-block:: scala
|
||||
|
||||
def receive: Receive = {
|
||||
def receive = {
|
||||
case "revert" => unbecome()
|
||||
}
|
||||
|
||||
|
|
@ -601,16 +554,11 @@ messages on that mailbox, will be there as well.
|
|||
What happens to the actor
|
||||
-------------------------
|
||||
|
||||
If an exception is thrown and the actor is supervised, the actor object itself
|
||||
is discarded and a new instance is created. This new instance will now be used
|
||||
in the actor references to this actor (so this is done invisible to the
|
||||
developer).
|
||||
|
||||
If the actor is _not_ supervised, but its lifeCycle is set to Permanent
|
||||
(default), it will just keep on processing messages as if nothing had happened.
|
||||
|
||||
If the actor is _not_ supervised, but its lifeCycle is set to Temporary, it will
|
||||
be stopped immediately.
|
||||
If an exception is thrown, the actor instance is discarded and a new instance is
|
||||
created. This new instance will now be used in the actor references to this actor
|
||||
(so this is done invisible to the developer). Note that this means that current
|
||||
state of the failing actor instance is lost if you don't store and restore it in
|
||||
``preRestart`` and ``postRestart`` callbacks.
|
||||
|
||||
|
||||
Extending Actors using PartialFunction chaining
|
||||
|
|
@ -620,32 +568,4 @@ A bit advanced but very useful way of defining a base message handler and then
|
|||
extend that, either through inheritance or delegation, is to use
|
||||
``PartialFunction.orElse`` chaining.
|
||||
|
||||
In generic base Actor:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import akka.actor.Actor.Receive
|
||||
|
||||
abstract class GenericActor extends Actor {
|
||||
// to be defined in subclassing actor
|
||||
def specificMessageHandler: Receive
|
||||
|
||||
// generic message handler
|
||||
def genericMessageHandler: Receive = {
|
||||
case event => printf("generic: %s\n", event)
|
||||
}
|
||||
|
||||
def receive = specificMessageHandler orElse genericMessageHandler
|
||||
}
|
||||
|
||||
In subclassing Actor:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
class SpecificActor extends GenericActor {
|
||||
def specificMessageHandler = {
|
||||
case event: MyMsg => printf("specific: %s\n", event.subject)
|
||||
}
|
||||
}
|
||||
|
||||
case class MyMsg(subject: String)
|
||||
.. includecode:: code/ActorDocSpec.scala#receive-orElse
|
||||
|
|
|
|||
|
|
@ -1,16 +1,19 @@
|
|||
package akka.docs.stm
|
||||
package akka.docs.actor
|
||||
|
||||
//#imports1
|
||||
import akka.actor.Actor
|
||||
import akka.event.Logging
|
||||
//#imports1
|
||||
|
||||
//#imports2
|
||||
import akka.actor.ActorSystem
|
||||
//#imports2
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
//#imports
|
||||
import akka.actor.Actor
|
||||
import akka.event.Logging
|
||||
|
||||
//#imports
|
||||
|
||||
//#my-actor
|
||||
class MyActor extends Actor {
|
||||
val log = Logging(context.system, this)
|
||||
|
|
@ -21,12 +24,110 @@ class MyActor extends Actor {
|
|||
}
|
||||
//#my-actor
|
||||
|
||||
case class DoIt(msg: Message)
|
||||
case class Message(s: String)
|
||||
|
||||
//#context-actorOf
|
||||
class FirstActor extends Actor {
|
||||
val myActor = context.actorOf[MyActor]
|
||||
//#context-actorOf
|
||||
//#anonymous-actor
|
||||
def receive = {
|
||||
case m: DoIt ⇒
|
||||
context.actorOf(new Actor {
|
||||
def receive = {
|
||||
case DoIt(msg) ⇒
|
||||
val replyMsg = doSomeDangerousWork(msg)
|
||||
sender ! replyMsg
|
||||
self.stop()
|
||||
}
|
||||
def doSomeDangerousWork(msg: Message): String = { "done" }
|
||||
}) ! m
|
||||
|
||||
case replyMsg: String ⇒ sender ! replyMsg
|
||||
}
|
||||
//#anonymous-actor
|
||||
}
|
||||
|
||||
//#system-actorOf
|
||||
object Main extends App {
|
||||
val system = ActorSystem("MySystem")
|
||||
val myActor = system.actorOf[FirstActor]
|
||||
//#system-actorOf
|
||||
}
|
||||
//#swapper
|
||||
case object Swap
|
||||
class Swapper extends Actor {
|
||||
import context._
|
||||
val log = Logging(system, this)
|
||||
|
||||
def receive = {
|
||||
case Swap ⇒
|
||||
log.info("Hi")
|
||||
become {
|
||||
case Swap ⇒
|
||||
log.info("Ho")
|
||||
unbecome() // resets the latest 'become' (just for fun)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object SwapperApp extends App {
|
||||
val system = ActorSystem("SwapperSystem")
|
||||
val swap = system.actorOf[Swapper]
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
}
|
||||
//#swapper
|
||||
|
||||
//#receive-orElse
|
||||
import akka.actor.Actor.Receive
|
||||
|
||||
abstract class GenericActor extends Actor {
|
||||
// to be defined in subclassing actor
|
||||
def specificMessageHandler: Receive
|
||||
|
||||
// generic message handler
|
||||
def genericMessageHandler: Receive = {
|
||||
case event ⇒ printf("generic: %s\n", event)
|
||||
}
|
||||
|
||||
def receive = specificMessageHandler orElse genericMessageHandler
|
||||
}
|
||||
|
||||
class SpecificActor extends GenericActor {
|
||||
def specificMessageHandler = {
|
||||
case event: MyMsg ⇒ printf("specific: %s\n", event.subject)
|
||||
}
|
||||
}
|
||||
|
||||
case class MyMsg(subject: String)
|
||||
//#receive-orElse
|
||||
|
||||
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||
|
||||
"import context" in {
|
||||
//#import-context
|
||||
class FirstActor extends Actor {
|
||||
import context._
|
||||
val myActor = actorOf[MyActor]
|
||||
def receive = {
|
||||
case x ⇒ myActor ! x
|
||||
}
|
||||
}
|
||||
//#import-context
|
||||
|
||||
val first = system.actorOf(new FirstActor)
|
||||
first.stop()
|
||||
|
||||
}
|
||||
|
||||
"creating actor with AkkaSpec.actorOf" in {
|
||||
//#creating-actorOf
|
||||
val myActor = system.actorOf[MyActor]
|
||||
//#creating-actorOf
|
||||
|
||||
// testing the actor
|
||||
|
||||
|
|
@ -62,4 +163,73 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
|
||||
myActor.stop()
|
||||
}
|
||||
|
||||
"creating actor with Props" in {
|
||||
//#creating-props
|
||||
import akka.actor.Props
|
||||
val dispatcher = system.dispatcherFactory.fromConfig("my-dispatcher")
|
||||
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
|
||||
//#creating-props
|
||||
|
||||
myActor.stop()
|
||||
}
|
||||
|
||||
"using ask" in {
|
||||
//#using-ask
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case x: String ⇒ sender ! x.toUpperCase
|
||||
case n: Int ⇒ sender ! (n + 1)
|
||||
}
|
||||
}
|
||||
|
||||
val myActor = system.actorOf(new MyActor)
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val future = myActor ? "hello"
|
||||
future.as[String] match {
|
||||
case Some(answer) ⇒ //...
|
||||
case None ⇒ //...
|
||||
}
|
||||
val result: Option[Int] = for (x ← (myActor ? 3).as[Int]) yield { 2 * x }
|
||||
//#using-ask
|
||||
|
||||
myActor.stop()
|
||||
}
|
||||
|
||||
"using receiveTimeout" in {
|
||||
//#receive-timeout
|
||||
import akka.actor.ReceiveTimeout
|
||||
import akka.util.duration._
|
||||
class MyActor extends Actor {
|
||||
context.receiveTimeout = Some(30 seconds)
|
||||
def receive = {
|
||||
case "Hello" ⇒ //...
|
||||
case ReceiveTimeout ⇒ throw new RuntimeException("received timeout")
|
||||
}
|
||||
}
|
||||
//#receive-timeout
|
||||
}
|
||||
|
||||
"using hot-swap" in {
|
||||
//#hot-swap-actor
|
||||
class HotSwapActor extends Actor {
|
||||
import context._
|
||||
def angry: Receive = {
|
||||
case "foo" ⇒ sender ! "I am already angry?"
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
|
||||
def happy: Receive = {
|
||||
case "bar" ⇒ sender ! "I am already happy :-)"
|
||||
case "foo" ⇒ become(angry)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "foo" ⇒ become(angry)
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
}
|
||||
//#hot-swap-actor
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
48
akka-docs/scala/code/UnnestedReceives.scala
Normal file
48
akka-docs/scala/code/UnnestedReceives.scala
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
package akka.docs.actor
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
||||
/**
|
||||
* Requirements are as follows:
|
||||
* The first thing the actor needs to do, is to subscribe to a channel of events,
|
||||
* Then it must replay (process) all "old" events
|
||||
* Then it has to wait for a GoAhead signal to begin processing the new events
|
||||
* It mustn't "miss" events that happen between catching up with the old events and getting the GoAhead signal
|
||||
*/
|
||||
class UnnestedReceives extends Actor {
|
||||
import context.become
|
||||
//If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)]
|
||||
val queue = new ListBuffer[Any]()
|
||||
|
||||
//This message processes a message/event
|
||||
def process(msg: Any): Unit = println("processing: " + msg)
|
||||
//This method subscribes the actor to the event bus
|
||||
def subscribe() {} //Your external stuff
|
||||
//This method retrieves all prior messages/events
|
||||
def allOldMessages() = List()
|
||||
|
||||
override def preStart {
|
||||
//We override preStart to be sure that the first message the actor gets is
|
||||
//'Replay, that message will start to be processed _after_ the actor is started
|
||||
self ! 'Replay
|
||||
//Then we subscribe to the stream of messages/events
|
||||
subscribe()
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case 'Replay ⇒ //Our first message should be a 'Replay message, all others are invalid
|
||||
allOldMessages() foreach process //Process all old messages/events
|
||||
become { //Switch behavior to look for the GoAhead signal
|
||||
case 'GoAhead ⇒ //When we get the GoAhead signal we process all our buffered messages/events
|
||||
queue foreach process
|
||||
queue.clear
|
||||
become { //Then we change behaviour to process incoming messages/events as they arrive
|
||||
case msg ⇒ process(msg)
|
||||
}
|
||||
case msg ⇒ //While we haven't gotten the GoAhead signal, buffer all incoming messages
|
||||
queue += msg //Here you have full control, you can handle overflow etc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -29,7 +29,7 @@ class RemoteExtensionSettings(val config: Config, val systemName: String) extend
|
|||
val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
|
||||
val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
|
||||
|
||||
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
|
||||
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module
|
||||
val ClusterName = getString("akka.cluster.name")
|
||||
val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_, systemName))
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") {
|
|||
getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000)
|
||||
getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000)
|
||||
|
||||
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
|
||||
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module
|
||||
//akka.cluster
|
||||
getString("akka.cluster.name") must equal("default-cluster")
|
||||
getString("akka.cluster.nodename") must equal("node1")
|
||||
|
|
|
|||
|
|
@ -5,14 +5,11 @@
|
|||
package akka
|
||||
|
||||
import sbt._
|
||||
import Keys._
|
||||
|
||||
import sbt.Keys._
|
||||
import com.typesafe.sbtmultijvm.MultiJvmPlugin
|
||||
import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions }
|
||||
import com.typesafe.sbtscalariform.ScalariformPlugin
|
||||
|
||||
import MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions }
|
||||
import ScalariformPlugin.{ format, formatPreferences, formatSourceDirectories }
|
||||
|
||||
import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys
|
||||
import java.lang.Boolean.getBoolean
|
||||
|
||||
object AkkaBuild extends Build {
|
||||
|
|
@ -263,7 +260,7 @@ object AkkaBuild extends Build {
|
|||
settings = defaultSettings ++ Seq(
|
||||
unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get },
|
||||
libraryDependencies ++= Dependencies.docs,
|
||||
formatSourceDirectories in Test <<= unmanagedSourceDirectories in Test
|
||||
unmanagedSourceDirectories in ScalariformKeys.format in Test <<= unmanagedSourceDirectories in Test
|
||||
)
|
||||
)
|
||||
|
||||
|
|
@ -335,9 +332,9 @@ object AkkaBuild extends Build {
|
|||
testOptions in Test += Tests.Argument("-oF")
|
||||
)
|
||||
|
||||
lazy val formatSettings = ScalariformPlugin.settings ++ Seq(
|
||||
formatPreferences in Compile := formattingPreferences,
|
||||
formatPreferences in Test := formattingPreferences
|
||||
lazy val formatSettings = ScalariformPlugin.scalariformSettings ++ Seq(
|
||||
ScalariformKeys.preferences in Compile := formattingPreferences,
|
||||
ScalariformKeys.preferences in Test := formattingPreferences
|
||||
)
|
||||
|
||||
def formattingPreferences = {
|
||||
|
|
@ -348,9 +345,9 @@ object AkkaBuild extends Build {
|
|||
.setPreference(AlignSingleLineCaseStatements, true)
|
||||
}
|
||||
|
||||
lazy val multiJvmSettings = MultiJvmPlugin.settings ++ inConfig(MultiJvm)(ScalariformPlugin.formatSettings) ++ Seq(
|
||||
compileInputs in MultiJvm <<= (compileInputs in MultiJvm) dependsOn (format in MultiJvm),
|
||||
formatPreferences in MultiJvm := formattingPreferences
|
||||
lazy val multiJvmSettings = MultiJvmPlugin.settings ++ inConfig(MultiJvm)(ScalariformPlugin.scalariformSettings) ++ Seq(
|
||||
compileInputs in MultiJvm <<= (compileInputs in MultiJvm) dependsOn (ScalariformKeys.format in MultiJvm),
|
||||
ScalariformKeys.preferences in MultiJvm := formattingPreferences
|
||||
)
|
||||
|
||||
// reStructuredText docs
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ object Unidoc {
|
|||
}
|
||||
|
||||
def unidocTask: Initialize[Task[File]] = {
|
||||
(compilers, cacheDirectory, unidocSources, unidocClasspath, unidocDirectory, scaladocOptions in Compile, streams) map {
|
||||
(compilers, cacheDirectory, unidocSources, unidocClasspath, unidocDirectory, scalacOptions in doc, streams) map {
|
||||
(compilers, cache, sources, classpath, target, options, s) => {
|
||||
val scaladoc = new Scaladoc(100, compilers.scalac)
|
||||
scaladoc.cached(cache / "unidoc", "main", sources, classpath, target, options, s.log)
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
sbt.version=0.11.0
|
||||
sbt.version=0.11.2
|
||||
|
|
|
|||
|
|
@ -3,10 +3,11 @@ resolvers += Classpaths.typesafeResolver
|
|||
|
||||
addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.7")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbtscalariform" % "sbt-scalariform" % "0.1.4")
|
||||
addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.3.1")
|
||||
|
||||
resolvers ++= Seq(
|
||||
"less is" at "http://repo.lessis.me",
|
||||
"coda" at "http://repo.codahale.com")
|
||||
|
||||
addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.0")
|
||||
addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.1")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue