Merge pull request #137 from jboner/wip-1435-doc-scala-actors-patriknw
Updated documentation of Actors Scala. See #1435
This commit is contained in:
commit
738857ccc2
7 changed files with 378 additions and 253 deletions
|
|
@ -28,17 +28,12 @@ its syntax from Erlang.
|
|||
Creating Actors
|
||||
===============
|
||||
|
||||
Actors can be created either by:
|
||||
|
||||
* Extending the Actor class and implementing the receive method.
|
||||
* Create an anonymous actor using one of the actor methods.
|
||||
|
||||
|
||||
Defining an Actor class
|
||||
-----------------------
|
||||
|
||||
Actor classes are implemented by extending the Actor class and implementing the
|
||||
``receive`` method. The ``receive`` method should define a series of case
|
||||
:meth:`receive` method. The :meth:`receive` method should define a series of case
|
||||
statements (which has the type ``PartialFunction[Any, Unit]``) that defines
|
||||
which messages your Actor can handle, using standard Scala pattern matching,
|
||||
along with the implementation of how the messages should be processed.
|
||||
|
|
@ -46,21 +41,22 @@ along with the implementation of how the messages should be processed.
|
|||
Here is an example:
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala
|
||||
:include: imports,my-actor
|
||||
:include: imports1,my-actor
|
||||
|
||||
Please note that the Akka Actor ``receive`` message loop is exhaustive, which is
|
||||
different compared to Erlang and Scala Actors. This means that you need to
|
||||
provide a pattern match for all messages that it can accept and if you want to
|
||||
be able to handle unknown messages then you need to have a default case as in
|
||||
the example above.
|
||||
|
||||
the example above. Otherwise an ``UnhandledMessageException`` will be
|
||||
thrown and the actor is restarted when an unknown message is received.
|
||||
|
||||
Creating Actors
|
||||
---------------
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#creating-actorOf
|
||||
.. includecode:: code/ActorDocSpec.scala
|
||||
:include: imports2,system-actorOf
|
||||
|
||||
The call to ``actorOf`` returns an instance of ``ActorRef``. This is a handle to
|
||||
The call to :meth:`actorOf` returns an instance of ``ActorRef``. This is a handle to
|
||||
the ``Actor`` instance which you can use to interact with the ``Actor``. The
|
||||
``ActorRef`` is immutable and has a one to one relationship with the Actor it
|
||||
represents. The ``ActorRef`` is also serializable and network-aware. This means
|
||||
|
|
@ -68,6 +64,15 @@ that you can serialize it, send it over the wire and use it on a remote host and
|
|||
it will still be representing the same Actor on the original node, across the
|
||||
network.
|
||||
|
||||
In the above example the actor was created from the system. It is also possible
|
||||
to create actors from other actors with the actor ``context``. The difference is
|
||||
how the supervisor hierarchy is arranged. When using the context the current actor
|
||||
will be supervisor of the created child actor. When using the system it will be
|
||||
a top level actor, that is supervised by the system (internal guardian actor).
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#context-actorOf
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
|
||||
Creating Actors with non-default constructor
|
||||
--------------------------------------------
|
||||
|
|
@ -81,23 +86,21 @@ Here is an example:
|
|||
.. includecode:: code/ActorDocSpec.scala#creating-constructor
|
||||
|
||||
|
||||
Creating Actors with Props
|
||||
--------------------------
|
||||
|
||||
``Props`` is a configuration object to specify additional things for the actor to
|
||||
be created, such as the ``MessageDispatcher``.
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#creating-props
|
||||
|
||||
|
||||
Creating Actors using anonymous classes
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
---------------------------------------
|
||||
|
||||
When spawning actors for specific sub-tasks from within an actor, it may be convenient to include the code to be executed directly in place, using an anonymous class::
|
||||
When spawning actors for specific sub-tasks from within an actor, it may be convenient to include the code to be executed directly in place, using an anonymous class.
|
||||
|
||||
def receive = {
|
||||
case m: DoIt =>
|
||||
actorOf(new Actor {
|
||||
def receive = {
|
||||
case DoIt(msg) =>
|
||||
val replyMsg = doSomeDangerousWork(msg)
|
||||
self.reply(replyMsg)
|
||||
self.stop()
|
||||
}
|
||||
def doSomeDangerousWork(msg: Message) = { ... }
|
||||
}).start() ! m
|
||||
}
|
||||
.. includecode:: code/ActorDocSpec.scala#anonymous-actor
|
||||
|
||||
.. warning::
|
||||
|
||||
|
|
@ -108,41 +111,38 @@ When spawning actors for specific sub-tasks from within an actor, it may be conv
|
|||
code will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
there is not yet a way to detect these illegal accesses at compile time.
|
||||
|
||||
Running a block of code asynchronously
|
||||
--------------------------------------
|
||||
|
||||
Here we create a light-weight actor-based thread, that can be used to spawn off
|
||||
a task. Code blocks spawned up like this are always implicitly started, shut
|
||||
down and made eligible for garbage collection. The actor that is created "under
|
||||
the hood" is not reachable from the outside and there is no way of sending
|
||||
messages to it. It being an actor is only an implementation detail. It will only
|
||||
run the block in an event-based thread and exit once the block has run to
|
||||
completion.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
spawn {
|
||||
... // do stuff
|
||||
}
|
||||
|
||||
|
||||
Actor Internal API
|
||||
==================
|
||||
Actor API
|
||||
=========
|
||||
|
||||
The :class:`Actor` trait defines only one abstract method, the above mentioned
|
||||
:meth:`receive`. In addition, it offers two convenience methods
|
||||
:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as
|
||||
described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s
|
||||
:class:`ActorRef` object. If the current actor behavior does not match a
|
||||
received message, :meth:`unhandled` is called, which by default throws an
|
||||
:class:`UnhandledMessageException`.
|
||||
:meth:`receive`, which implements the behavior of the actor.
|
||||
|
||||
If the current actor behavior does not match a received message, :meth:`unhandled`
|
||||
is called, which by default throws an :class:`UnhandledMessageException`.
|
||||
|
||||
In addition, it offers:
|
||||
|
||||
* :obj:`self` reference to the :class:`ActorRef` of the actor
|
||||
* :obj:`sender` reference sender Actor of the last received message, typically used as described in :ref:`Actor.Reply`
|
||||
* :obj:`context` exposes contextual information for the actor and the current message, such as:
|
||||
|
||||
* factory method to create child actors (:meth:`actorOf`)
|
||||
* system that the actor belongs to
|
||||
* parent supervisor
|
||||
* supervised children
|
||||
* hotswap behavior stack as described in :ref:`Actor.HotSwap`
|
||||
|
||||
You can import the members in the :obj:`context` to avoid prefixing access with ``context.``
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#import-context
|
||||
|
||||
The remaining visible methods are user-overridable life-cycle hooks which are
|
||||
described in the following::
|
||||
|
||||
def preStart() {}
|
||||
def preRestart(cause: Throwable, message: Option[Any]) {}
|
||||
def postRestart(cause: Throwable) {}
|
||||
def preRestart(reason: Throwable, message: Option[Any]) { postStop() }
|
||||
def postRestart(reason: Throwable) { preStart() }
|
||||
def postStop() {}
|
||||
|
||||
The implementations shown above are the defaults provided by the :class:`Actor`
|
||||
|
|
@ -156,7 +156,7 @@ Right after starting the actor, its :meth:`preStart` method is invoked.
|
|||
|
||||
::
|
||||
|
||||
override def preStart {
|
||||
override def preStart() {
|
||||
// registering with other actors
|
||||
someService ! Register(self)
|
||||
}
|
||||
|
|
@ -165,9 +165,9 @@ Right after starting the actor, its :meth:`preStart` method is invoked.
|
|||
Restart Hooks
|
||||
-------------
|
||||
|
||||
A supervised actor, i.e. one which is linked to another actor with a fault
|
||||
handling strategy, will be restarted in case an exception is thrown while
|
||||
processing a message. This restart involves four of the hooks mentioned above:
|
||||
All actors are supervised, i.e. linked to another actor with a fault
|
||||
handling strategy. Actors will be restarted in case an exception is thrown while
|
||||
processing a message. This restart involves the hooks mentioned above:
|
||||
|
||||
1. The old actor is informed by calling :meth:`preRestart` with the exception
|
||||
which caused the restart and the message which triggered that exception; the
|
||||
|
|
@ -175,17 +175,18 @@ processing a message. This restart involves four of the hooks mentioned above:
|
|||
message, e.g. when a supervisor does not trap the exception and is restarted
|
||||
in turn by its supervisor. This method is the best place for cleaning up,
|
||||
preparing hand-over to the fresh actor instance, etc.
|
||||
2. The initial factory from the ``Actor.actorOf`` call is used
|
||||
By default it calls :meth:`postStop`.
|
||||
2. The initial factory from the ``actorOf`` call is used
|
||||
to produce the fresh instance.
|
||||
3. The new actor’s :meth:`preStart` method is invoked, just as in the normal
|
||||
start-up case.
|
||||
4. The new actor’s :meth:`postRestart` method is called with the exception
|
||||
which caused the restart.
|
||||
3. The new actor’s :meth:`postRestart` method is invoked with the exception
|
||||
which caused the restart. By default the :meth:`preStart`
|
||||
is called, just as in the normal start-up case.
|
||||
|
||||
|
||||
An actor restart replaces only the actual actor object; the contents of the
|
||||
mailbox and the hotswap stack are unaffected by the restart, so processing of
|
||||
messages will resume after the :meth:`postRestart` hook returns. Any message
|
||||
messages will resume after the :meth:`postRestart` hook returns. The message
|
||||
that triggered the exception will not be received again. Any message
|
||||
sent to an actor while it is being restarted will be queued to its mailbox as
|
||||
usual.
|
||||
|
||||
|
|
@ -194,16 +195,15 @@ Stop Hook
|
|||
|
||||
After stopping an actor, its :meth:`postStop` hook is called, which may be used
|
||||
e.g. for deregistering this actor from other services. This hook is guaranteed
|
||||
to run after message queuing has been disabled for this actor, i.e. sending
|
||||
messages would fail with an :class:`IllegalActorStateException`.
|
||||
to run after message queuing has been disabled for this actor, i.e. messages
|
||||
sent to a stopped actor will be redirected to the :obj:`deadLetters` of the
|
||||
:obj:`ActorSystem`.
|
||||
|
||||
|
||||
Identifying Actors
|
||||
==================
|
||||
|
||||
An actor is identified by its address. If no address is associated with an actor
|
||||
then a unique identifier is used instead. The address of an actor can be
|
||||
accessed using ``self.address``.
|
||||
FIXME Actor Path documentation
|
||||
|
||||
|
||||
Messages and immutability
|
||||
|
|
@ -236,20 +236,9 @@ Send messages
|
|||
Messages are sent to an Actor through one of the following methods.
|
||||
|
||||
* ``!`` means “fire-and-forget”, e.g. send a message asynchronously and return
|
||||
immediately.
|
||||
immediately. Also know as ``tell``.
|
||||
* ``?`` sends a message asynchronously and returns a :class:`Future`
|
||||
representing a possible reply.
|
||||
|
||||
.. note::
|
||||
|
||||
There used to be two more “bang” methods, which are now removed in Akka 2.0:
|
||||
|
||||
* ``!!`` was similar to the current ``(actor ? msg).as[T]``; deprecation
|
||||
followed from the change of timeout handling described below.
|
||||
* ``!!![T]`` was similar to the current ``(actor ? msg).mapTo[T]``, with the
|
||||
same change in the handling of :class:`Future`’s timeout as for ``!!``, but
|
||||
additionally the old method could defer possible type cast problems into
|
||||
seemingly unrelated parts of the code base.
|
||||
representing a possible reply. Also know as ``ask``.
|
||||
|
||||
Message ordering is guaranteed on a per-sender basis.
|
||||
|
||||
|
|
@ -265,13 +254,11 @@ message. This gives the best concurrency and scalability characteristics.
|
|||
|
||||
If invoked from within an Actor, then the sending actor reference will be
|
||||
implicitly passed along with the message and available to the receiving Actor
|
||||
in its ``channel: UntypedChannel`` member field. The target actor can use this
|
||||
to reply to the original sender, e.g. by using the ``self.reply(message: Any)``
|
||||
method.
|
||||
in its ``sender: ActorRef`` member field. The target actor can use this
|
||||
to reply to the original sender, by using ``sender ! replyMsg``.
|
||||
|
||||
If invoked from an instance that is **not** an Actor there will be no implicit
|
||||
sender passed along with the message and you will get an
|
||||
IllegalActorStateException when calling ``self.reply(...)``.
|
||||
If invoked from an instance that is **not** an Actor the sender will be
|
||||
:obj:`deadLetters` actor reference by default.
|
||||
|
||||
Send-And-Receive-Future
|
||||
-----------------------
|
||||
|
|
@ -284,25 +271,49 @@ will return a :class:`Future`:
|
|||
val future = actor ? "hello"
|
||||
|
||||
The receiving actor should reply to this message, which will complete the
|
||||
future with the reply message as value; if the actor throws an exception while
|
||||
processing the invocation, this exception will also complete the future. If the
|
||||
actor does not complete the future, it will expire after the timeout period,
|
||||
which is taken from one of the following three locations in order of
|
||||
precedence:
|
||||
future with the reply message as value; ``sender ! result``.
|
||||
|
||||
To complete the future with an exception you need send a Failure message to the sender.
|
||||
This is not done automatically when an actor throws an exception while processing a
|
||||
message.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
try {
|
||||
operation()
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
sender ! akka.actor.Status.Failure(e)
|
||||
throw e
|
||||
}
|
||||
|
||||
If the actor does not complete the future, it will expire after the timeout period,
|
||||
which is taken from one of the following locations in order of precedence:
|
||||
|
||||
#. explicitly given timeout as in ``actor.?("hello")(timeout = 12 millis)``
|
||||
#. implicit argument of type :class:`Actor.Timeout`, e.g.
|
||||
#. implicit argument of type :class:`akka.actor.Timeout`, e.g.
|
||||
|
||||
::
|
||||
|
||||
import akka.actor.Timeout
|
||||
import akka.util.duration._
|
||||
|
||||
implicit val timeout = Actor.Timeout(12 millis)
|
||||
implicit val timeout = Timeout(12 millis)
|
||||
val future = actor ? "hello"
|
||||
|
||||
#. default timeout from ``akka.conf``
|
||||
|
||||
See :ref:`futures-scala` for more information on how to await or query a
|
||||
future.
|
||||
|
||||
.. warning::
|
||||
|
||||
When using future callbacks, such as ``onComplete``, ``onResult``, and ``onTimeout``,
|
||||
inside actors you need to carefully avoid closing over the containing actor’s
|
||||
reference, i.e. do not call methods or access mutable state on the enclosing actor
|
||||
from within the callback. This would break the actor encapsulation and may
|
||||
introduce synchronization bugs and race conditions because the callback
|
||||
will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
there is not yet a way to detect these illegal accesses at compile time.
|
||||
|
||||
Send-And-Receive-Eventually
|
||||
---------------------------
|
||||
|
||||
|
|
@ -321,17 +332,7 @@ type, it will throw the exception or a :class:`ClassCastException` (if you want
|
|||
to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In
|
||||
case of a timeout, :obj:`None` is returned.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
(actor ? msg).as[String] match {
|
||||
case Some(answer) => ...
|
||||
case None => ...
|
||||
}
|
||||
|
||||
val resultOption = (actor ? msg).as[String]
|
||||
if (resultOption.isDefined) ... else ...
|
||||
|
||||
for (x <- (actor ? msg).as[Int]) yield { 2 * x }
|
||||
.. includecode:: code/ActorDocSpec.scala#using-ask
|
||||
|
||||
Forward message
|
||||
---------------
|
||||
|
|
@ -363,25 +364,15 @@ This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause
|
|||
which the message can be matched against the different case clauses using Scala
|
||||
pattern matching. Here is an example:
|
||||
|
||||
.. code-block:: scala
|
||||
.. includecode:: code/ActorDocSpec.scala
|
||||
:include: imports1,my-actor
|
||||
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case "Hello" =>
|
||||
log.info("Received 'Hello'")
|
||||
|
||||
case _ =>
|
||||
throw new RuntimeException("unknown message")
|
||||
}
|
||||
}
|
||||
|
||||
.. _Actor.Reply:
|
||||
|
||||
Reply to messages
|
||||
=================
|
||||
|
||||
Reply using the sender
|
||||
----------------------
|
||||
|
||||
If you want to have a handle for replying to a message, you can use
|
||||
``sender``, which gives you an ActorRef. You can reply by sending to
|
||||
that ActorRef with ``sender ! Message``. You can also store the ActorRef
|
||||
|
|
@ -403,51 +394,28 @@ received within a certain time. To receive this timeout you have to set the
|
|||
``receiveTimeout`` property and declare a case handing the ReceiveTimeout
|
||||
object.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
context.receiveTimeout = Some(30000L) // 30 seconds
|
||||
|
||||
def receive = {
|
||||
case "Hello" =>
|
||||
log.info("Received 'Hello'")
|
||||
case ReceiveTimeout =>
|
||||
throw new RuntimeException("received timeout")
|
||||
}
|
||||
.. includecode:: code/ActorDocSpec.scala#receive-timeout
|
||||
|
||||
This mechanism also work for hotswapped receive functions. Every time a
|
||||
``HotSwap`` is sent, the receive timeout is reset and rescheduled.
|
||||
|
||||
|
||||
Starting actors
|
||||
===============
|
||||
|
||||
Actors are created & started by invoking the ``actorOf`` method.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
val actor = actorOf[MyActor]
|
||||
actor
|
||||
|
||||
When you create the ``Actor`` then it will automatically call the ``def
|
||||
preStart`` callback method on the ``Actor`` trait. This is an excellent place to
|
||||
add initialization code for the actor.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
override def preStart() = {
|
||||
... // initialization code
|
||||
}
|
||||
|
||||
|
||||
Stopping actors
|
||||
===============
|
||||
|
||||
Actors are stopped by invoking the ``stop`` method.
|
||||
Actors are stopped by invoking the ``stop`` method of the ``ActorRef``.
|
||||
The actual termination of the actor is performed asynchronously, i.e.
|
||||
``stop`` may return before the actor is stopped.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
actor.stop()
|
||||
|
||||
Processing of the current message, if any, will continue before the actor is stopped,
|
||||
but additional messages in the mailbox will not be processed. By default these
|
||||
messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that
|
||||
depends on the mailbox implementation.
|
||||
|
||||
When stop is called then a call to the ``def postStop`` callback method will
|
||||
take place. The ``Actor`` can use this callback to implement shutdown behavior.
|
||||
|
||||
|
|
@ -462,7 +430,9 @@ PoisonPill
|
|||
==========
|
||||
|
||||
You can also send an actor the ``akka.actor.PoisonPill`` message, which will
|
||||
stop the actor when the message is processed.
|
||||
stop the actor when the message is processed. ``PoisonPill`` is enqueued as
|
||||
ordinary messages and will be handled after messages that were already queued
|
||||
in the mailbox.
|
||||
|
||||
If the sender is a ``Future`` (e.g. the message is sent with ``?``), the
|
||||
``Future`` will be completed with an
|
||||
|
|
@ -481,7 +451,7 @@ Akka supports hotswapping the Actor’s message loop (e.g. its implementation) a
|
|||
runtime. There are two ways you can do that:
|
||||
|
||||
* Send a ``HotSwap`` message to the Actor.
|
||||
* Invoke the ``become`` method from within the Actor.
|
||||
* Invoke the ``context.become`` method from within the Actor.
|
||||
|
||||
Both of these takes a ``ActorRef => PartialFunction[Any, Unit]`` that implements
|
||||
the new message handler. The hotswapped code is kept in a Stack which can be
|
||||
|
|
@ -493,30 +463,11 @@ pushed and popped.
|
|||
|
||||
To hotswap the Actor body using the ``HotSwap`` message:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
actor ! HotSwap( context => {
|
||||
case message => context reply "hotswapped body"
|
||||
})
|
||||
.. includecode:: code/ActorDocSpec.scala#hot-swap-message
|
||||
|
||||
To hotswap the Actor using ``become``:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
def angry: Receive = {
|
||||
case "foo" => context reply "I am already angry?"
|
||||
case "bar" => become(happy)
|
||||
}
|
||||
|
||||
def happy: Receive = {
|
||||
case "bar" => context reply "I am already happy :-)"
|
||||
case "foo" => become(angry)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "foo" => become(angry)
|
||||
case "bar" => become(happy)
|
||||
}
|
||||
.. includecode:: code/ActorDocSpec.scala#hot-swap-actor
|
||||
|
||||
The ``become`` method is useful for many different things, but a particular nice
|
||||
example of it is in example where it is used to implement a Finite State Machine
|
||||
|
|
@ -526,35 +477,12 @@ example of it is in example where it is used to implement a Finite State Machine
|
|||
|
||||
Here is another little cute example of ``become`` and ``unbecome`` in action:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
case object Swap
|
||||
class Swapper extends Actor {
|
||||
def receive = {
|
||||
case Swap =>
|
||||
println("Hi")
|
||||
become {
|
||||
case Swap =>
|
||||
println("Ho")
|
||||
unbecome() // resets the latest 'become' (just for fun)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val swap = actorOf[Swapper]
|
||||
|
||||
swap ! Swap // prints Hi
|
||||
swap ! Swap // prints Ho
|
||||
swap ! Swap // prints Hi
|
||||
swap ! Swap // prints Ho
|
||||
swap ! Swap // prints Hi
|
||||
swap ! Swap // prints Ho
|
||||
|
||||
.. includecode:: code/ActorDocSpec.scala#swapper
|
||||
|
||||
Encoding Scala Actors nested receives without accidentally leaking memory
|
||||
-------------------------------------------------------------------------
|
||||
|
||||
See this `Unnested receive example <https://gist.github.com/797035>`_.
|
||||
See this `Unnested receive example <http://github.com/jboner/akka/blob/master/akka/akka-docs/scala/code/UnnestedReceives.scala>`_.
|
||||
|
||||
|
||||
Downgrade
|
||||
|
|
@ -579,7 +507,7 @@ Revert the Actor body using the ``unbecome`` method:
|
|||
|
||||
.. code-block:: scala
|
||||
|
||||
def receive: Receive = {
|
||||
def receive = {
|
||||
case "revert" => unbecome()
|
||||
}
|
||||
|
||||
|
|
@ -625,16 +553,11 @@ messages on that mailbox, will be there as well.
|
|||
What happens to the actor
|
||||
-------------------------
|
||||
|
||||
If an exception is thrown and the actor is supervised, the actor object itself
|
||||
is discarded and a new instance is created. This new instance will now be used
|
||||
in the actor references to this actor (so this is done invisible to the
|
||||
developer).
|
||||
|
||||
If the actor is _not_ supervised, but its lifeCycle is set to Permanent
|
||||
(default), it will just keep on processing messages as if nothing had happened.
|
||||
|
||||
If the actor is _not_ supervised, but its lifeCycle is set to Temporary, it will
|
||||
be stopped immediately.
|
||||
If an exception is thrown, the actor instance is discarded and a new instance is
|
||||
created. This new instance will now be used in the actor references to this actor
|
||||
(so this is done invisible to the developer). Note that this means that current
|
||||
state of the failing actor instance is lost if you don't store and restore it in
|
||||
``preRestart`` and ``postRestart`` callbacks.
|
||||
|
||||
|
||||
Extending Actors using PartialFunction chaining
|
||||
|
|
@ -644,32 +567,4 @@ A bit advanced but very useful way of defining a base message handler and then
|
|||
extend that, either through inheritance or delegation, is to use
|
||||
``PartialFunction.orElse`` chaining.
|
||||
|
||||
In generic base Actor:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import akka.actor.Actor.Receive
|
||||
|
||||
abstract class GenericActor extends Actor {
|
||||
// to be defined in subclassing actor
|
||||
def specificMessageHandler: Receive
|
||||
|
||||
// generic message handler
|
||||
def genericMessageHandler: Receive = {
|
||||
case event => printf("generic: %s\n", event)
|
||||
}
|
||||
|
||||
def receive = specificMessageHandler orElse genericMessageHandler
|
||||
}
|
||||
|
||||
In subclassing Actor:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
class SpecificActor extends GenericActor {
|
||||
def specificMessageHandler = {
|
||||
case event: MyMsg => printf("specific: %s\n", event.subject)
|
||||
}
|
||||
}
|
||||
|
||||
case class MyMsg(subject: String)
|
||||
.. includecode:: code/ActorDocSpec.scala#receive-orElse
|
||||
|
|
|
|||
|
|
@ -1,16 +1,19 @@
|
|||
package akka.docs.stm
|
||||
package akka.docs.actor
|
||||
|
||||
//#imports1
|
||||
import akka.actor.Actor
|
||||
import akka.event.Logging
|
||||
//#imports1
|
||||
|
||||
//#imports2
|
||||
import akka.actor.ActorSystem
|
||||
//#imports2
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
//#imports
|
||||
import akka.actor.Actor
|
||||
import akka.event.Logging
|
||||
|
||||
//#imports
|
||||
|
||||
//#my-actor
|
||||
class MyActor extends Actor {
|
||||
val log = Logging(context.system, this)
|
||||
|
|
@ -21,12 +24,110 @@ class MyActor extends Actor {
|
|||
}
|
||||
//#my-actor
|
||||
|
||||
case class DoIt(msg: Message)
|
||||
case class Message(s: String)
|
||||
|
||||
//#context-actorOf
|
||||
class FirstActor extends Actor {
|
||||
val myActor = context.actorOf[MyActor]
|
||||
//#context-actorOf
|
||||
//#anonymous-actor
|
||||
def receive = {
|
||||
case m: DoIt ⇒
|
||||
context.actorOf(new Actor {
|
||||
def receive = {
|
||||
case DoIt(msg) ⇒
|
||||
val replyMsg = doSomeDangerousWork(msg)
|
||||
sender ! replyMsg
|
||||
self.stop()
|
||||
}
|
||||
def doSomeDangerousWork(msg: Message): String = { "done" }
|
||||
}) ! m
|
||||
|
||||
case replyMsg: String ⇒ sender ! replyMsg
|
||||
}
|
||||
//#anonymous-actor
|
||||
}
|
||||
|
||||
//#system-actorOf
|
||||
object Main extends App {
|
||||
val system = ActorSystem("MySystem")
|
||||
val myActor = system.actorOf[FirstActor]
|
||||
//#system-actorOf
|
||||
}
|
||||
//#swapper
|
||||
case object Swap
|
||||
class Swapper extends Actor {
|
||||
import context._
|
||||
val log = Logging(system, this)
|
||||
|
||||
def receive = {
|
||||
case Swap ⇒
|
||||
log.info("Hi")
|
||||
become {
|
||||
case Swap ⇒
|
||||
log.info("Ho")
|
||||
unbecome() // resets the latest 'become' (just for fun)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object SwapperApp extends App {
|
||||
val system = ActorSystem("SwapperSystem")
|
||||
val swap = system.actorOf[Swapper]
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
}
|
||||
//#swapper
|
||||
|
||||
//#receive-orElse
|
||||
import akka.actor.Actor.Receive
|
||||
|
||||
abstract class GenericActor extends Actor {
|
||||
// to be defined in subclassing actor
|
||||
def specificMessageHandler: Receive
|
||||
|
||||
// generic message handler
|
||||
def genericMessageHandler: Receive = {
|
||||
case event ⇒ printf("generic: %s\n", event)
|
||||
}
|
||||
|
||||
def receive = specificMessageHandler orElse genericMessageHandler
|
||||
}
|
||||
|
||||
class SpecificActor extends GenericActor {
|
||||
def specificMessageHandler = {
|
||||
case event: MyMsg ⇒ printf("specific: %s\n", event.subject)
|
||||
}
|
||||
}
|
||||
|
||||
case class MyMsg(subject: String)
|
||||
//#receive-orElse
|
||||
|
||||
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||
|
||||
"import context" in {
|
||||
//#import-context
|
||||
class FirstActor extends Actor {
|
||||
import context._
|
||||
val myActor = actorOf[MyActor]
|
||||
def receive = {
|
||||
case x ⇒ myActor ! x
|
||||
}
|
||||
}
|
||||
//#import-context
|
||||
|
||||
val first = system.actorOf(new FirstActor)
|
||||
first.stop()
|
||||
|
||||
}
|
||||
|
||||
"creating actor with AkkaSpec.actorOf" in {
|
||||
//#creating-actorOf
|
||||
val myActor = system.actorOf[MyActor]
|
||||
//#creating-actorOf
|
||||
|
||||
// testing the actor
|
||||
|
||||
|
|
@ -62,4 +163,82 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
|
||||
myActor.stop()
|
||||
}
|
||||
|
||||
"creating actor with Props" in {
|
||||
//#creating-props
|
||||
import akka.actor.Props
|
||||
val dispatcher = system.dispatcherFactory.fromConfig("my-dispatcher")
|
||||
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
|
||||
//#creating-props
|
||||
|
||||
myActor.stop()
|
||||
}
|
||||
|
||||
"using ask" in {
|
||||
//#using-ask
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case x: String ⇒ sender ! x.toUpperCase
|
||||
case n: Int ⇒ sender ! (n + 1)
|
||||
}
|
||||
}
|
||||
|
||||
val myActor = system.actorOf(new MyActor)
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val future = myActor ? "hello"
|
||||
future.as[String] match {
|
||||
case Some(answer) ⇒ //...
|
||||
case None ⇒ //...
|
||||
}
|
||||
val result: Option[Int] = for (x ← (myActor ? 3).as[Int]) yield { 2 * x }
|
||||
//#using-ask
|
||||
|
||||
myActor.stop()
|
||||
}
|
||||
|
||||
"using receiveTimeout" in {
|
||||
//#receive-timeout
|
||||
import akka.actor.ReceiveTimeout
|
||||
import akka.util.duration._
|
||||
class MyActor extends Actor {
|
||||
context.receiveTimeout = Some(30 seconds)
|
||||
def receive = {
|
||||
case "Hello" ⇒ //...
|
||||
case ReceiveTimeout ⇒ throw new RuntimeException("received timeout")
|
||||
}
|
||||
}
|
||||
//#receive-timeout
|
||||
}
|
||||
|
||||
"using hot-swap" in {
|
||||
//#hot-swap-actor
|
||||
class HotSwapActor extends Actor {
|
||||
import context._
|
||||
def angry: Receive = {
|
||||
case "foo" ⇒ sender ! "I am already angry?"
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
|
||||
def happy: Receive = {
|
||||
case "bar" ⇒ sender ! "I am already happy :-)"
|
||||
case "foo" ⇒ become(angry)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "foo" ⇒ become(angry)
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
}
|
||||
//#hot-swap-actor
|
||||
|
||||
val actor = system.actorOf(new MyActor)
|
||||
|
||||
//#hot-swap-message
|
||||
import akka.actor.HotSwap
|
||||
actor ! HotSwap(context ⇒ {
|
||||
case message ⇒ context.sender ! "hotswapped body"
|
||||
})
|
||||
//#hot-swap-message
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
48
akka-docs/scala/code/UnnestedReceives.scala
Normal file
48
akka-docs/scala/code/UnnestedReceives.scala
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
package akka.docs.actor
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
||||
/**
|
||||
* Requirements are as follows:
|
||||
* The first thing the actor needs to do, is to subscribe to a channel of events,
|
||||
* Then it must replay (process) all "old" events
|
||||
* Then it has to wait for a GoAhead signal to begin processing the new events
|
||||
* It mustn't "miss" events that happen between catching up with the old events and getting the GoAhead signal
|
||||
*/
|
||||
class UnnestedReceives extends Actor {
|
||||
import context.become
|
||||
//If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)]
|
||||
val queue = new ListBuffer[Any]()
|
||||
|
||||
//This message processes a message/event
|
||||
def process(msg: Any): Unit = println("processing: " + msg)
|
||||
//This method subscribes the actor to the event bus
|
||||
def subscribe() {} //Your external stuff
|
||||
//This method retrieves all prior messages/events
|
||||
def allOldMessages() = List()
|
||||
|
||||
override def preStart {
|
||||
//We override preStart to be sure that the first message the actor gets is
|
||||
//'Replay, that message will start to be processed _after_ the actor is started
|
||||
self ! 'Replay
|
||||
//Then we subscribe to the stream of messages/events
|
||||
subscribe()
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case 'Replay ⇒ //Our first message should be a 'Replay message, all others are invalid
|
||||
allOldMessages() foreach process //Process all old messages/events
|
||||
become { //Switch behavior to look for the GoAhead signal
|
||||
case 'GoAhead ⇒ //When we get the GoAhead signal we process all our buffered messages/events
|
||||
queue foreach process
|
||||
queue.clear
|
||||
become { //Then we change behaviour to process incoming messages/events as they arrive
|
||||
case msg ⇒ process(msg)
|
||||
}
|
||||
case msg ⇒ //While we haven't gotten the GoAhead signal, buffer all incoming messages
|
||||
queue += msg //Here you have full control, you can handle overflow etc
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue