Merge branch 'master' into wip-2103-cluster-routers-patriknw
Conflicts: akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala
This commit is contained in:
commit
b400b0b818
77 changed files with 720 additions and 490 deletions
164
akka-docs/cluster/cluster-usage.rst
Normal file
164
akka-docs/cluster/cluster-usage.rst
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
|
||||
.. _cluster_usage:
|
||||
|
||||
###############
|
||||
Cluster Usage
|
||||
###############
|
||||
|
||||
.. note:: This document describes how to use the features implemented so far of the
|
||||
new clustering coming in Akka Coltrane and is not available in the latest stable release.
|
||||
The API might change before it is released.
|
||||
|
||||
For introduction to the Akka Cluster concepts please see :ref:`cluster`.
|
||||
|
||||
Preparing Your Project for Clustering
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project:
|
||||
|
||||
.. parsed-literal::
|
||||
|
||||
"com.typesafe.akka" % "akka-cluster_|scalaVersion|" % "2.1-SNAPSHOT"
|
||||
|
||||
If you are using the latest nightly build you should pick a timestamped Akka version from `<http://repo.typesafe.com/typesafe/snapshots/com/typesafe/akka/akka-cluster/>`_.
|
||||
|
||||
A Simple Cluster Example
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The following small program together with its configuration starts an ``ActorSystem``
|
||||
with the Cluster extension enabled. It joins the cluster and logs some membership events.
|
||||
|
||||
Try it out:
|
||||
|
||||
1. Add the following ``application.conf`` in your project, place it in ``src/main/resources``:
|
||||
|
||||
|
||||
.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
|
||||
:language: none
|
||||
|
||||
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala`
|
||||
settings and the ``akka.cluster.seed-nodes`` to your ``application.conf`` file.
|
||||
|
||||
The seed nodes are configured contact points for initial, automatic, join of the cluster.
|
||||
|
||||
Note that if you are going to start the nodes on different machines you need to specify the
|
||||
ip-addresses or host names of the machines in ``application.conf`` instead of ``127.0.0.1``
|
||||
|
||||
2. Add the following main program to your project, place it in ``src/main/scala``:
|
||||
|
||||
.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala
|
||||
:language: scala
|
||||
|
||||
|
||||
3. Start the first seed node. Open a sbt session in one terminal window and run::
|
||||
|
||||
run-main sample.cluster.ClusterApp 2551
|
||||
|
||||
2551 corresponds to the port of the first seed-nodes element in the configuration.
|
||||
In the log output you see that the cluster node has been started and changed status to 'Up'.
|
||||
|
||||
4. Start the second seed node. Open a sbt session in another terminal window and run::
|
||||
|
||||
run-main sample.cluster.ClusterApp 2552
|
||||
|
||||
|
||||
2552 corresponds to the port of the second seed-nodes element in the configuration.
|
||||
In the log output you see that the cluster node has been started and joins the other seed node
|
||||
and becomes a member of the cluster. It's status changed to 'Up'.
|
||||
|
||||
Switch over to the first terminal window and see in the log output that the member joined.
|
||||
|
||||
5. Start another node. Open a sbt session in yet another terminal window and run::
|
||||
|
||||
run-main sample.cluster.ClusterApp
|
||||
|
||||
Now you don't need to specify the port number, and it will use a random available port.
|
||||
It joins one of the configured seed nodes. Look at the log output in the different terminal
|
||||
windows.
|
||||
|
||||
Start even more nodes in the same way, if you like.
|
||||
|
||||
6. Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows.
|
||||
The other nodes will detect the failure after a while, which you can see in the log
|
||||
output in the other terminals.
|
||||
|
||||
Look at the source code of the program again. What it does is to create an actor
|
||||
and register it as subscriber of certain cluster events. It gets notified with
|
||||
an snapshot event, 'CurrentClusterState' that holds full state information of
|
||||
the cluster. After that it receives events for changes that happen in the cluster.
|
||||
|
||||
Automatic vs. Manual Joining
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
You may decide if joining to the cluster should be done automatically or manually.
|
||||
By default it is automatic and you need to define the seed nodes in configuration
|
||||
so that a new node has an initial contact point. When a new node is started it
|
||||
sends a message to all seed nodes and then sends join command to the one that
|
||||
answers first. If no one of the seed nodes replied (might not be started yet)
|
||||
it retries this procedure until successful or shutdown.
|
||||
|
||||
There is one thing to be aware of regarding the seed node configured as the
|
||||
first element in the ``seed-nodes`` configuration list.
|
||||
The seed nodes can be started in any order and it is not necessary to have all
|
||||
seed nodes running, but the first seed node must be started when initially
|
||||
starting a cluster, otherwise the other seed-nodes will not become initialized
|
||||
and no other node can join the cluster. Once more than two seed nodes have been
|
||||
started it is no problem to shut down the first seed node. If it goes down it
|
||||
must be manually joined to the cluster again.
|
||||
Automatic joining of the first seed node is not possible, it would only join
|
||||
itself. It is only the first seed node that has this restriction.
|
||||
|
||||
You can disable automatic joining with configuration:
|
||||
|
||||
akka.cluster.auto-join = off
|
||||
|
||||
Then you need to join manually, using JMX or the provided script.
|
||||
You can join to any node in the cluster. It doesn't have to be configured as
|
||||
seed node. If you are not using auto-join there is no need to configure
|
||||
seed nodes at all.
|
||||
|
||||
Joining can also be performed programatically with ``Cluster(system).join``.
|
||||
|
||||
|
||||
Automatic vs. Manual Downing
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
When a member is considered by the failure detector to be unreachable the
|
||||
leader is not allowed to perform its duties, such as changing status of
|
||||
new joining members to 'Up'. The status of the unreachable member must be
|
||||
changed to 'Down'. This can be performed automatically or manually. By
|
||||
default it must be done manually, using using JMX or the provided script.
|
||||
|
||||
It can also be performed programatically with ``Cluster(system).down``.
|
||||
|
||||
You can enable automatic downing with configuration:
|
||||
|
||||
akka.cluster.auto-down = on
|
||||
|
||||
Be aware of that using auto-down implies that two separate clusters will
|
||||
automatically be formed in case of network partition. That might be
|
||||
desired by some applications but not by others.
|
||||
|
||||
Configuration
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
There are several configuration properties for the cluster. We refer to the following
|
||||
reference file for more information:
|
||||
|
||||
|
||||
.. literalinclude:: ../../akka-cluster/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
||||
It is recommended that you change the ``tick-duration`` to 33 ms or less
|
||||
of the default scheduler when using cluster, if you don't need to have it
|
||||
configured to a longer duration for other reasons. If you don't do this
|
||||
a dedicated scheduler will be used for periodic tasks of the cluster, which
|
||||
introduce the extra overhead of another thread.
|
||||
|
||||
::
|
||||
|
||||
# shorter tick-duration of default scheduler when using cluster
|
||||
akka.scheduler.tick-duration.tick-duration = 33ms
|
||||
|
||||
|
||||
|
||||
|
|
@ -5,3 +5,4 @@ Cluster
|
|||
:maxdepth: 2
|
||||
|
||||
cluster
|
||||
cluster-usage
|
||||
|
|
|
|||
|
|
@ -206,7 +206,7 @@ but processed afterwards.
|
|||
Normally stopping a child (i.e. not in response to a failure) will not
|
||||
automatically terminate the other children in an all-for-one strategy, that can
|
||||
easily be done by watching their lifecycle: if the :class:`Terminated` message
|
||||
is not handled by the supervisor, it will throw a :class:`DeathPathException`
|
||||
is not handled by the supervisor, it will throw a :class:`DeathPactException`
|
||||
which (depending on its supervisor) will restart it, and the default
|
||||
:meth:`preRestart` action will terminate all children. Of course this can be
|
||||
handled explicitly as well.
|
||||
|
|
|
|||
|
|
@ -317,7 +317,7 @@ public class UntypedActorDocTestBase {
|
|||
Procedure<Object> angry = new Procedure<Object>() {
|
||||
@Override
|
||||
public void apply(Object message) {
|
||||
if (message.equals("foo")) {
|
||||
if (message.equals("bar")) {
|
||||
getSender().tell("I am already angry?");
|
||||
} else if (message.equals("foo")) {
|
||||
getContext().become(happy);
|
||||
|
|
|
|||
|
|
@ -6,9 +6,10 @@ Futures (Java)
|
|||
Introduction
|
||||
------------
|
||||
|
||||
In Akka, a `Future <http://en.wikipedia.org/wiki/Futures_and_promises>`_ is a data structure used
|
||||
to retrieve the result of some concurrent operation. This operation is usually performed by an ``Actor`` or
|
||||
by the ``Dispatcher`` directly. This result can be accessed synchronously (blocking) or asynchronously (non-blocking).
|
||||
In the Scala Standard Library, a `Future <http://en.wikipedia.org/wiki/Futures_and_promises>`_ is a data structure
|
||||
used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking)
|
||||
or asynchronously (non-blocking). To be able to use this from Java, Akka provides a java friendly interface
|
||||
in ``akka.dispatch.Futures``.
|
||||
|
||||
Execution Contexts
|
||||
------------------
|
||||
|
|
@ -27,7 +28,7 @@ Use with Actors
|
|||
There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg)``),
|
||||
which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``.
|
||||
|
||||
Using the ``ActorRef``\'s ``ask`` method to send a message will return a Future.
|
||||
Using the ``ActorRef``\'s ``ask`` method to send a message will return a ``Future``.
|
||||
To wait for and retrieve the actual result the simplest method is:
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocTestBase.java
|
||||
|
|
@ -68,7 +69,7 @@ Or failures:
|
|||
Functional Futures
|
||||
------------------
|
||||
|
||||
Akka's ``Future`` has several monadic methods that are very similar to the ones used by ``Scala``'s collections.
|
||||
Scala's ``Future`` has several monadic methods that are very similar to the ones used by ``Scala``'s collections.
|
||||
These allow you to create 'pipelines' or 'streams' that the result will travel through.
|
||||
|
||||
Future is a Monad
|
||||
|
|
@ -81,12 +82,12 @@ The return value of the ``map`` method is another ``Future`` that will contain t
|
|||
.. includecode:: code/docs/future/FutureDocTestBase.java
|
||||
:include: imports2,map
|
||||
|
||||
In this example we are joining two strings together within a Future. Instead of waiting for f1 to complete,
|
||||
In this example we are joining two strings together within a ``Future``. Instead of waiting for f1 to complete,
|
||||
we apply our function that calculates the length of the string using the ``map`` method.
|
||||
Now we have a second Future, f2, that will eventually contain an ``Integer``.
|
||||
When our original ``Future``, f1, completes, it will also apply our function and complete the second Future
|
||||
Now we have a second ``Future``, f2, that will eventually contain an ``Integer``.
|
||||
When our original ``Future``, f1, completes, it will also apply our function and complete the second ``Future``
|
||||
with its result. When we finally ``get`` the result, it will contain the number 10.
|
||||
Our original Future still contains the string "HelloWorld" and is unaffected by the ``map``.
|
||||
Our original ``Future`` still contains the string "HelloWorld" and is unaffected by the ``map``.
|
||||
|
||||
Something to note when using these methods: if the ``Future`` is still being processed when one of these methods are called,
|
||||
it will be the completing thread that actually does the work.
|
||||
|
|
@ -115,7 +116,7 @@ to have this done concurrently, and for that we use ``flatMap``:
|
|||
.. includecode:: code/docs/future/FutureDocTestBase.java
|
||||
:include: flat-map
|
||||
|
||||
Now our second Future is executed concurrently as well. This technique can also be used to combine the results
|
||||
Now our second ``Future`` is executed concurrently as well. This technique can also be used to combine the results
|
||||
of several Futures into a single calculation, which will be better explained in the following sections.
|
||||
|
||||
If you need to do conditional propagation, you can use ``filter``:
|
||||
|
|
@ -157,7 +158,7 @@ That's all it takes!
|
|||
|
||||
|
||||
If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be empty String.
|
||||
In some cases you don't have a start-value and you're able to use the value of the first completing Future
|
||||
In some cases you don't have a start-value and you're able to use the value of the first completing ``Future``
|
||||
in the sequence as the start-value, you can use ``reduce``, it works like this:
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocTestBase.java
|
||||
|
|
@ -172,7 +173,7 @@ Callbacks
|
|||
---------
|
||||
|
||||
Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new Future, but by side-effecting.
|
||||
For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first.
|
||||
For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first.
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocTestBase.java
|
||||
:include: onSuccess
|
||||
|
|
@ -188,8 +189,8 @@ Ordering
|
|||
|
||||
Since callbacks are executed in any order and potentially in parallel,
|
||||
it can be tricky at the times when you need sequential ordering of operations.
|
||||
But there's a solution! And it's name is ``andThen``, and it creates a new Future with
|
||||
the specified callback, a Future that will have the same result as the Future it's called on,
|
||||
But there's a solution! And it's name is ``andThen``, and it creates a new ``Future`` with
|
||||
the specified callback, a ``Future`` that will have the same result as the ``Future`` it's called on,
|
||||
which allows for ordering like in the following sample:
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocTestBase.java
|
||||
|
|
|
|||
|
|
@ -75,7 +75,8 @@ How Routing is Designed within Akka
|
|||
|
||||
Routers behave like single actors, but they should also not hinder scalability.
|
||||
This apparent contradiction is solved by making routers be represented by a
|
||||
special :class:`RoutedActorRef`, which dispatches incoming messages destined
|
||||
special :class:`RoutedActorRef` (implementation detail, what the user gets is
|
||||
an :class:`ActorRef` as usual) which dispatches incoming messages destined
|
||||
for the routees without actually invoking the router actor’s behavior (and thus
|
||||
avoiding its mailbox; the single router actor’s task is to manage all aspects
|
||||
related to the lifecycle of the routees). This means that the code which decides
|
||||
|
|
|
|||
|
|
@ -81,7 +81,8 @@ a top level actor, that is supervised by the system (internal guardian actor).
|
|||
|
||||
The name parameter is optional, but you should preferably name your actors, since
|
||||
that is used in log messages and for identifying actors. The name must not be empty
|
||||
or start with ``$``. If the given name is already in use by another child to the
|
||||
or start with ``$``, but it may contain URL encoded characters (eg. ``%20`` for a blank space).
|
||||
If the given name is already in use by another child to the
|
||||
same parent actor an `InvalidActorNameException` is thrown.
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
|
|
|
|||
|
|
@ -181,6 +181,18 @@ v2.1::
|
|||
}
|
||||
}, ec);
|
||||
|
||||
API changes of DynamicAccess
|
||||
============================
|
||||
|
||||
All methods with scala.Either[Throwable, X] have been changed to used scala.util.Try[X].
|
||||
|
||||
DynamicAccess.withErrorHandling has been removed since scala.util.Try now fulfills that role.
|
||||
|
||||
API changes of Serialization
|
||||
============================
|
||||
|
||||
All methods with scala.Either[Throwable, X] have been changed to used scala.util.Try[X].
|
||||
|
||||
Empty Props
|
||||
===========
|
||||
|
||||
|
|
@ -201,7 +213,18 @@ v2.0 Java::
|
|||
|
||||
v2.1 Java::
|
||||
|
||||
ActorRef router2 = system.actorOf(new Props().withRouter(RoundRobinRouter.create(routees)));
|
||||
ActorRef router2 = system.actorOf(new Props().withRouter(RoundRobinRouter.create(routees)));
|
||||
|
||||
Props: Function-based creation
|
||||
==============================
|
||||
|
||||
v2.0 Scala::
|
||||
|
||||
Props(context => { case someMessage => context.sender ! someMessage })
|
||||
|
||||
v2.1 Scala::
|
||||
|
||||
Props(new Actor { def receive = { case someMessage => sender ! someMessage } })
|
||||
|
||||
Failing Send
|
||||
============
|
||||
|
|
@ -256,6 +279,13 @@ If you don't want these in the log you need to add this to your configuration::
|
|||
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
|
||||
Stash postStop
|
||||
==============
|
||||
|
||||
Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sure that
|
||||
stashed messages are put into the dead letters when the actor stops, make sure you call
|
||||
super.postStop if you override it.
|
||||
|
||||
|
||||
Custom Router or Resizer
|
||||
========================
|
||||
|
|
|
|||
|
|
@ -75,7 +75,8 @@ a top level actor, that is supervised by the system (internal guardian actor).
|
|||
|
||||
The name parameter is optional, but you should preferably name your actors, since
|
||||
that is used in log messages and for identifying actors. The name must not be empty
|
||||
or start with ``$``. If the given name is already in use by another child to the
|
||||
or start with ``$``, but it may contain URL encoded characters (eg. ``%20`` for a blank space).
|
||||
If the given name is already in use by another child to the
|
||||
same parent actor an `InvalidActorNameException` is thrown.
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
|
|
@ -108,6 +109,11 @@ Here is an example:
|
|||
meaning of an actor restart, which is described here:
|
||||
:ref:`supervision-restart`.
|
||||
|
||||
.. warning::
|
||||
|
||||
Also avoid passing mutable state into the constructor of the Actor, since
|
||||
the call-by-name block can be executed by another thread.
|
||||
|
||||
Props
|
||||
-----
|
||||
|
||||
|
|
@ -380,7 +386,7 @@ futures, because this is likely to be a common combination. Please note that
|
|||
all of the above is completely non-blocking and asynchronous: ``ask`` produces
|
||||
a :class:`Future`, three of which are composed into a new future using the
|
||||
for-comprehension and then ``pipeTo`` installs an ``onComplete``-handler on the
|
||||
future to effect the submission of the aggregated :class:`Result` to another
|
||||
future to affect the submission of the aggregated :class:`Result` to another
|
||||
actor.
|
||||
|
||||
Using ``ask`` will send a message to the receiving Actor as with ``tell``, and
|
||||
|
|
|
|||
|
|
@ -372,8 +372,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
val f: Future[Result] =
|
||||
for {
|
||||
x ← ask(actorA, Request).mapTo[Int] // call pattern directly
|
||||
s ← actorB ask Request mapTo manifest[String] // call by implicit conversion
|
||||
d ← actorC ? Request mapTo manifest[Double] // call by symbolic name
|
||||
s ← (actorB ask Request).mapTo[String] // call by implicit conversion
|
||||
d ← (actorC ? Request).mapTo[Double] // call by symbolic name
|
||||
} yield Result(x, s, d)
|
||||
|
||||
f pipeTo actorD // .. or ..
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val f1 = Future {
|
||||
"Hello" + "World"
|
||||
}
|
||||
val f2 = Promise.successful(3).future
|
||||
val f2 = Future.successful(3)
|
||||
val f3 = f1 map { x ⇒
|
||||
f2 map { y ⇒
|
||||
x.length * y
|
||||
|
|
@ -132,7 +132,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val f1 = Future {
|
||||
"Hello" + "World"
|
||||
}
|
||||
val f2 = Promise.successful(3).future
|
||||
val f2 = Future.successful(3)
|
||||
val f3 = f1 flatMap { x ⇒
|
||||
f2 map { y ⇒
|
||||
x.length * y
|
||||
|
|
@ -145,7 +145,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
|
||||
"demonstrate usage of filter" in {
|
||||
//#filter
|
||||
val future1 = Promise.successful(4).future
|
||||
val future1 = Future.successful(4)
|
||||
val future2 = future1.filter(_ % 2 == 0)
|
||||
val result = Await.result(future2, 1 second)
|
||||
result must be(4)
|
||||
|
|
@ -290,8 +290,8 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val msg1 = -1
|
||||
//#try-recover
|
||||
val future = akka.pattern.ask(actor, msg1) recoverWith {
|
||||
case e: ArithmeticException ⇒ Promise.successful(0).future
|
||||
case foo: IllegalArgumentException ⇒ Promise.failed[Int](new IllegalStateException("All br0ken!")).future
|
||||
case e: ArithmeticException ⇒ Future.successful(0)
|
||||
case foo: IllegalArgumentException ⇒ Future.failed[Int](new IllegalStateException("All br0ken!"))
|
||||
}
|
||||
//#try-recover
|
||||
Await.result(future, 1 second) must be(0)
|
||||
|
|
@ -343,7 +343,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
Await.result(future, 1 second) must be("foo")
|
||||
}
|
||||
{
|
||||
val future = Promise.failed[String](new IllegalStateException("OHNOES")).future
|
||||
val future = Future.failed[String](new IllegalStateException("OHNOES"))
|
||||
//#onFailure
|
||||
future onFailure {
|
||||
case ise: IllegalStateException if ise.getMessage == "OHNOES" ⇒
|
||||
|
|
@ -367,12 +367,12 @@ class FutureDocSpec extends AkkaSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"demonstrate usage of Promise.success & Promise.failed" in {
|
||||
"demonstrate usage of Future.successful & Future.failed" in {
|
||||
//#successful
|
||||
val future = Promise.successful("Yay!").future
|
||||
val future = Future.successful("Yay!")
|
||||
//#successful
|
||||
//#failed
|
||||
val otherFuture = Promise.failed[String](new IllegalArgumentException("Bang!")).future
|
||||
val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!"))
|
||||
//#failed
|
||||
Await.result(future, 1 second) must be("Yay!")
|
||||
intercept[IllegalArgumentException] { Await.result(otherFuture, 1 second) }
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@
|
|||
package docs.routing
|
||||
|
||||
import RouterDocSpec.MyActor
|
||||
import akka.actor.{ Props, Actor }
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.routing.RoundRobinRouter
|
||||
import akka.actor.{ ActorRef, Props, Actor }
|
||||
|
||||
object RouterDocSpec {
|
||||
class MyActor extends Actor {
|
||||
|
|
@ -21,7 +21,7 @@ class RouterDocSpec extends AkkaSpec {
|
|||
import RouterDocSpec._
|
||||
|
||||
//#dispatchers
|
||||
val router = system.actorOf(Props[MyActor]
|
||||
val router: ActorRef = system.actorOf(Props[MyActor]
|
||||
.withRouter(RoundRobinRouter(5, routerDispatcher = "router")) // “head” will run on "router" dispatcher
|
||||
.withDispatcher("workers")) // MyActor workers will run on "workers" dispatcher
|
||||
//#dispatchers
|
||||
|
|
|
|||
|
|
@ -49,12 +49,12 @@ object ZeromqDocSpec {
|
|||
val timestamp = System.currentTimeMillis
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).fold(throw _, identity)
|
||||
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).get
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload)))
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).fold(throw _, identity)
|
||||
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload)))
|
||||
}
|
||||
|
|
@ -71,18 +71,12 @@ object ZeromqDocSpec {
|
|||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Heap]) match {
|
||||
case Right(Heap(timestamp, used, max)) ⇒
|
||||
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get
|
||||
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
|
||||
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.load" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Load]) match {
|
||||
case Right(Load(timestamp, loadAverage)) ⇒
|
||||
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
val Load(timestamp, loadAverage) = ser.deserialize(m.payload(1), classOf[Load]).get
|
||||
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
|
||||
}
|
||||
}
|
||||
//#logger
|
||||
|
|
@ -97,13 +91,10 @@ object ZeromqDocSpec {
|
|||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Heap]) match {
|
||||
case Right(Heap(timestamp, used, max)) ⇒
|
||||
if ((used.toDouble / max) > 0.9) count += 1
|
||||
else count = 0
|
||||
if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max))
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get
|
||||
if ((used.toDouble / max) > 0.9) count += 1
|
||||
else count = 0
|
||||
if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max))
|
||||
}
|
||||
}
|
||||
//#alerter
|
||||
|
|
|
|||
|
|
@ -7,9 +7,9 @@ Futures (Scala)
|
|||
Introduction
|
||||
------------
|
||||
|
||||
In Akka, a `Future <http://en.wikipedia.org/wiki/Futures_and_promises>`_ is a data structure used to
|
||||
retrieve the result of some concurrent operation. This operation is usually performed by an ``Actor``
|
||||
or by the ``Dispatcher`` directly. This result can be accessed synchronously (blocking) or asynchronously (non-blocking).
|
||||
In the Scala Standard Library, a `Future <http://en.wikipedia.org/wiki/Futures_and_promises>`_ is a data structure
|
||||
used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking)
|
||||
or asynchronously (non-blocking).
|
||||
|
||||
Execution Contexts
|
||||
------------------
|
||||
|
|
@ -28,7 +28,7 @@ Use With Actors
|
|||
There are generally two ways of getting a reply from an ``Actor``: the first is by a sent message (``actor ! msg``),
|
||||
which only works if the original sender was an ``Actor``) and the second is through a ``Future``.
|
||||
|
||||
Using an ``Actor``\'s ``?`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
|
||||
Using an ``Actor``\'s ``?`` method to send a message will return a ``Future``. To wait for and retrieve the actual result the simplest method is:
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocSpec.scala
|
||||
:include: ask-blocking
|
||||
|
|
@ -61,7 +61,7 @@ with the return value of the block used to complete the ``Future`` (in this case
|
|||
Unlike a ``Future`` that is returned from an ``Actor``, this ``Future`` is properly typed,
|
||||
and we also avoid the overhead of managing an ``Actor``.
|
||||
|
||||
You can also create already completed Futures using the ``Promise`` companion, which can be either successes:
|
||||
You can also create already completed Futures using the ``Future`` companion, which can be either successes:
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocSpec.scala
|
||||
:include: successful
|
||||
|
|
@ -74,7 +74,7 @@ Or failures:
|
|||
Functional Futures
|
||||
------------------
|
||||
|
||||
Akka's ``Future`` has several monadic methods that are very similar to the ones used by Scala's collections.
|
||||
Scala's ``Future`` has several monadic methods that are very similar to the ones used by Scala's collections.
|
||||
These allow you to create 'pipelines' or 'streams' that the result will travel through.
|
||||
|
||||
Future is a Monad
|
||||
|
|
@ -148,7 +148,7 @@ Here we have 2 actors processing a single message each. Once the 2 results are a
|
|||
(note that we don't block to get these results!), they are being added together and sent to a third ``Actor``,
|
||||
which replies with a string, which we assign to 'result'.
|
||||
|
||||
This is fine when dealing with a known amount of Actors, but can grow unwieldy if we have more then a handful.
|
||||
This is fine when dealing with a known amount of Actors, but can grow unwieldy if we have more than a handful.
|
||||
The ``sequence`` and ``traverse`` helper methods can make it easier to handle more complex use cases.
|
||||
Both of these methods are ways of turning, for a subclass ``T`` of ``Traversable``, ``T[Future[A]]`` into a ``Future[T[A]]``.
|
||||
For example:
|
||||
|
|
@ -185,20 +185,20 @@ That's all it takes!
|
|||
|
||||
|
||||
If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be 0.
|
||||
In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence
|
||||
In some cases you don't have a start-value and you're able to use the value of the first completing ``Future`` in the sequence
|
||||
as the start-value, you can use ``reduce``, it works like this:
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocSpec.scala
|
||||
:include: reduce
|
||||
|
||||
Same as with ``fold``, the execution will be done asynchronously when the last of the Future is completed,
|
||||
Same as with ``fold``, the execution will be done asynchronously when the last of the ``Future`` is completed,
|
||||
you can also parallelize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
|
||||
|
||||
Callbacks
|
||||
---------
|
||||
|
||||
Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new Future, but by side-effecting.
|
||||
For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first.
|
||||
Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new ``Future``, but by side-effecting.
|
||||
For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first.
|
||||
|
||||
.. includecode:: code/docs/future/FutureDocSpec.scala
|
||||
:include: onSuccess
|
||||
|
|
|
|||
|
|
@ -75,7 +75,8 @@ How Routing is Designed within Akka
|
|||
|
||||
Routers behave like single actors, but they should also not hinder scalability.
|
||||
This apparent contradiction is solved by making routers be represented by a
|
||||
special :class:`RoutedActorRef`, which dispatches incoming messages destined
|
||||
special :class:`RoutedActorRef` (implementation detail, what the user gets is
|
||||
an :class:`ActorRef` as usual) which dispatches incoming messages destined
|
||||
for the routees without actually invoking the router actor’s behavior (and thus
|
||||
avoiding its mailbox; the single router actor’s task is to manage all aspects
|
||||
related to the lifecycle of the routees). This means that the code which decides
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue