Merge pull request #1336 from akka/wip-3081-PropsClosures-∂π
deprecate closure-taking Props factories, see #3081
This commit is contained in:
commit
57d71b0b44
77 changed files with 2234 additions and 1592 deletions
|
|
@ -43,114 +43,145 @@ Here is an example:
|
|||
.. includecode:: code/docs/actor/ActorDocSpec.scala
|
||||
:include: imports1,my-actor
|
||||
|
||||
Please note that the Akka Actor ``receive`` message loop is exhaustive, which is
|
||||
different compared to Erlang and Scala Actors. This means that you need to
|
||||
provide a pattern match for all messages that it can accept and if you want to
|
||||
be able to handle unknown messages then you need to have a default case as in
|
||||
the example above. Otherwise an ``akka.actor.UnhandledMessage(message, sender, recipient)`` will be
|
||||
published to the ``ActorSystem``'s ``EventStream``.
|
||||
Please note that the Akka Actor ``receive`` message loop is exhaustive, which
|
||||
is different compared to Erlang and the late Scala Actors. This means that you
|
||||
need to provide a pattern match for all messages that it can accept and if you
|
||||
want to be able to handle unknown messages then you need to have a default case
|
||||
as in the example above. Otherwise an ``akka.actor.UnhandledMessage(message,
|
||||
sender, recipient)`` will be published to the ``ActorSystem``'s
|
||||
``EventStream``.
|
||||
|
||||
Note further that the return type of the behavior defined above is ``Unit``; if
|
||||
the actor shall reply to the received message then this must be done explicitly
|
||||
as explained below.
|
||||
|
||||
The result of the :meth:`receive` method is a partial function object, which is
|
||||
stored within the actor as its “initial behavior”, see `Become/Unbecome`_ for
|
||||
further information on changing the behavior of an actor after its
|
||||
construction.
|
||||
|
||||
Creating Actors with default constructor
|
||||
----------------------------------------
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala
|
||||
:include: imports2,system-actorOf
|
||||
|
||||
The call to :meth:`actorOf` returns an instance of ``ActorRef``. This is a handle to
|
||||
the ``Actor`` instance which you can use to interact with the ``Actor``. The
|
||||
``ActorRef`` is immutable and has a one to one relationship with the Actor it
|
||||
represents. The ``ActorRef`` is also serializable and network-aware. This means
|
||||
that you can serialize it, send it over the wire and use it on a remote host and
|
||||
it will still be representing the same Actor on the original node, across the
|
||||
network.
|
||||
|
||||
In the above example the actor was created from the system. It is also possible
|
||||
to create actors from other actors with the actor ``context``. The difference is
|
||||
how the supervisor hierarchy is arranged. When using the context the current actor
|
||||
will be supervisor of the created child actor. When using the system it will be
|
||||
a top level actor, that is supervised by the system (internal guardian actor).
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#context-actorOf
|
||||
|
||||
The name parameter is optional, but you should preferably name your actors, since
|
||||
that is used in log messages and for identifying actors. The name must not be empty
|
||||
or start with ``$``, but it may contain URL encoded characters (eg. ``%20`` for a blank space).
|
||||
If the given name is already in use by another child to the
|
||||
same parent actor an `InvalidActorNameException` is thrown.
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
When you create the ``Actor`` then it will automatically call the ``preStart``
|
||||
callback method on the ``Actor`` trait. This is an excellent place to
|
||||
add initialization code for the actor.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
override def preStart() = {
|
||||
... // initialization code
|
||||
}
|
||||
|
||||
Creating Actors with non-default constructor
|
||||
--------------------------------------------
|
||||
|
||||
If your Actor has a constructor that takes parameters then you can't create it
|
||||
using ``actorOf(Props[TYPE])``. Instead you can use a variant of ``actorOf`` that takes
|
||||
a call-by-name block in which you can create the Actor in any way you like.
|
||||
|
||||
Here is an example:
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#creating-constructor
|
||||
|
||||
.. warning::
|
||||
|
||||
You might be tempted at times to offer an ``Actor`` factory which always
|
||||
returns the same instance, e.g. by using a ``lazy val`` or an
|
||||
``object ... extends Actor``. This is not supported, as it goes against the
|
||||
meaning of an actor restart, which is described here:
|
||||
:ref:`supervision-restart`.
|
||||
|
||||
.. warning::
|
||||
|
||||
Also avoid passing mutable state into the constructor of the Actor, since
|
||||
the call-by-name block can be executed by another thread.
|
||||
|
||||
Props
|
||||
-----
|
||||
|
||||
``Props`` is a configuration class to specify options for the creation
|
||||
of actors. Here are some examples on how to create a ``Props`` instance.
|
||||
:class:`Props` is a configuration class to specify options for the creation
|
||||
of actors, think of it as an immutable and thus freely shareable recipe for
|
||||
creating an actor including associated deployment information (e.g. which
|
||||
dispatcher to use, see more below). Here are some examples of how to create a
|
||||
:class:`Props` instance.
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#creating-props-config
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#creating-props
|
||||
|
||||
The last line shows how to pass constructor arguments to the :class:`Actor`
|
||||
being created. The presence of a matching constructor is verified during
|
||||
construction of the :class:`Props` object, resulting in an
|
||||
:class:`IllegalArgumentEception` if no or multiple matching constructors are
|
||||
found.
|
||||
|
||||
Deprecated Variants
|
||||
^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Up to Akka 2.1 there were also the following possibilities (which are retained
|
||||
for a migration period):
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#creating-props-deprecated
|
||||
|
||||
The last one is deprecated because its functionality is available in full
|
||||
through :meth:`Props.apply()`.
|
||||
|
||||
The first three are deprecated because the captured closure is a local class
|
||||
which means that it implicitly carries a reference to the enclosing class. This
|
||||
can easily make the resulting :class:`Props` non-serializable, e.g. when the
|
||||
enclosing class is an :class:`Actor`. Akka advocates location transparency,
|
||||
meaning that an application written with actors should just work when it is
|
||||
deployed over multiple network nodes, and non-serializable actor factories
|
||||
would break this principle. In case indirect actor creation is needed—for
|
||||
example when using dependency injection—there is the possibility to use an
|
||||
:class:`IndirectActorProducer` as described below.
|
||||
|
||||
There were two use-cases for these methods: passing constructor arguments to
|
||||
the actor—which is solved by the newly introduced
|
||||
:meth:`Props.apply(clazz, args)` method above—and creating actors “on the spot”
|
||||
as anonymous classes. The latter should be solved by making these actors named
|
||||
inner classes instead (if they are not declared within a top-level ``object``
|
||||
then the enclosing instance’s ``this`` reference needs to be passed as the
|
||||
first argument).
|
||||
|
||||
.. warning::
|
||||
|
||||
Declaring one actor within another is very dangerous and breaks actor
|
||||
encapsulation. Never pass an actor’s ``this`` reference into :class:`Props`!
|
||||
|
||||
Recommended Practices
|
||||
^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
It is a good idea to provide factory methods on the companion object of each
|
||||
:class:`Actor` which help keeping the creation of suitable :class:`Props` as
|
||||
close to the actor definition as possible, thus containing the gap in
|
||||
type-safety introduced by reflective instantiation within a single class
|
||||
instead of spreading it out across a whole code-base. This helps especially
|
||||
when refactoring the actor’s constructor signature at a later point, where
|
||||
compiler checks will allow this modification to be done with greater confidence
|
||||
than without.
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#props-factory
|
||||
|
||||
Creating Actors with Props
|
||||
--------------------------
|
||||
|
||||
Actors are created by passing in a ``Props`` instance into the ``actorOf`` factory method.
|
||||
Actors are created by passing a :class:`Props` instance into the
|
||||
:meth:`actorOf` factory method which is available on :class:`ActorSystem` and
|
||||
:class:`ActorContext`.
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#creating-props
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#system-actorOf
|
||||
|
||||
Using the :class:`ActorSystem` will create top-level actors, supervised by the
|
||||
actor system’s provided guardian actor, while using an actor’s context will
|
||||
create a child actor.
|
||||
|
||||
Creating Actors using anonymous classes
|
||||
---------------------------------------
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#context-actorOf
|
||||
:exclude: plus-some-behavior
|
||||
|
||||
When spawning actors for specific sub-tasks from within an actor, it may be convenient to include the code to be executed directly in place, using an anonymous class.
|
||||
It is recommended to create a hierarchy of children, grand-children and so on
|
||||
such that it fits the logical failure-handling structure of the application,
|
||||
see :ref:`actor-systems`.
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#anonymous-actor
|
||||
The call to :meth:`actorOf` returns an instance of :class:`ActorRef`. This is a
|
||||
handle to the actor instance and the only way to interact with it. The
|
||||
:class:`ActorRef` is immutable and has a one to one relationship with the Actor
|
||||
it represents. The :class:`ActorRef` is also serializable and network-aware.
|
||||
This means that you can serialize it, send it over the wire and use it on a
|
||||
remote host and it will still be representing the same Actor on the original
|
||||
node, across the network.
|
||||
|
||||
The name parameter is optional, but you should preferably name your actors,
|
||||
since that is used in log messages and for identifying actors. The name must
|
||||
not be empty or start with ``$``, but it may contain URL encoded characters
|
||||
(eg. ``%20`` for a blank space). If the given name is already in use by
|
||||
another child to the same parent an `InvalidActorNameException` is thrown.
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
|
||||
Creating Actors with Factory Methods
|
||||
------------------------------------
|
||||
|
||||
If your UntypedActor has a constructor that takes parameters then those need to
|
||||
be part of the :class:`Props` as well, as described `above <Props>`_. But there
|
||||
are cases when a factory method must be used, for example when the actual
|
||||
constructor arguments are determined by a dependency injection framework.
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala
|
||||
:include: creating-indirectly
|
||||
:exclude: obtain-fresh-Actor-instance-from-DI-framework
|
||||
|
||||
.. warning::
|
||||
|
||||
In this case you need to carefully avoid closing over the containing actor’s
|
||||
reference, i.e. do not call methods on the enclosing actor from within the
|
||||
anonymous Actor class. This would break the actor encapsulation and may
|
||||
introduce synchronization bugs and race conditions because the other actor’s
|
||||
code will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
there is not yet a way to detect these illegal accesses at compile time.
|
||||
See also: :ref:`jmm-shared-state`
|
||||
You might be tempted at times to offer an :class:`IndirectActorProducer`
|
||||
which always returns the same instance, e.g. by using a ``lazy val``. This is
|
||||
not supported, as it goes against the meaning of an actor restart, which is
|
||||
described here: :ref:`supervision-restart`.
|
||||
|
||||
When using a dependency injection framework, actor beans *MUST NOT* have
|
||||
singleton scope.
|
||||
|
||||
The Actor DSL
|
||||
-------------
|
||||
|
|
@ -268,15 +299,9 @@ You can import the members in the :obj:`context` to avoid prefixing access with
|
|||
.. includecode:: code/docs/actor/ActorDocSpec.scala#import-context
|
||||
|
||||
The remaining visible methods are user-overridable life-cycle hooks which are
|
||||
described in the following::
|
||||
described in the following:
|
||||
|
||||
def preStart() {}
|
||||
def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
context.children foreach (context.stop(_))
|
||||
postStop()
|
||||
}
|
||||
def postRestart(reason: Throwable) { preStart() }
|
||||
def postStop() {}
|
||||
.. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Actor.scala#lifecycle-hooks
|
||||
|
||||
The implementations shown above are the defaults provided by the :class:`Actor`
|
||||
trait.
|
||||
|
|
@ -319,13 +344,15 @@ Start Hook
|
|||
|
||||
Right after starting the actor, its :meth:`preStart` method is invoked.
|
||||
|
||||
::
|
||||
|
||||
override def preStart() {
|
||||
// registering with other actors
|
||||
someService ! Register(self)
|
||||
}
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#preStart
|
||||
|
||||
This method is called when the actor is first created. During restarts it is
|
||||
called by the default implementation of :meth:`postRestart`, which means that
|
||||
by overriding that method you can choose whether the initialization code in
|
||||
this method is called only exactly once for this actor or for every restart.
|
||||
Initialization code which is part of the actor’s constructor will always be
|
||||
called when an instance of the actor class is created, which happens at every
|
||||
restart.
|
||||
|
||||
Restart Hooks
|
||||
-------------
|
||||
|
|
@ -390,12 +417,9 @@ are used by the system to look up actors, e.g. when a remote message is
|
|||
received and the recipient is searched, but they are also useful more directly:
|
||||
actors may look up other actors by specifying absolute or relative
|
||||
paths—logical or physical—and receive back an :class:`ActorSelection` with the
|
||||
result::
|
||||
result:
|
||||
|
||||
// will look up this absolute path
|
||||
context.actorSelection("/user/serviceA/aggregator")
|
||||
// will look up sibling beneath same supervisor
|
||||
context.actorSelection("../joe")
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#selection-local
|
||||
|
||||
The supplied path is parsed as a :class:`java.net.URI`, which basically means
|
||||
that it is split on ``/`` into path elements. If the path starts with ``/``, it
|
||||
|
|
@ -407,12 +431,9 @@ It should be noted that the ``..`` in actor paths here always means the logical
|
|||
structure, i.e. the supervisor.
|
||||
|
||||
The path elements of an actor selection may contain wildcard patterns allowing for
|
||||
broadcasting of messages to that section::
|
||||
broadcasting of messages to that section:
|
||||
|
||||
// will look all children to serviceB with names starting with worker
|
||||
context.actorSelection("/user/serviceB/worker*")
|
||||
// will look up all siblings beneath same supervisor
|
||||
context.actorSelection("../*")
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#selection-wildcard
|
||||
|
||||
Messages can be sent via the :class:`ActorSelection` and the path of the
|
||||
:class:`ActorSelection` is looked up when delivering each message. If the selection
|
||||
|
|
@ -426,9 +447,9 @@ and automatically reply to with a ``ActorIdentity`` message containing the
|
|||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#identify
|
||||
|
||||
Remote actor addresses may also be looked up, if :ref:`remoting <remoting-scala>` is enabled::
|
||||
Remote actor addresses may also be looked up, if :ref:`remoting <remoting-scala>` is enabled:
|
||||
|
||||
context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB")
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#selection-remote
|
||||
|
||||
An example demonstrating actor look-up is given in :ref:`remote-lookup-sample-scala`.
|
||||
|
||||
|
|
@ -463,10 +484,6 @@ Here is an example:
|
|||
// create a new case class message
|
||||
val message = Register(user)
|
||||
|
||||
Other good messages types are ``scala.Tuple2``, ``scala.List``, ``scala.Map``
|
||||
which are all immutable and great for pattern matching.
|
||||
|
||||
|
||||
Send messages
|
||||
=============
|
||||
|
||||
|
|
@ -486,17 +503,15 @@ Message ordering is guaranteed on a per-sender basis.
|
|||
a ``Promise`` into an ``ActorRef`` and it also needs to be reachable through
|
||||
remoting. So always prefer ``tell`` for performance, and only ``ask`` if you must.
|
||||
|
||||
.. _actors-tell-sender-scala:
|
||||
|
||||
Tell: Fire-forget
|
||||
-----------------
|
||||
|
||||
This is the preferred way of sending messages. No blocking waiting for a
|
||||
message. This gives the best concurrency and scalability characteristics.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
actor ! "hello"
|
||||
|
||||
.. _actors-tell-sender-scala:
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#tell
|
||||
|
||||
If invoked from within an Actor, then the sending actor reference will be
|
||||
implicitly passed along with the message and available to the receiving Actor
|
||||
|
|
@ -573,25 +588,16 @@ original sender address/reference is maintained even though the message is going
|
|||
through a 'mediator'. This can be useful when writing actors that work as
|
||||
routers, load-balancers, replicators etc.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
myActor.forward(message)
|
||||
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#forward
|
||||
|
||||
Receive messages
|
||||
================
|
||||
|
||||
An Actor has to implement the ``receive`` method to receive messages:
|
||||
|
||||
.. code-block:: scala
|
||||
.. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Actor.scala#receive
|
||||
|
||||
def receive: PartialFunction[Any, Unit]
|
||||
|
||||
Note: Akka has an alias to the ``PartialFunction[Any, Unit]`` type called
|
||||
``Receive`` (``akka.actor.Actor.Receive``), so you can use this type instead for
|
||||
clarity. But most often you don't need to spell it out.
|
||||
|
||||
This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause in
|
||||
This method returns a ``PartialFunction``, e.g. a ‘match/case’ clause in
|
||||
which the message can be matched against the different case clauses using Scala
|
||||
pattern matching. Here is an example:
|
||||
|
||||
|
|
@ -669,11 +675,8 @@ whole system.
|
|||
The :meth:`postStop()` hook is invoked after an actor is fully stopped. This
|
||||
enables cleaning up of resources:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
override def postStop() = {
|
||||
// close some file or database connection
|
||||
}
|
||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#postStop
|
||||
:exclude: clean-up-some-resources
|
||||
|
||||
.. note::
|
||||
|
||||
|
|
|
|||
|
|
@ -37,11 +37,43 @@ case class Message(s: String)
|
|||
|
||||
//#context-actorOf
|
||||
class FirstActor extends Actor {
|
||||
val myActor = context.actorOf(Props[MyActor], name = "myactor")
|
||||
//#context-actorOf
|
||||
val child = context.actorOf(Props[MyActor], name = "myChild")
|
||||
//#plus-some-behavior
|
||||
def receive = {
|
||||
case x ⇒ sender ! x
|
||||
}
|
||||
//#plus-some-behavior
|
||||
}
|
||||
//#context-actorOf
|
||||
|
||||
class ActorWithArgs(arg: String) extends Actor {
|
||||
def receive = { case _ ⇒ () }
|
||||
}
|
||||
|
||||
class DemoActorWrapper extends Actor {
|
||||
//#props-factory
|
||||
object DemoActor {
|
||||
/**
|
||||
* Create Props for an actor of this type.
|
||||
* @param name The name to be passed to this actor’s constructor.
|
||||
* @return a Props for creating this actor, which can then be further configured
|
||||
* (e.g. calling `.withDispatcher()` on it)
|
||||
*/
|
||||
def apply(name: String): Props = Props(classOf[DemoActor], name)
|
||||
}
|
||||
|
||||
class DemoActor(name: String) extends Actor {
|
||||
def receive = {
|
||||
case x ⇒ // some behavior
|
||||
}
|
||||
}
|
||||
|
||||
// ...
|
||||
|
||||
context.actorOf(DemoActor("hello"))
|
||||
//#props-factory
|
||||
|
||||
def receive = Actor.emptyBehavior
|
||||
}
|
||||
|
||||
class AnonymousActor extends Actor {
|
||||
|
|
@ -61,11 +93,21 @@ class AnonymousActor extends Actor {
|
|||
//#anonymous-actor
|
||||
}
|
||||
|
||||
//#system-actorOf
|
||||
object Main extends App {
|
||||
val system = ActorSystem("MySystem")
|
||||
val myActor = system.actorOf(Props[MyActor], name = "myactor")
|
||||
//#system-actorOf
|
||||
class Hook extends Actor {
|
||||
var child: ActorRef = _
|
||||
//#preStart
|
||||
override def preStart() {
|
||||
child = context.actorOf(Props[MyActor], "child")
|
||||
}
|
||||
//#preStart
|
||||
def receive = Actor.emptyBehavior
|
||||
//#postStop
|
||||
override def postStop() {
|
||||
//#clean-up-some-resources
|
||||
()
|
||||
//#clean-up-some-resources
|
||||
}
|
||||
//#postStop
|
||||
}
|
||||
|
||||
class ReplyException extends Actor {
|
||||
|
|
@ -142,22 +184,23 @@ case class MyMsg(subject: String)
|
|||
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||
|
||||
"import context" in {
|
||||
//#import-context
|
||||
class FirstActor extends Actor {
|
||||
import context._
|
||||
val myActor = actorOf(Props[MyActor], name = "myactor")
|
||||
def receive = {
|
||||
case x ⇒ myActor ! x
|
||||
new AnyRef {
|
||||
//#import-context
|
||||
class FirstActor extends Actor {
|
||||
import context._
|
||||
val myActor = actorOf(Props[MyActor], name = "myactor")
|
||||
def receive = {
|
||||
case x ⇒ myActor ! x
|
||||
}
|
||||
}
|
||||
//#import-context
|
||||
|
||||
val first = system.actorOf(Props(classOf[FirstActor], this), name = "first")
|
||||
system.stop(first)
|
||||
}
|
||||
//#import-context
|
||||
|
||||
val first = system.actorOf(Props(new FirstActor), name = "first")
|
||||
system.stop(first)
|
||||
|
||||
}
|
||||
|
||||
"creating actor with AkkaSpec.actorOf" in {
|
||||
"creating actor with system.actorOf" in {
|
||||
val myActor = system.actorOf(Props[MyActor])
|
||||
|
||||
// testing the actor
|
||||
|
|
@ -183,44 +226,99 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
}
|
||||
|
||||
"creating actor with constructor" in {
|
||||
class MyActor(arg: String) extends Actor {
|
||||
def receive = { case _ ⇒ () }
|
||||
}
|
||||
|
||||
//#creating-constructor
|
||||
// allows passing in arguments to the MyActor constructor
|
||||
val myActor = system.actorOf(Props(new MyActor("...")), name = "myactor")
|
||||
val myActor = system.actorOf(Props[MyActor], name = "myactor")
|
||||
//#creating-constructor
|
||||
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"creating a Props config" in {
|
||||
//#creating-props-config
|
||||
//#creating-props
|
||||
import akka.actor.Props
|
||||
val props1 = Props.empty
|
||||
val props2 = Props[MyActor]
|
||||
val props3 = Props(new MyActor)
|
||||
|
||||
val props1 = Props[MyActor]
|
||||
val props3 = Props(classOf[ActorWithArgs], "arg")
|
||||
//#creating-props
|
||||
|
||||
//#creating-props-deprecated
|
||||
// DEPRECATED: encourages to close over enclosing class
|
||||
val props4 = Props(
|
||||
creator = { () ⇒ new MyActor },
|
||||
dispatcher = "my-dispatcher")
|
||||
|
||||
// DEPRECATED: encourages to close over enclosing class
|
||||
val props5 = props1.withCreator(new MyActor)
|
||||
val props6 = props5.withDispatcher("my-dispatcher")
|
||||
//#creating-props-config
|
||||
|
||||
// DEPRECATED: encourages to close over enclosing class
|
||||
val props6 = Props(new MyActor)
|
||||
|
||||
// DEPRECATED due to duplicate functionality with Props.apply()
|
||||
val props7 = props1.withCreator(classOf[MyActor])
|
||||
//#creating-props-deprecated
|
||||
}
|
||||
|
||||
"creating actor with Props" in {
|
||||
//#creating-props
|
||||
import akka.actor.Props
|
||||
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"),
|
||||
name = "myactor2")
|
||||
//#creating-props
|
||||
//#system-actorOf
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
system.stop(myActor)
|
||||
// ActorSystem is a heavy object: create only one per application
|
||||
val system = ActorSystem("mySystem")
|
||||
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor2")
|
||||
//#system-actorOf
|
||||
system.shutdown()
|
||||
}
|
||||
|
||||
"creating actor with IndirectActorProducer" in {
|
||||
class Echo(name: String) extends Actor {
|
||||
def receive = {
|
||||
case n: Int ⇒ sender ! name
|
||||
case message ⇒
|
||||
val target = testActor
|
||||
//#forward
|
||||
target forward message
|
||||
//#forward
|
||||
}
|
||||
}
|
||||
|
||||
val a: { def actorRef: ActorRef } = new AnyRef {
|
||||
val applicationContext = this
|
||||
|
||||
//#creating-indirectly
|
||||
import akka.actor.IndirectActorProducer
|
||||
|
||||
class DependencyInjector(applicationContext: AnyRef, beanName: String)
|
||||
extends IndirectActorProducer {
|
||||
|
||||
override def actorClass = classOf[Actor]
|
||||
override def produce =
|
||||
//#obtain-fresh-Actor-instance-from-DI-framework
|
||||
new Echo(beanName)
|
||||
|
||||
def this(beanName: String) = this("", beanName)
|
||||
//#obtain-fresh-Actor-instance-from-DI-framework
|
||||
}
|
||||
|
||||
val actorRef = system.actorOf(
|
||||
Props(classOf[DependencyInjector], applicationContext, "hello"),
|
||||
"helloBean")
|
||||
//#creating-indirectly
|
||||
}
|
||||
val actorRef = a.actorRef
|
||||
|
||||
val message = 42
|
||||
implicit val self = testActor
|
||||
//#tell
|
||||
actorRef ! message
|
||||
//#tell
|
||||
expectMsg("hello")
|
||||
actorRef ! "huhu"
|
||||
expectMsg("huhu")
|
||||
}
|
||||
|
||||
"using implicit timeout" in {
|
||||
val myActor = system.actorOf(Props(new FirstActor))
|
||||
val myActor = system.actorOf(Props[FirstActor])
|
||||
//#using-implicit-timeout
|
||||
import scala.concurrent.duration._
|
||||
import akka.util.Timeout
|
||||
|
|
@ -233,7 +331,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
}
|
||||
|
||||
"using explicit timeout" in {
|
||||
val myActor = system.actorOf(Props(new FirstActor))
|
||||
val myActor = system.actorOf(Props[FirstActor])
|
||||
//#using-explicit-timeout
|
||||
import scala.concurrent.duration._
|
||||
import akka.pattern.ask
|
||||
|
|
@ -262,28 +360,28 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
//#receive-timeout
|
||||
}
|
||||
|
||||
"using hot-swap" in {
|
||||
//#hot-swap-actor
|
||||
class HotSwapActor extends Actor {
|
||||
import context._
|
||||
def angry: Receive = {
|
||||
case "foo" ⇒ sender ! "I am already angry?"
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
|
||||
def happy: Receive = {
|
||||
case "bar" ⇒ sender ! "I am already happy :-)"
|
||||
case "foo" ⇒ become(angry)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "foo" ⇒ become(angry)
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
//#hot-swap-actor
|
||||
class HotSwapActor extends Actor {
|
||||
import context._
|
||||
def angry: Receive = {
|
||||
case "foo" ⇒ sender ! "I am already angry?"
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
//#hot-swap-actor
|
||||
|
||||
val actor = system.actorOf(Props(new HotSwapActor), name = "hot")
|
||||
def happy: Receive = {
|
||||
case "bar" ⇒ sender ! "I am already happy :-)"
|
||||
case "foo" ⇒ become(angry)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "foo" ⇒ become(angry)
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
}
|
||||
//#hot-swap-actor
|
||||
|
||||
"using hot-swap" in {
|
||||
val actor = system.actorOf(Props(classOf[HotSwapActor], this), name = "hot")
|
||||
}
|
||||
|
||||
"using Stash" in {
|
||||
|
|
@ -307,55 +405,77 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
}
|
||||
|
||||
"using watch" in {
|
||||
//#watch
|
||||
import akka.actor.{ Actor, Props, Terminated }
|
||||
new AnyRef {
|
||||
//#watch
|
||||
import akka.actor.{ Actor, Props, Terminated }
|
||||
|
||||
class WatchActor extends Actor {
|
||||
val child = context.actorOf(Props.empty, "child")
|
||||
context.watch(child) // <-- this is the only call needed for registration
|
||||
var lastSender = system.deadLetters
|
||||
class WatchActor extends Actor {
|
||||
val child = context.actorOf(Props.empty, "child")
|
||||
context.watch(child) // <-- this is the only call needed for registration
|
||||
var lastSender = system.deadLetters
|
||||
|
||||
def receive = {
|
||||
case "kill" ⇒ context.stop(child); lastSender = sender
|
||||
case Terminated(`child`) ⇒ lastSender ! "finished"
|
||||
def receive = {
|
||||
case "kill" ⇒
|
||||
context.stop(child); lastSender = sender
|
||||
case Terminated(`child`) ⇒ lastSender ! "finished"
|
||||
}
|
||||
}
|
||||
//#watch
|
||||
val a = system.actorOf(Props(classOf[WatchActor], this))
|
||||
implicit val sender = testActor
|
||||
a ! "kill"
|
||||
expectMsg("finished")
|
||||
}
|
||||
//#watch
|
||||
val a = system.actorOf(Props(new WatchActor))
|
||||
implicit val sender = testActor
|
||||
a ! "kill"
|
||||
expectMsg("finished")
|
||||
}
|
||||
|
||||
"demonstrate ActorSelection" in {
|
||||
val context = system
|
||||
//#selection-local
|
||||
// will look up this absolute path
|
||||
context.actorSelection("/user/serviceA/aggregator")
|
||||
// will look up sibling beneath same supervisor
|
||||
context.actorSelection("../joe")
|
||||
//#selection-local
|
||||
//#selection-wildcard
|
||||
// will look all children to serviceB with names starting with worker
|
||||
context.actorSelection("/user/serviceB/worker*")
|
||||
// will look up all siblings beneath same supervisor
|
||||
context.actorSelection("../*")
|
||||
//#selection-wildcard
|
||||
//#selection-remote
|
||||
context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB")
|
||||
//#selection-remote
|
||||
}
|
||||
|
||||
"using Identify" in {
|
||||
//#identify
|
||||
import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated }
|
||||
new AnyRef {
|
||||
//#identify
|
||||
import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated }
|
||||
|
||||
class Follower extends Actor {
|
||||
val identifyId = 1
|
||||
context.actorSelection("/user/another") ! Identify(identifyId)
|
||||
class Follower extends Actor {
|
||||
val identifyId = 1
|
||||
context.actorSelection("/user/another") ! Identify(identifyId)
|
||||
|
||||
def receive = {
|
||||
case ActorIdentity(`identifyId`, Some(ref)) ⇒
|
||||
context.watch(ref)
|
||||
context.become(active(ref))
|
||||
case ActorIdentity(`identifyId`, None) ⇒ context.stop(self)
|
||||
def receive = {
|
||||
case ActorIdentity(`identifyId`, Some(ref)) ⇒
|
||||
context.watch(ref)
|
||||
context.become(active(ref))
|
||||
case ActorIdentity(`identifyId`, None) ⇒ context.stop(self)
|
||||
|
||||
}
|
||||
|
||||
def active(another: ActorRef): Actor.Receive = {
|
||||
case Terminated(`another`) ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
//#identify
|
||||
|
||||
def active(another: ActorRef): Actor.Receive = {
|
||||
case Terminated(`another`) ⇒ context.stop(self)
|
||||
}
|
||||
val a = system.actorOf(Props.empty)
|
||||
val b = system.actorOf(Props(classOf[Follower], this))
|
||||
watch(b)
|
||||
system.stop(a)
|
||||
expectMsgType[akka.actor.Terminated].actor must be === b
|
||||
}
|
||||
//#identify
|
||||
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
def receive = Actor.emptyBehavior
|
||||
}))
|
||||
val b = system.actorOf(Props(new Follower))
|
||||
watch(b)
|
||||
system.stop(a)
|
||||
expectMsgType[akka.actor.Terminated].actor must be === b
|
||||
}
|
||||
|
||||
"using pattern gracefulStop" in {
|
||||
|
|
@ -397,20 +517,22 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
//#ask-pipeTo
|
||||
}
|
||||
|
||||
"replying with own or other sender" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case ref: ActorRef ⇒
|
||||
//#reply-with-sender
|
||||
sender.tell("reply", context.parent) // replies will go back to parent
|
||||
sender.!("reply")(context.parent) // alternative syntax (beware of the parens!)
|
||||
class Replier extends Actor {
|
||||
def receive = {
|
||||
case ref: ActorRef ⇒
|
||||
//#reply-with-sender
|
||||
case x ⇒
|
||||
//#reply-without-sender
|
||||
sender ! x // replies will go to this actor
|
||||
sender.tell("reply", context.parent) // replies will go back to parent
|
||||
sender.!("reply")(context.parent) // alternative syntax (beware of the parens!)
|
||||
//#reply-with-sender
|
||||
case x ⇒
|
||||
//#reply-without-sender
|
||||
}
|
||||
}))
|
||||
sender ! x // replies will go to this actor
|
||||
//#reply-without-sender
|
||||
}
|
||||
}
|
||||
|
||||
"replying with own or other sender" in {
|
||||
val actor = system.actorOf(Props(classOf[Replier], this))
|
||||
implicit val me = testActor
|
||||
actor ! 42
|
||||
expectMsg(42)
|
||||
|
|
@ -430,51 +552,51 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
})
|
||||
}
|
||||
|
||||
"using ComposableActor" in {
|
||||
//#receive-orElse2
|
||||
class PartialFunctionBuilder[A, B] {
|
||||
import scala.collection.immutable.Vector
|
||||
//#receive-orElse2
|
||||
class PartialFunctionBuilder[A, B] {
|
||||
import scala.collection.immutable.Vector
|
||||
|
||||
// Abbreviate to make code fit
|
||||
type PF = PartialFunction[A, B]
|
||||
// Abbreviate to make code fit
|
||||
type PF = PartialFunction[A, B]
|
||||
|
||||
private var pfsOption: Option[Vector[PF]] = Some(Vector.empty)
|
||||
private var pfsOption: Option[Vector[PF]] = Some(Vector.empty)
|
||||
|
||||
private def mapPfs[C](f: Vector[PF] ⇒ (Option[Vector[PF]], C)): C = {
|
||||
pfsOption.fold(throw new IllegalStateException("Already built"))(f) match {
|
||||
case (newPfsOption, result) ⇒ {
|
||||
pfsOption = newPfsOption
|
||||
result
|
||||
}
|
||||
private def mapPfs[C](f: Vector[PF] ⇒ (Option[Vector[PF]], C)): C = {
|
||||
pfsOption.fold(throw new IllegalStateException("Already built"))(f) match {
|
||||
case (newPfsOption, result) ⇒ {
|
||||
pfsOption = newPfsOption
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
def +=(pf: PF): Unit =
|
||||
mapPfs { case pfs ⇒ (Some(pfs :+ pf), ()) }
|
||||
|
||||
def result(): PF =
|
||||
mapPfs { case pfs ⇒ (None, pfs.foldLeft[PF](Map.empty) { _ orElse _ }) }
|
||||
}
|
||||
|
||||
trait ComposableActor extends Actor {
|
||||
protected lazy val receiveBuilder = new PartialFunctionBuilder[Any, Unit]
|
||||
final def receive = receiveBuilder.result()
|
||||
}
|
||||
def +=(pf: PF): Unit =
|
||||
mapPfs { case pfs ⇒ (Some(pfs :+ pf), ()) }
|
||||
|
||||
trait TheirComposableActor extends ComposableActor {
|
||||
receiveBuilder += {
|
||||
case "foo" ⇒ sender ! "foo received"
|
||||
}
|
||||
}
|
||||
def result(): PF =
|
||||
mapPfs { case pfs ⇒ (None, pfs.foldLeft[PF](Map.empty) { _ orElse _ }) }
|
||||
}
|
||||
|
||||
class MyComposableActor extends TheirComposableActor {
|
||||
receiveBuilder += {
|
||||
case "bar" ⇒ sender ! "bar received"
|
||||
}
|
||||
}
|
||||
//#receive-orElse2
|
||||
trait ComposableActor extends Actor {
|
||||
protected lazy val receiveBuilder = new PartialFunctionBuilder[Any, Unit]
|
||||
final def receive = receiveBuilder.result()
|
||||
}
|
||||
|
||||
val composed = system.actorOf(Props(new MyComposableActor))
|
||||
trait TheirComposableActor extends ComposableActor {
|
||||
receiveBuilder += {
|
||||
case "foo" ⇒ sender ! "foo received"
|
||||
}
|
||||
}
|
||||
|
||||
class MyComposableActor extends TheirComposableActor {
|
||||
receiveBuilder += {
|
||||
case "bar" ⇒ sender ! "bar received"
|
||||
}
|
||||
}
|
||||
//#receive-orElse2
|
||||
|
||||
"using ComposableActor" in {
|
||||
val composed = system.actorOf(Props(classOf[MyComposableActor], this))
|
||||
implicit val me = testActor
|
||||
composed ! "foo"
|
||||
expectMsg("foo received")
|
||||
|
|
|
|||
|
|
@ -14,180 +14,181 @@ import scala.collection.immutable
|
|||
|
||||
class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
|
||||
|
||||
"simple finite state machine" must {
|
||||
//#fsm-code-elided
|
||||
//#simple-imports
|
||||
import akka.actor.{ Actor, ActorRef, FSM }
|
||||
import scala.concurrent.duration._
|
||||
//#simple-imports
|
||||
//#simple-events
|
||||
// received events
|
||||
case class SetTarget(ref: ActorRef)
|
||||
case class Queue(obj: Any)
|
||||
case object Flush
|
||||
//#fsm-code-elided
|
||||
//#simple-imports
|
||||
import akka.actor.{ Actor, ActorRef, FSM }
|
||||
import scala.concurrent.duration._
|
||||
//#simple-imports
|
||||
//#simple-events
|
||||
// received events
|
||||
case class SetTarget(ref: ActorRef)
|
||||
case class Queue(obj: Any)
|
||||
case object Flush
|
||||
|
||||
// sent events
|
||||
case class Batch(obj: immutable.Seq[Any])
|
||||
//#simple-events
|
||||
//#simple-state
|
||||
// states
|
||||
sealed trait State
|
||||
case object Idle extends State
|
||||
case object Active extends State
|
||||
// sent events
|
||||
case class Batch(obj: immutable.Seq[Any])
|
||||
//#simple-events
|
||||
//#simple-state
|
||||
// states
|
||||
sealed trait State
|
||||
case object Idle extends State
|
||||
case object Active extends State
|
||||
|
||||
sealed trait Data
|
||||
case object Uninitialized extends Data
|
||||
case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data
|
||||
//#simple-state
|
||||
//#simple-fsm
|
||||
class Buncher extends Actor with FSM[State, Data] {
|
||||
sealed trait Data
|
||||
case object Uninitialized extends Data
|
||||
case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data
|
||||
//#simple-state
|
||||
//#simple-fsm
|
||||
class Buncher extends Actor with FSM[State, Data] {
|
||||
|
||||
//#fsm-body
|
||||
startWith(Idle, Uninitialized)
|
||||
//#fsm-body
|
||||
startWith(Idle, Uninitialized)
|
||||
|
||||
//#when-syntax
|
||||
when(Idle) {
|
||||
case Event(SetTarget(ref), Uninitialized) ⇒
|
||||
stay using Todo(ref, Vector.empty)
|
||||
//#when-syntax
|
||||
when(Idle) {
|
||||
case Event(SetTarget(ref), Uninitialized) ⇒
|
||||
stay using Todo(ref, Vector.empty)
|
||||
}
|
||||
//#when-syntax
|
||||
|
||||
//#transition-elided
|
||||
onTransition {
|
||||
case Active -> Idle ⇒
|
||||
stateData match {
|
||||
case Todo(ref, queue) ⇒ ref ! Batch(queue)
|
||||
}
|
||||
}
|
||||
//#transition-elided
|
||||
//#when-syntax
|
||||
|
||||
when(Active, stateTimeout = 1 second) {
|
||||
case Event(Flush | StateTimeout, t: Todo) ⇒
|
||||
goto(Idle) using t.copy(queue = Vector.empty)
|
||||
}
|
||||
//#when-syntax
|
||||
|
||||
//#unhandled-elided
|
||||
whenUnhandled {
|
||||
// common code for both states
|
||||
case Event(Queue(obj), t @ Todo(_, v)) ⇒
|
||||
goto(Active) using t.copy(queue = v :+ obj)
|
||||
|
||||
case Event(e, s) ⇒
|
||||
log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
|
||||
stay
|
||||
}
|
||||
//#unhandled-elided
|
||||
//#fsm-body
|
||||
|
||||
initialize()
|
||||
}
|
||||
//#simple-fsm
|
||||
object DemoCode {
|
||||
trait StateType
|
||||
case object SomeState extends StateType
|
||||
case object Processing extends StateType
|
||||
case object Error extends StateType
|
||||
case object Idle extends StateType
|
||||
case object Active extends StateType
|
||||
|
||||
class Dummy extends Actor with FSM[StateType, Int] {
|
||||
class X
|
||||
val newData = 42
|
||||
object WillDo
|
||||
object Tick
|
||||
|
||||
//#modifier-syntax
|
||||
when(SomeState) {
|
||||
case Event(msg, _) ⇒
|
||||
goto(Processing) using (newData) forMax (5 seconds) replying (WillDo)
|
||||
}
|
||||
//#when-syntax
|
||||
//#modifier-syntax
|
||||
|
||||
//#transition-elided
|
||||
//#transition-syntax
|
||||
onTransition {
|
||||
case Active -> Idle ⇒
|
||||
stateData match {
|
||||
case Todo(ref, queue) ⇒ ref ! Batch(queue)
|
||||
}
|
||||
case Idle -> Active ⇒ setTimer("timeout", Tick, 1 second, true)
|
||||
case Active -> _ ⇒ cancelTimer("timeout")
|
||||
case x -> Idle ⇒ log.info("entering Idle from " + x)
|
||||
}
|
||||
//#transition-elided
|
||||
//#when-syntax
|
||||
//#transition-syntax
|
||||
|
||||
when(Active, stateTimeout = 1 second) {
|
||||
case Event(Flush | StateTimeout, t: Todo) ⇒
|
||||
goto(Idle) using t.copy(queue = Vector.empty)
|
||||
//#alt-transition-syntax
|
||||
onTransition(handler _)
|
||||
|
||||
def handler(from: StateType, to: StateType) {
|
||||
// handle it here ...
|
||||
}
|
||||
//#when-syntax
|
||||
//#alt-transition-syntax
|
||||
|
||||
//#unhandled-elided
|
||||
//#stop-syntax
|
||||
when(Error) {
|
||||
case Event("stop", _) ⇒
|
||||
// do cleanup ...
|
||||
stop()
|
||||
}
|
||||
//#stop-syntax
|
||||
|
||||
//#transform-syntax
|
||||
when(SomeState)(transform {
|
||||
case Event(bytes: ByteString, read) ⇒ stay using (read + bytes.length)
|
||||
} using {
|
||||
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒
|
||||
goto(Processing)
|
||||
})
|
||||
//#transform-syntax
|
||||
|
||||
//#alt-transform-syntax
|
||||
val processingTrigger: PartialFunction[State, State] = {
|
||||
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒
|
||||
goto(Processing)
|
||||
}
|
||||
|
||||
when(SomeState)(transform {
|
||||
case Event(bytes: ByteString, read) ⇒ stay using (read + bytes.length)
|
||||
} using processingTrigger)
|
||||
//#alt-transform-syntax
|
||||
|
||||
//#termination-syntax
|
||||
onTermination {
|
||||
case StopEvent(FSM.Normal, state, data) ⇒ // ...
|
||||
case StopEvent(FSM.Shutdown, state, data) ⇒ // ...
|
||||
case StopEvent(FSM.Failure(cause), state, data) ⇒ // ...
|
||||
}
|
||||
//#termination-syntax
|
||||
|
||||
//#unhandled-syntax
|
||||
whenUnhandled {
|
||||
// common code for both states
|
||||
case Event(Queue(obj), t @ Todo(_, v)) ⇒
|
||||
goto(Active) using t.copy(queue = v :+ obj)
|
||||
|
||||
case Event(e, s) ⇒
|
||||
log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
|
||||
case Event(x: X, data) ⇒
|
||||
log.info("Received unhandled event: " + x)
|
||||
stay
|
||||
case Event(msg, _) ⇒
|
||||
log.warning("Received unknown event: " + msg)
|
||||
goto(Error)
|
||||
}
|
||||
//#unhandled-elided
|
||||
//#fsm-body
|
||||
|
||||
initialize()
|
||||
}
|
||||
//#simple-fsm
|
||||
object DemoCode {
|
||||
trait StateType
|
||||
case object SomeState extends StateType
|
||||
case object Processing extends StateType
|
||||
case object Error extends StateType
|
||||
case object Idle extends StateType
|
||||
case object Active extends StateType
|
||||
|
||||
class Dummy extends Actor with FSM[StateType, Int] {
|
||||
class X
|
||||
val newData = 42
|
||||
object WillDo
|
||||
object Tick
|
||||
|
||||
//#modifier-syntax
|
||||
when(SomeState) {
|
||||
case Event(msg, _) ⇒
|
||||
goto(Processing) using (newData) forMax (5 seconds) replying (WillDo)
|
||||
}
|
||||
//#modifier-syntax
|
||||
|
||||
//#transition-syntax
|
||||
onTransition {
|
||||
case Idle -> Active ⇒ setTimer("timeout", Tick, 1 second, true)
|
||||
case Active -> _ ⇒ cancelTimer("timeout")
|
||||
case x -> Idle ⇒ log.info("entering Idle from " + x)
|
||||
}
|
||||
//#transition-syntax
|
||||
|
||||
//#alt-transition-syntax
|
||||
onTransition(handler _)
|
||||
|
||||
def handler(from: StateType, to: StateType) {
|
||||
// handle it here ...
|
||||
}
|
||||
//#alt-transition-syntax
|
||||
|
||||
//#stop-syntax
|
||||
when(Error) {
|
||||
case Event("stop", _) ⇒
|
||||
// do cleanup ...
|
||||
stop()
|
||||
}
|
||||
//#stop-syntax
|
||||
|
||||
//#transform-syntax
|
||||
when(SomeState)(transform {
|
||||
case Event(bytes: ByteString, read) ⇒ stay using (read + bytes.length)
|
||||
} using {
|
||||
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒
|
||||
goto(Processing)
|
||||
})
|
||||
//#transform-syntax
|
||||
|
||||
//#alt-transform-syntax
|
||||
val processingTrigger: PartialFunction[State, State] = {
|
||||
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒
|
||||
goto(Processing)
|
||||
}
|
||||
|
||||
when(SomeState)(transform {
|
||||
case Event(bytes: ByteString, read) ⇒ stay using (read + bytes.length)
|
||||
} using processingTrigger)
|
||||
//#alt-transform-syntax
|
||||
|
||||
//#termination-syntax
|
||||
onTermination {
|
||||
case StopEvent(FSM.Normal, state, data) ⇒ // ...
|
||||
case StopEvent(FSM.Shutdown, state, data) ⇒ // ...
|
||||
case StopEvent(FSM.Failure(cause), state, data) ⇒ // ...
|
||||
}
|
||||
//#termination-syntax
|
||||
|
||||
//#unhandled-syntax
|
||||
whenUnhandled {
|
||||
case Event(x: X, data) ⇒
|
||||
log.info("Received unhandled event: " + x)
|
||||
stay
|
||||
case Event(msg, _) ⇒
|
||||
log.warning("Received unknown event: " + msg)
|
||||
goto(Error)
|
||||
}
|
||||
//#unhandled-syntax
|
||||
|
||||
}
|
||||
|
||||
//#logging-fsm
|
||||
import akka.actor.LoggingFSM
|
||||
class MyFSM extends Actor with LoggingFSM[StateType, Data] {
|
||||
//#body-elided
|
||||
override def logDepth = 12
|
||||
onTermination {
|
||||
case StopEvent(FSM.Failure(_), state, data) ⇒
|
||||
val lastEvents = getLog.mkString("\n\t")
|
||||
log.warning("Failure in state " + state + " with data " + data + "\n" +
|
||||
"Events leading up to this point:\n\t" + lastEvents)
|
||||
}
|
||||
// ...
|
||||
//#body-elided
|
||||
}
|
||||
//#logging-fsm
|
||||
//#unhandled-syntax
|
||||
|
||||
}
|
||||
//#fsm-code-elided
|
||||
|
||||
//#logging-fsm
|
||||
import akka.actor.LoggingFSM
|
||||
class MyFSM extends Actor with LoggingFSM[StateType, Data] {
|
||||
//#body-elided
|
||||
override def logDepth = 12
|
||||
onTermination {
|
||||
case StopEvent(FSM.Failure(_), state, data) ⇒
|
||||
val lastEvents = getLog.mkString("\n\t")
|
||||
log.warning("Failure in state " + state + " with data " + data + "\n" +
|
||||
"Events leading up to this point:\n\t" + lastEvents)
|
||||
}
|
||||
// ...
|
||||
//#body-elided
|
||||
}
|
||||
//#logging-fsm
|
||||
|
||||
}
|
||||
//#fsm-code-elided
|
||||
|
||||
"simple finite state machine" must {
|
||||
|
||||
"demonstrate NullFunction" in {
|
||||
class A extends Actor with FSM[Int, Null] {
|
||||
|
|
@ -199,7 +200,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
|
|||
}
|
||||
|
||||
"batch correctly" in {
|
||||
val buncher = system.actorOf(Props(new Buncher))
|
||||
val buncher = system.actorOf(Props(classOf[Buncher], this))
|
||||
buncher ! SetTarget(testActor)
|
||||
buncher ! Queue(42)
|
||||
buncher ! Queue(43)
|
||||
|
|
@ -212,7 +213,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
|
|||
}
|
||||
|
||||
"not batch if uninitialized" in {
|
||||
val buncher = system.actorOf(Props(new Buncher))
|
||||
val buncher = system.actorOf(Props(classOf[Buncher], this))
|
||||
buncher ! Queue(42)
|
||||
expectNoMsg
|
||||
}
|
||||
|
|
|
|||
|
|
@ -168,7 +168,7 @@ class CounterService extends Actor {
|
|||
|
||||
case Entry(k, v) if k == key && counter == None ⇒
|
||||
// Reply from Storage of the initial value, now we can create the Counter
|
||||
val c = context.actorOf(Props(new Counter(key, v)))
|
||||
val c = context.actorOf(Props(classOf[Counter], key, v))
|
||||
counter = Some(c)
|
||||
// Tell the counter to use current storage
|
||||
c ! UseStorage(storage)
|
||||
|
|
|
|||
|
|
@ -38,27 +38,30 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
}
|
||||
|
||||
"schedule a recurring task" in {
|
||||
//#schedule-recurring
|
||||
val Tick = "tick"
|
||||
val tickActor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case Tick ⇒ //Do something
|
||||
new AnyRef {
|
||||
//#schedule-recurring
|
||||
val Tick = "tick"
|
||||
class TickActor extends Actor {
|
||||
def receive = {
|
||||
case Tick ⇒ //Do something
|
||||
}
|
||||
}
|
||||
}))
|
||||
//Use system's dispatcher as ExecutionContext
|
||||
import system.dispatcher
|
||||
val tickActor = system.actorOf(Props(classOf[TickActor], this))
|
||||
//Use system's dispatcher as ExecutionContext
|
||||
import system.dispatcher
|
||||
|
||||
//This will schedule to send the Tick-message
|
||||
//to the tickActor after 0ms repeating every 50ms
|
||||
val cancellable =
|
||||
system.scheduler.schedule(0 milliseconds,
|
||||
50 milliseconds,
|
||||
tickActor,
|
||||
Tick)
|
||||
//This will schedule to send the Tick-message
|
||||
//to the tickActor after 0ms repeating every 50ms
|
||||
val cancellable =
|
||||
system.scheduler.schedule(0 milliseconds,
|
||||
50 milliseconds,
|
||||
tickActor,
|
||||
Tick)
|
||||
|
||||
//This cancels further Ticks to be sent
|
||||
cancellable.cancel()
|
||||
//#schedule-recurring
|
||||
system.stop(tickActor)
|
||||
//This cancels further Ticks to be sent
|
||||
cancellable.cancel()
|
||||
//#schedule-recurring
|
||||
system.stop(tickActor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,8 +44,8 @@ object CustomRouteExample {
|
|||
// example from a MicroKernel
|
||||
val system = ActorSystem("some-system")
|
||||
val producer = system.actorOf(Props[Producer1])
|
||||
val mediator = system.actorOf(Props(new Transformer(producer)))
|
||||
val consumer = system.actorOf(Props(new Consumer3(mediator)))
|
||||
val mediator = system.actorOf(Props(classOf[Transformer], producer))
|
||||
val consumer = system.actorOf(Props(classOf[Consumer3], mediator))
|
||||
CamelExtension(system).context.addRoutes(new CustomRouteBuilder)
|
||||
//#CustomRouteExample
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,8 +43,8 @@ object HttpExample {
|
|||
// to your boot class.
|
||||
val system = ActorSystem("some-system")
|
||||
val httpTransformer = system.actorOf(Props[HttpTransformer])
|
||||
val httpProducer = system.actorOf(Props(new HttpProducer(httpTransformer)))
|
||||
val httpConsumer = system.actorOf(Props(new HttpConsumer(httpProducer)))
|
||||
val httpProducer = system.actorOf(Props(classOf[HttpProducer], httpTransformer))
|
||||
val httpConsumer = system.actorOf(Props(classOf[HttpConsumer], httpProducer))
|
||||
//#HttpExample
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,8 +45,8 @@ object Producers {
|
|||
}
|
||||
val system = ActorSystem("some-system")
|
||||
val receiver = system.actorOf(Props[ResponseReceiver])
|
||||
val forwardResponse = system.actorOf(Props(
|
||||
new Forwarder("http://localhost:8080/news/akka", receiver)))
|
||||
val forwardResponse = system.actorOf(
|
||||
Props(classOf[Forwarder], this, "http://localhost:8080/news/akka", receiver))
|
||||
// the Forwarder sends out a request to the web page and forwards the response to
|
||||
// the ResponseReceiver
|
||||
forwardResponse ! "some request"
|
||||
|
|
@ -81,7 +81,7 @@ object Producers {
|
|||
}
|
||||
|
||||
val system = ActorSystem("some-system")
|
||||
val producer = system.actorOf(Props(new OnewaySender("activemq:FOO.BAR")))
|
||||
val producer = system.actorOf(Props(classOf[OnewaySender], this, "activemq:FOO.BAR"))
|
||||
producer ! "Some message"
|
||||
//#Oneway
|
||||
|
||||
|
|
|
|||
|
|
@ -1,47 +1,44 @@
|
|||
package docs.camel
|
||||
|
||||
object PublishSubscribe {
|
||||
{
|
||||
//#PubSub
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
|
||||
import akka.camel.{ Producer, CamelMessage, Consumer }
|
||||
//#PubSub
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
|
||||
import akka.camel.{ Producer, CamelMessage, Consumer }
|
||||
|
||||
class Subscriber(name: String, uri: String) extends Actor with Consumer {
|
||||
def endpointUri = uri
|
||||
class Subscriber(name: String, uri: String) extends Actor with Consumer {
|
||||
def endpointUri = uri
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ println("%s received: %s" format (name, msg.body))
|
||||
}
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ println("%s received: %s" format (name, msg.body))
|
||||
}
|
||||
|
||||
class Publisher(name: String, uri: String) extends Actor with Producer {
|
||||
|
||||
def endpointUri = uri
|
||||
|
||||
// one-way communication with JMS
|
||||
override def oneway = true
|
||||
}
|
||||
|
||||
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
|
||||
def endpointUri = uri
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ {
|
||||
publisher ! msg.bodyAs[String]
|
||||
sender ! ("message published")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add below to a Boot class
|
||||
// Setup publish/subscribe example
|
||||
val system = ActorSystem("some-system")
|
||||
val jmsUri = "jms:topic:test"
|
||||
val jmsSubscriber1 = system.actorOf(Props(new Subscriber("jms-subscriber-1", jmsUri)))
|
||||
val jmsSubscriber2 = system.actorOf(Props(new Subscriber("jms-subscriber-2", jmsUri)))
|
||||
val jmsPublisher = system.actorOf(Props(new Publisher("jms-publisher", jmsUri)))
|
||||
val jmsPublisherBridge = system.actorOf(Props(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)))
|
||||
//#PubSub
|
||||
|
||||
}
|
||||
|
||||
class Publisher(name: String, uri: String) extends Actor with Producer {
|
||||
|
||||
def endpointUri = uri
|
||||
|
||||
// one-way communication with JMS
|
||||
override def oneway = true
|
||||
}
|
||||
|
||||
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
|
||||
def endpointUri = uri
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage ⇒ {
|
||||
publisher ! msg.bodyAs[String]
|
||||
sender ! ("message published")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add below to a Boot class
|
||||
// Setup publish/subscribe example
|
||||
val system = ActorSystem("some-system")
|
||||
val jmsUri = "jms:topic:test"
|
||||
val jmsSubscriber1 = system.actorOf(Props(classOf[Subscriber], "jms-subscriber-1", jmsUri))
|
||||
val jmsSubscriber2 = system.actorOf(Props(classOf[Subscriber], "jms-subscriber-2", jmsUri))
|
||||
val jmsPublisher = system.actorOf(Props(classOf[Publisher], "jms-publisher", jmsUri))
|
||||
val jmsPublisherBridge = system.actorOf(Props(classOf[PublisherBridge], "jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher))
|
||||
//#PubSub
|
||||
}
|
||||
|
|
|
|||
|
|
@ -210,11 +210,11 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
|||
}
|
||||
|
||||
"defining priority dispatcher" in {
|
||||
//#prio-dispatcher
|
||||
new AnyRef {
|
||||
//#prio-dispatcher
|
||||
|
||||
// We create a new Actor that just prints out what it processes
|
||||
val a = system.actorOf(
|
||||
Props(new Actor {
|
||||
// We create a new Actor that just prints out what it processes
|
||||
class Logger extends Actor {
|
||||
val log: LoggingAdapter = Logging(context.system, this)
|
||||
|
||||
self ! 'lowpriority
|
||||
|
|
@ -229,22 +229,24 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
|||
def receive = {
|
||||
case x ⇒ log.info(x.toString)
|
||||
}
|
||||
}).withDispatcher("prio-dispatcher"))
|
||||
}
|
||||
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher("prio-dispatcher"))
|
||||
|
||||
/*
|
||||
Logs:
|
||||
'highpriority
|
||||
'highpriority
|
||||
'pigdog
|
||||
'pigdog2
|
||||
'pigdog3
|
||||
'lowpriority
|
||||
'lowpriority
|
||||
*/
|
||||
//#prio-dispatcher
|
||||
/*
|
||||
* Logs:
|
||||
* 'highpriority
|
||||
* 'highpriority
|
||||
* 'pigdog
|
||||
* 'pigdog2
|
||||
* 'pigdog3
|
||||
* 'lowpriority
|
||||
* 'lowpriority
|
||||
*/
|
||||
//#prio-dispatcher
|
||||
|
||||
watch(a)
|
||||
expectMsgPF() { case Terminated(`a`) ⇒ () }
|
||||
watch(a)
|
||||
expectMsgPF() { case Terminated(`a`) ⇒ () }
|
||||
}
|
||||
}
|
||||
|
||||
"defining balancing dispatcher" in {
|
||||
|
|
|
|||
|
|
@ -72,21 +72,24 @@ class LoggingDocSpec extends AkkaSpec {
|
|||
import LoggingDocSpec.MyActor
|
||||
|
||||
"use a logging actor" in {
|
||||
val myActor = system.actorOf(Props(new MyActor))
|
||||
val myActor = system.actorOf(Props[MyActor])
|
||||
myActor ! "test"
|
||||
}
|
||||
|
||||
"allow registration to dead letters" in {
|
||||
//#deadletters
|
||||
import akka.actor.{ Actor, DeadLetter, Props }
|
||||
new AnyRef {
|
||||
//#deadletters
|
||||
import akka.actor.{ Actor, DeadLetter, Props }
|
||||
|
||||
val listener = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case d: DeadLetter ⇒ println(d)
|
||||
class Listener extends Actor {
|
||||
def receive = {
|
||||
case d: DeadLetter ⇒ println(d)
|
||||
}
|
||||
}
|
||||
}))
|
||||
system.eventStream.subscribe(listener, classOf[DeadLetter])
|
||||
//#deadletters
|
||||
val listener = system.actorOf(Props(classOf[Listener], this))
|
||||
system.eventStream.subscribe(listener, classOf[DeadLetter])
|
||||
//#deadletters
|
||||
}
|
||||
}
|
||||
|
||||
"demonstrate logging more arguments" in {
|
||||
|
|
|
|||
|
|
@ -237,6 +237,6 @@ case class OKResponse(body: ByteString, keepAlive: Boolean)
|
|||
object Main extends App {
|
||||
val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 8080
|
||||
val system = ActorSystem()
|
||||
val server = system.actorOf(Props(new HttpServer(port)))
|
||||
val server = system.actorOf(Props(classOf[HttpServer], port))
|
||||
}
|
||||
//#main
|
||||
|
|
|
|||
|
|
@ -167,37 +167,8 @@ class PipelinesDocSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"demonstrate management port and context" in {
|
||||
//#actor
|
||||
class Processor(cmds: ActorRef, evts: ActorRef) extends Actor {
|
||||
|
||||
val ctx = new HasActorContext with HasByteOrder {
|
||||
def getContext = Processor.this.context
|
||||
def byteOrder = java.nio.ByteOrder.BIG_ENDIAN
|
||||
}
|
||||
|
||||
val pipeline = PipelineFactory.buildWithSinkFunctions(ctx,
|
||||
new TickGenerator(1000.millis) >>
|
||||
new MessageStage >>
|
||||
new LengthFieldFrame(10000) //
|
||||
)(
|
||||
// failure in the pipeline will fail this actor
|
||||
cmd ⇒ cmds ! cmd.get,
|
||||
evt ⇒ evts ! evt.get)
|
||||
|
||||
def receive = {
|
||||
case m: Message ⇒ pipeline.injectCommand(m)
|
||||
case b: ByteString ⇒ pipeline.injectEvent(b)
|
||||
case t: TickGenerator.Trigger ⇒ pipeline.managementCommand(t)
|
||||
}
|
||||
}
|
||||
//#actor
|
||||
|
||||
import TickGenerator.Tick
|
||||
val proc = system.actorOf(Props(new Processor(testActor, testActor) {
|
||||
override def receive = ({
|
||||
case "fail!" ⇒ throw new RuntimeException("FAIL!")
|
||||
}: Receive) orElse super.receive
|
||||
}), "processor")
|
||||
val proc = system.actorOf(Props(classOf[P], this, testActor, testActor), "processor")
|
||||
expectMsgType[Tick]
|
||||
proc ! msg
|
||||
val encoded = expectMsgType[ByteString]
|
||||
|
|
@ -222,4 +193,35 @@ class PipelinesDocSpec extends AkkaSpec {
|
|||
|
||||
}
|
||||
|
||||
//#actor
|
||||
class Processor(cmds: ActorRef, evts: ActorRef) extends Actor {
|
||||
|
||||
val ctx = new HasActorContext with HasByteOrder {
|
||||
def getContext = Processor.this.context
|
||||
def byteOrder = java.nio.ByteOrder.BIG_ENDIAN
|
||||
}
|
||||
|
||||
val pipeline = PipelineFactory.buildWithSinkFunctions(ctx,
|
||||
new TickGenerator(1000.millis) >>
|
||||
new MessageStage >>
|
||||
new LengthFieldFrame(10000) //
|
||||
)(
|
||||
// failure in the pipeline will fail this actor
|
||||
cmd ⇒ cmds ! cmd.get,
|
||||
evt ⇒ evts ! evt.get)
|
||||
|
||||
def receive = {
|
||||
case m: Message ⇒ pipeline.injectCommand(m)
|
||||
case b: ByteString ⇒ pipeline.injectEvent(b)
|
||||
case t: TickGenerator.Trigger ⇒ pipeline.managementCommand(t)
|
||||
}
|
||||
}
|
||||
//#actor
|
||||
|
||||
class P(cmds: ActorRef, evts: ActorRef) extends Processor(cmds, evts) {
|
||||
override def receive = ({
|
||||
case "fail!" ⇒ throw new RuntimeException("FAIL!")
|
||||
}: Receive) orElse super.receive
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -88,12 +88,12 @@ class SchedulerPatternSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"send periodic ticks from the constructor" taggedAs TimingTest in {
|
||||
testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))),
|
||||
testSchedule(system.actorOf(Props(classOf[ScheduleInConstructor], testActor)),
|
||||
3000 millis, 2000 millis)
|
||||
}
|
||||
|
||||
"send ticks from the preStart and receive" taggedAs TimingTest in {
|
||||
testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))),
|
||||
testSchedule(system.actorOf(Props(classOf[ScheduleInConstructor], testActor)),
|
||||
3000 millis, 2500 millis)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,15 +34,15 @@ class TestKitUsageSpec
|
|||
with WordSpec with ShouldMatchers with BeforeAndAfterAll {
|
||||
import TestKitUsageSpec._
|
||||
|
||||
val echoRef = system.actorOf(Props(new EchoActor))
|
||||
val forwardRef = system.actorOf(Props(new ForwardingActor(testActor)))
|
||||
val filterRef = system.actorOf(Props(new FilteringActor(testActor)))
|
||||
val echoRef = system.actorOf(Props[EchoActor])
|
||||
val forwardRef = system.actorOf(Props(classOf[ForwardingActor], testActor))
|
||||
val filterRef = system.actorOf(Props(classOf[FilteringActor], testActor))
|
||||
val randomHead = Random.nextInt(6)
|
||||
val randomTail = Random.nextInt(10)
|
||||
val headList = immutable.Seq().padTo(randomHead, "0")
|
||||
val tailList = immutable.Seq().padTo(randomTail, "1")
|
||||
val seqRef =
|
||||
system.actorOf(Props(new SequencingActor(testActor, headList, tailList)))
|
||||
system.actorOf(Props(classOf[SequencingActor], testActor, headList, tailList))
|
||||
|
||||
override def afterAll {
|
||||
system.shutdown()
|
||||
|
|
|
|||
|
|
@ -208,9 +208,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
|
||||
"demonstrate probe watch" in {
|
||||
import akka.testkit.TestProbe
|
||||
val target = system.actorOf(Props(new Actor {
|
||||
def receive = Actor.emptyBehavior
|
||||
}))
|
||||
val target = system.actorOf(Props.empty)
|
||||
//#test-probe-watch
|
||||
val probe = TestProbe()
|
||||
probe watch target
|
||||
|
|
@ -237,7 +235,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
import akka.actor.Props
|
||||
//#test-probe-forward
|
||||
val probe = TestProbe()
|
||||
val source = system.actorOf(Props(new Source(probe.ref)))
|
||||
val source = system.actorOf(Props(classOf[Source], probe.ref))
|
||||
val dest = system.actorOf(Props[Destination])
|
||||
source ! "start"
|
||||
probe.expectMsg("work")
|
||||
|
|
|
|||
|
|
@ -199,6 +199,7 @@ class TransactorDocSpec extends AkkaSpec {
|
|||
|
||||
val system = ActorSystem("transactors")
|
||||
|
||||
// FIXME, or remove the whole transactor module, srsly
|
||||
lazy val underlyingCounter = new Counter
|
||||
val counter = system.actorOf(Props(underlyingCounter), name = "counter")
|
||||
val coordinated = Coordinated()(Timeout(5 seconds))
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
package docs.zeromq
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.util.ByteString
|
||||
|
|
@ -12,6 +11,7 @@ import akka.testkit._
|
|||
import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension, SocketType, Bind }
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object ZeromqDocSpec {
|
||||
|
||||
|
|
@ -122,18 +122,25 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") {
|
|||
Bind("tcp://127.0.0.1:21231"))
|
||||
//#pub-socket
|
||||
|
||||
//#sub-socket
|
||||
import akka.zeromq._
|
||||
val listener = system.actorOf(Props(new Actor {
|
||||
def receive: Receive = {
|
||||
case Connecting ⇒ //...
|
||||
case m: ZMQMessage ⇒ //...
|
||||
case _ ⇒ //...
|
||||
val sub: { def subSocket: ActorRef; def listener: ActorRef } = new AnyRef {
|
||||
//#sub-socket
|
||||
import akka.zeromq._
|
||||
|
||||
class Listener extends Actor {
|
||||
def receive: Receive = {
|
||||
case Connecting ⇒ //...
|
||||
case m: ZMQMessage ⇒ //...
|
||||
case _ ⇒ //...
|
||||
}
|
||||
}
|
||||
}))
|
||||
val subSocket = ZeroMQExtension(system).newSocket(SocketType.Sub,
|
||||
Listener(listener), Connect("tcp://127.0.0.1:21231"), SubscribeAll)
|
||||
//#sub-socket
|
||||
|
||||
val listener = system.actorOf(Props(classOf[Listener], this))
|
||||
val subSocket = ZeroMQExtension(system).newSocket(SocketType.Sub,
|
||||
Listener(listener), Connect("tcp://127.0.0.1:21231"), SubscribeAll)
|
||||
//#sub-socket
|
||||
}
|
||||
val listener = sub.listener
|
||||
|
||||
//#sub-topic-socket
|
||||
val subTopicSocket = ZeroMQExtension(system).newSocket(SocketType.Sub,
|
||||
|
|
@ -149,7 +156,7 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") {
|
|||
pubSocket ! ZMQMessage(ByteString("foo.bar"), ByteString(payload))
|
||||
//#pub-topic
|
||||
|
||||
system.stop(subSocket)
|
||||
system.stop(sub.subSocket)
|
||||
system.stop(subTopicSocket)
|
||||
|
||||
//#high-watermark
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue