#1859 - rewriting dispatcher docs

This commit is contained in:
Viktor Klang 2012-02-24 14:28:17 +01:00
parent 2c5f65b0b2
commit 1e7ce2bfc7
4 changed files with 264 additions and 314 deletions

View file

@ -60,19 +60,17 @@ public class DispatcherDocTestBase {
@Test @Test
public void defineDispatcher() { public void defineDispatcher() {
//#defining-dispatcher //#defining-dispatcher
ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), ActorRef myActor =
"myactor1"); system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor3");
"myactor2");
//#defining-dispatcher //#defining-dispatcher
} }
@Test @Test
public void definePinnedDispatcher() { public void definePinnedDispatcher() {
//#defining-pinned-dispatcher //#defining-pinned-dispatcher
String name = "myactor";
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class) ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)
.withDispatcher("myactor-dispatcher"), name); .withDispatcher("my-pinned-dispatcher"));
//#defining-pinned-dispatcher //#defining-pinned-dispatcher
} }
@ -80,11 +78,13 @@ public class DispatcherDocTestBase {
public void priorityDispatcher() throws Exception { public void priorityDispatcher() throws Exception {
//#prio-dispatcher //#prio-dispatcher
ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes // We create a new Actor that just prints out what it processes
ActorRef myActor = system.actorOf(
new Props().withCreator(new UntypedActorFactory() { new Props().withCreator(new UntypedActorFactory() {
public UntypedActor create() { public UntypedActor create() {
return new UntypedActor() { return new UntypedActor() {
LoggingAdapter log = Logging.getLogger(getContext().system(), this); LoggingAdapter log =
Logging.getLogger(getContext().system(), this);
{ {
getSelf().tell("lowpriority"); getSelf().tell("lowpriority");
getSelf().tell("lowpriority"); getSelf().tell("lowpriority");
@ -101,7 +101,7 @@ public class DispatcherDocTestBase {
} }
}; };
} }
}).withDispatcher("prio-dispatcher-java")); }).withDispatcher("prio-dispatcher"));
/* /*
Logs: Logs:
@ -123,19 +123,20 @@ public class DispatcherDocTestBase {
} }
//#prio-mailbox //#prio-mailbox
public static class PrioMailbox extends UnboundedPriorityMailbox { public static class MyPrioMailbox extends UnboundedPriorityMailbox {
public PrioMailbox(Config config) { // needed for reflective instantiation public MyPrioMailbox(Config config) { // needed for reflective instantiation
super(new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important // Create a new PriorityGenerator, lower prio means more important
super(new PriorityGenerator() {
@Override @Override
public int gen(Object message) { public int gen(Object message) {
if (message.equals("highpriority")) if (message.equals("highpriority"))
return 0; // 'highpriority messages should be treated first if possible return 0; // 'highpriority messages should be treated first if possible
else if (message.equals("lowpriority")) else if (message.equals("lowpriority"))
return 100; // 'lowpriority messages should be treated last if possible return 2; // 'lowpriority messages should be treated last if possible
else if (message.equals(Actors.poisonPill())) else if (message.equals(Actors.poisonPill()))
return 1000; // PoisonPill when no other left return 3; // PoisonPill when no other left
else else
return 50; // We default to 50 return 1; // By default they go between high and low prio
} }
}); });
} }

View file

@ -7,204 +7,157 @@ Dispatchers (Java)
.. contents:: :local: .. contents:: :local:
The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used
Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. to execute arbitrary code, for instance :ref:`futures-java`.
The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM.
Default dispatcher Default dispatcher
------------------ ------------------
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher`` The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases.
section of the :ref:`configuration`.
If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors Setting the dispatcher for an Actor
for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. -----------------------------------
See below for details on which ones are available and how they can be configured.
.. warning:: So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is:
Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher
as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used.
Setting the dispatcher .. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher
----------------------
You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key .. note::
of the dispatcher settings. The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration.
So in this example it's a top-level section, but you could for instance put it as a sub-section,
where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java And then you just need to configure that dispatcher in your configuration:
:include: imports,defining-dispatcher
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
And here's another example that uses the "thread-pool-executor":
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
For more options, see the default-dispatcher section of the :ref:`configuration`.
Types of dispatchers Types of dispatchers
-------------------- --------------------
There are 4 different types of message dispatchers: There are 4 different types of message dispatchers:
* Thread-based (Pinned) * Dispatcher
* Event-based
* Priority event-based
* Work-sharing (Balancing)
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. - Sharability: Unlimited
Example of a custom event-based dispatcher, which can be used with - Mailboxes: Any, creates one per Actor
``new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher")``
as in the example above:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config - Use cases: Default dispatcher, Bulkheading
Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See - Driven by: ``java.util.concurrent.ExecutorService``
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override specify using "executor" using "fork-join-executor",
the values for the ``default-dispatcher`` in your configuration. "thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
.. note:: * PinnedDispatcher
It should be noted that the ``dispatcher-id`` used in :class:`Props` is in - Sharability: None
fact an absolute path into the configuration object, i.e. you can declare a
dispatcher configuration nested within other configuration objects and refer
to it like so: ``"my.config.object.myAwesomeDispatcher"``
There are two different executor services: - Mailboxes: Any, creates one per Actor
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for - Use cases: Bulkheading
``default-dispatcher``.
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
Note that the pool size is configured differently for the two executor services. The configuration above - Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator``
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: by default a "thread-pool-executor"
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config * BalancingDispatcher
Let's now walk through the different dispatchers in more detail. - Sharability: Actors of the same type only
Thread-based - Mailboxes: Any, creates one for all Actors
^^^^^^^^^^^^
The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a - Use cases: Work-sharing
`LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher
has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other.
The ``PinnedDispatcher`` is configured like this: - Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
* CallingThreadDispatcher
- Sharability: Unlimited
- Mailboxes: Any, creates one per Actor per Thread (on demand)
- Use cases: Testing
- Driven by: The calling thread (duh)
More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Configuring a ``PinnedDispatcher``:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
Note that it must be used with ``executor = "thread-pool-executor"``. And then using it:
Event-based .. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher
^^^^^^^^^^^
The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a Mailboxes
`BlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html>`_. This dispatcher is highly configurable ---------
and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool.
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance.
that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific
and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to
design and implement your system in the most efficient way in regards to performance, throughput and latency.
It comes with many different predefined BlockingQueue configurations: Builtin implementations
^^^^^^^^^^^^^^^^^^^^^^^
* Bounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_ Akka comes shipped with a number of default mailbox implementations:
* Unbounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Bounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* Unbounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* `SynchronousQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html>`_
When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's * UnboundedMailbox
thread as a way to slow him down and balance producer/consumer.
Here is an example of a bounded mailbox: - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue``
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config - Blocking: No
The standard :class:`Dispatcher` allows you to define the ``throughput`` it - Bounded: No
should have, as shown above. This defines the number of messages for a specific
Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will batch process up to ``throughput`` messages together when
having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it explicitly
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.
Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options available. * BoundedMailbox
Priority event-based - Backed by a ``java.util.concurrent.LinkedBlockingQueue``
^^^^^^^^^^^^^^^^^^^^
Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply - Blocking: Yes
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
``akka.dispatch.PriorityGenerator`` (recommended).
Creating a Dispatcher with a mailbox using PriorityGenerator: - Bounded: Yes
Config: * UnboundedPriorityMailbox
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala - Backed by a ``java.util.concurrent.PriorityBlockingQueue``
:include: prio-dispatcher-config-java
Priority mailbox: - Blocking: Yes
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - Bounded: No
:include: imports-prio-mailbox,prio-mailbox
Usage: * BoundedPriorityMailbox
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
:include: imports-prio,prio-dispatcher
- Blocking: Yes
Work-sharing event-based - Bounded: Yes
^^^^^^^^^^^^^^^^^^^^^^^^^
The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to * Durable mailboxes, see :ref:`durable-mailboxes`.
share this dispatcher and during execution time the different actors will steal messages from other actors if they
have less messages to process.
Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
best described as "work donating" because the actor of which work is being stolen takes the initiative.
This can be a great way to improve throughput at the cost of a little higher latency.
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config Mailbox configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_ How to create a PriorityMailbox:
Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_
Making the Actor mailbox bounded .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherTestBase.java#prio-mailbox
--------------------------------
Global configuration And then add it to the configuration:
^^^^^^^^^^^^^^^^^^^^
You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config
``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers,
if not specified otherwise.
.. code-block:: ruby And then an example on how you would use it:
akka {
actor {
default-dispatcher {
# If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set to the number specified
mailbox-capacity = 1000
}
}
}
Per-instance based configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can also do it on a specific dispatcher instance.
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor.
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds).
When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
if the message cannot be added to the mailbox within the time specified by the pushTimeout.
.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher

View file

@ -6,15 +6,10 @@ package akka.docs.dispatcher
import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.util.duration._ import akka.util.duration._
import akka.actor.PoisonPill import akka.actor.{ Props, Actor, PoisonPill }
import akka.dispatch.MessageDispatcherConfigurator
import akka.dispatch.MessageDispatcher
import akka.dispatch.DispatcherPrerequisites
object DispatcherDocSpec { object DispatcherDocSpec {
val config = """ val config = """
@ -33,8 +28,9 @@ object DispatcherDocSpec {
# Max number of threads to cap factor-based parallelism number to # Max number of threads to cap factor-based parallelism number to
parallelism-max = 10 parallelism-max = 10
} }
# Throughput defines the number of messages that are processed in a batch before the # Throughput defines the maximum number of messages to be
# thread is returned to the pool. Set to 1 for as fair as possible. # processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100 throughput = 100
} }
//#my-dispatcher-config //#my-dispatcher-config
@ -54,8 +50,9 @@ object DispatcherDocSpec {
# maximum number of threads to cap factor-based number to # maximum number of threads to cap factor-based number to
core-pool-size-max = 10 core-pool-size-max = 10
} }
# Throughput defines the number of messages that are processed in a batch before the # Throughput defines the maximum number of messages to be
# thread is returned to the pool. Set to 1 for as fair as possible. # processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100 throughput = 100
} }
//#my-thread-pool-dispatcher-config //#my-thread-pool-dispatcher-config
@ -94,13 +91,14 @@ object DispatcherDocSpec {
//#prio-dispatcher-config //#prio-dispatcher-config
prio-dispatcher { prio-dispatcher {
mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$PrioMailbox" mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
} }
//#prio-dispatcher-config //#prio-dispatcher-config
//#prio-dispatcher-config-java //#prio-dispatcher-config-java
prio-dispatcher-java { prio-dispatcher-java {
mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$PrioMailbox" mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox"
//Other dispatcher configuration goes here
} }
//#prio-dispatcher-config-java //#prio-dispatcher-config-java
""" """
@ -108,17 +106,24 @@ object DispatcherDocSpec {
//#prio-mailbox //#prio-mailbox
import akka.dispatch.PriorityGenerator import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedPriorityMailbox import akka.dispatch.UnboundedPriorityMailbox
import akka.dispatch.MailboxType
import akka.actor.ActorContext
import com.typesafe.config.Config import com.typesafe.config.Config
// We create a new Priority dispatcher and seed it with the priority generator // We inherit, in this case, from UnboundedPriorityMailbox
class PrioMailbox(config: Config) extends UnboundedPriorityMailbox( // and seed it with the priority generator
PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important class MyPrioMailbox(config: Config) extends UnboundedPriorityMailbox(
case 'highpriority 0 // 'highpriority messages should be treated first if possible // Create a new PriorityGenerator, lower prio means more important
case 'lowpriority 100 // 'lowpriority messages should be treated last if possible PriorityGenerator {
case PoisonPill 1000 // PoisonPill when no other left // 'highpriority messages should be treated first if possible
case otherwise 50 // We default to 50 case 'highpriority 0
// 'lowpriority messages should be treated last if possible
case 'lowpriority 2
// PoisonPill when no other left
case PoisonPill 3
// We default to 1, which is in between high and low
case otherwise 1
}) })
//#prio-mailbox //#prio-mailbox
@ -127,6 +132,29 @@ object DispatcherDocSpec {
case x case x
} }
} }
//#mailbox-implementation-example
case class MyUnboundedMailbox() extends akka.dispatch.MailboxType {
import akka.actor.ActorContext
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import akka.dispatch.{
Envelope,
MessageQueue,
QueueBasedMessageQueue,
UnboundedMessageQueueSemantics
}
// This constructor signature must exist, it will be called by Akka
def this(config: Config) = this()
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorContext]): MessageQueue =
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
//#mailbox-implementation-example
}
} }
class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
@ -134,10 +162,11 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
import DispatcherDocSpec.MyActor import DispatcherDocSpec.MyActor
"defining dispatcher" in { "defining dispatcher" in {
val context = system
//#defining-dispatcher //#defining-dispatcher
import akka.actor.Props import akka.actor.Props
val myActor1 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor1") val myActor =
val myActor2 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor2") context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
//#defining-dispatcher //#defining-dispatcher
} }
@ -146,15 +175,18 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
} }
"defining pinned dispatcher" in { "defining pinned dispatcher" in {
val context = system
//#defining-pinned-dispatcher //#defining-pinned-dispatcher
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor") val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")
//#defining-pinned-dispatcher //#defining-pinned-dispatcher
} }
"defining priority dispatcher" in { "defining priority dispatcher" in {
//#prio-dispatcher //#prio-dispatcher
val a = system.actorOf( // We create a new Actor that just prints out what it processes // We create a new Actor that just prints out what it processes
val a = system.actorOf(
Props(new Actor { Props(new Actor {
val log: LoggingAdapter = Logging(context.system, this) val log: LoggingAdapter = Logging(context.system, this)

View file

@ -7,202 +7,166 @@ Dispatchers (Scala)
.. contents:: :local: .. contents:: :local:
The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used
Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. to execute arbitrary code, for instance :ref:`futures-scala`.
The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM.
Default dispatcher Default dispatcher
------------------ ------------------
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher`` The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases.
section of the :ref:`configuration`.
If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors Setting the dispatcher for an Actor
for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. -----------------------------------
See below for details on which ones are available and how they can be configured.
.. warning:: So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is:
Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher
as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used.
Setting the dispatcher .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-dispatcher
----------------------
You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key .. note::
of the dispatcher settings. The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration.
So in this example it's a top-level section, but you could for instance put it as a sub-section,
where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala And then you just need to configure that dispatcher in your configuration:
:include: imports,defining-dispatcher
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
And here's another example that uses the "thread-pool-executor":
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
For more options, see the default-dispatcher section of the :ref:`configuration`.
Types of dispatchers Types of dispatchers
-------------------- --------------------
There are 4 different types of message dispatchers: There are 4 different types of message dispatchers:
* Thread-based (Pinned) * Dispatcher
* Event-based
* Priority event-based
* Work-sharing (Balancing)
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. - Sharability: Unlimited
Example of a custom event-based dispatcher, which can be used with ``Props[MyActor].withDispatcher("my-dispatcher")`` - Mailboxes: Any, creates one per Actor
as in the example above:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config - Use cases: Default dispatcher, Bulkheading
Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See - Driven by: ``java.util.concurrent.ExecutorService``
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override specify using "executor" using "fork-join-executor",
the values for the ``default-dispatcher`` in your configuration. "thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
.. note:: * PinnedDispatcher
It should be noted that the ``dispatcher-id`` used in :class:`Props` is in - Sharability: None
fact an absolute path into the configuration object, i.e. you can declare a
dispatcher configuration nested within other configuration objects and refer
to it like so: ``"my.config.object.myAwesomeDispatcher"``
There are two different executor services: - Mailboxes: Any, creates one per Actor
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for - Use cases: Bulkheading
``default-dispatcher``.
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
Note that the pool size is configured differently for the two executor services. The configuration above - Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator``
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: by default a "thread-pool-executor"
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config * BalancingDispatcher
Let's now walk through the different dispatchers in more detail. - Sharability: Actors of the same type only
Thread-based - Mailboxes: Any, creates one for all Actors
^^^^^^^^^^^^
The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a - Use cases: Work-sharing
`LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher
has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other.
The ``PinnedDispatcher`` is configured like this: - Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config * CallingThreadDispatcher
Note that it must be used with ``executor = "thread-pool-executor"``. - Sharability: Unlimited
Event-based - Mailboxes: Any, creates one per Actor per Thread (on demand)
^^^^^^^^^^^
The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a - Use cases: Testing
`BlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html>`_. This dispatcher is highly configurable
and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool.
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. - Driven by: The calling thread (duh)
the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor
that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific
and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to
design and implement your system in the most efficient way in regards to performance, throughput and latency.
It comes with many different predefined BlockingQueue configurations: More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* Bounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_ Configuring a ``PinnedDispatcher``:
* Unbounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Bounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* Unbounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* `SynchronousQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html>`_
When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
thread as a way to slow him down and balance producer/consumer.
Here is an example of a bounded mailbox: And then using it:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher
The standard :class:`Dispatcher` allows you to define the ``throughput`` it Mailboxes
should have, as shown above. This defines the number of messages for a specific ---------
Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will batch process up to ``throughput`` messages together when
having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it explicitly
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.
Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options available. An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance.
Priority event-based Builtin implementations
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^
Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply Akka comes shipped with a number of default mailbox implementations:
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
``akka.dispatch.PriorityGenerator`` (recommended).
Creating a Dispatcher with a mailbox using PriorityGenerator: * UnboundedMailbox
Config: - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - Blocking: No
:include: prio-dispatcher-config
Priority mailbox: - Bounded: No
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala * BoundedMailbox
:include: prio-mailbox
Usage: - Backed by a ``java.util.concurrent.LinkedBlockingQueue``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - Blocking: Yes
:include: prio-dispatcher
Work-sharing event-based - Bounded: Yes
^^^^^^^^^^^^^^^^^^^^^^^^^
The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to * UnboundedPriorityMailbox
share this dispatcher and during execution time the different actors will steal messages from other actors if they
have less messages to process.
Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
best described as "work donating" because the actor of which work is being stolen takes the initiative.
This can be a great way to improve throughput at the cost of a little higher latency.
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config - Backed by a ``java.util.concurrent.PriorityBlockingQueue``
Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_ - Blocking: Yes
Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_
Making the Actor mailbox bounded - Bounded: No
--------------------------------
Global configuration * BoundedPriorityMailbox
^^^^^^^^^^^^^^^^^^^^
You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers,
if not specified otherwise.
.. code-block:: ruby - Blocking: Yes
akka { - Bounded: Yes
actor {
default-dispatcher {
# If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set to the number specified
mailbox-capacity = 1000
}
}
}
Per-instance based configuration * Durable mailboxes, see :ref:`durable-mailboxes`.
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can also do it on a specific dispatcher instance. Mailbox configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config How to create a PriorityMailbox:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-mailbox
For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor. And then add it to the configuration:
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds).
When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
if the message cannot be added to the mailbox within the time specified by the pushTimeout.
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config
And then an example on how you would use it:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher
Creating your own Mailbox type
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
An example is worth a thousand quacks:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration.