From 1e7ce2bfc736cfa66c9701628d17ec212686e923 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 24 Feb 2012 14:28:17 +0100 Subject: [PATCH] #1859 - rewriting dispatcher docs --- .../dispatcher/DispatcherDocTestBase.java | 31 +-- akka-docs/java/dispatchers.rst | 233 +++++++----------- .../docs/dispatcher/DispatcherDocSpec.scala | 82 ++++-- akka-docs/scala/dispatchers.rst | 232 ++++++++--------- 4 files changed, 264 insertions(+), 314 deletions(-) diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java index 1aaa76ee11..14291fc27e 100644 --- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java @@ -60,19 +60,17 @@ public class DispatcherDocTestBase { @Test public void defineDispatcher() { //#defining-dispatcher - ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), - "myactor1"); - ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), - "myactor2"); + ActorRef myActor = + system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), + "myactor3"); //#defining-dispatcher } @Test public void definePinnedDispatcher() { //#defining-pinned-dispatcher - String name = "myactor"; ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class) - .withDispatcher("myactor-dispatcher"), name); + .withDispatcher("my-pinned-dispatcher")); //#defining-pinned-dispatcher } @@ -80,11 +78,13 @@ public class DispatcherDocTestBase { public void priorityDispatcher() throws Exception { //#prio-dispatcher - ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes + // We create a new Actor that just prints out what it processes + ActorRef myActor = system.actorOf( new Props().withCreator(new UntypedActorFactory() { public UntypedActor create() { return new UntypedActor() { - LoggingAdapter log = Logging.getLogger(getContext().system(), this); + LoggingAdapter log = + Logging.getLogger(getContext().system(), this); { getSelf().tell("lowpriority"); getSelf().tell("lowpriority"); @@ -101,7 +101,7 @@ public class DispatcherDocTestBase { } }; } - }).withDispatcher("prio-dispatcher-java")); + }).withDispatcher("prio-dispatcher")); /* Logs: @@ -123,19 +123,20 @@ public class DispatcherDocTestBase { } //#prio-mailbox - public static class PrioMailbox extends UnboundedPriorityMailbox { - public PrioMailbox(Config config) { // needed for reflective instantiation - super(new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important + public static class MyPrioMailbox extends UnboundedPriorityMailbox { + public MyPrioMailbox(Config config) { // needed for reflective instantiation + // Create a new PriorityGenerator, lower prio means more important + super(new PriorityGenerator() { @Override public int gen(Object message) { if (message.equals("highpriority")) return 0; // 'highpriority messages should be treated first if possible else if (message.equals("lowpriority")) - return 100; // 'lowpriority messages should be treated last if possible + return 2; // 'lowpriority messages should be treated last if possible else if (message.equals(Actors.poisonPill())) - return 1000; // PoisonPill when no other left + return 3; // PoisonPill when no other left else - return 50; // We default to 50 + return 1; // By default they go between high and low prio } }); } diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index b7a16d9283..e602c2c77c 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -7,204 +7,157 @@ Dispatchers (Java) .. contents:: :local: -The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. - -Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. - -The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM. +An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak. +All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used +to execute arbitrary code, for instance :ref:`futures-java`. Default dispatcher ------------------ -For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. -The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher`` -section of the :ref:`configuration`. +Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. +The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. -If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors -for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. -See below for details on which ones are available and how they can be configured. +Setting the dispatcher for an Actor +----------------------------------- -.. warning:: - Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher - as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used. +So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is: -Setting the dispatcher ----------------------- +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher -You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key -of the dispatcher settings. +.. note:: + The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration. + So in this example it's a top-level section, but you could for instance put it as a sub-section, + where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` -.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - :include: imports,defining-dispatcher +And then you just need to configure that dispatcher in your configuration: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + +And here's another example that uses the "thread-pool-executor": + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config + +For more options, see the default-dispatcher section of the :ref:`configuration`. Types of dispatchers -------------------- There are 4 different types of message dispatchers: -* Thread-based (Pinned) -* Event-based -* Priority event-based -* Work-sharing (Balancing) +* Dispatcher -It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. + - Sharability: Unlimited -Example of a custom event-based dispatcher, which can be used with -``new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher")`` -as in the example above: + - Mailboxes: Any, creates one per Actor -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + - Use cases: Default dispatcher, Bulkheading -Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See -:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override -the values for the ``default-dispatcher`` in your configuration. + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` -.. note:: +* PinnedDispatcher - It should be noted that the ``dispatcher-id`` used in :class:`Props` is in - fact an absolute path into the configuration object, i.e. you can declare a - dispatcher configuration nested within other configuration objects and refer - to it like so: ``"my.config.object.myAwesomeDispatcher"`` + - Sharability: None -There are two different executor services: + - Mailboxes: Any, creates one per Actor -* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for - ``default-dispatcher``. -* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``. + - Use cases: Bulkheading -Note that the pool size is configured differently for the two executor services. The configuration above -is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: + - Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator`` + by default a "thread-pool-executor" -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config +* BalancingDispatcher -Let's now walk through the different dispatchers in more detail. + - Sharability: Actors of the same type only -Thread-based -^^^^^^^^^^^^ + - Mailboxes: Any, creates one for all Actors -The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a -`LinkedBlockingQueue `_ -which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher -has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes -a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with -this dispatcher is that Actors do not block threads for each other. + - Use cases: Work-sharing -The ``PinnedDispatcher`` is configured like this: + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` + +* CallingThreadDispatcher + + - Sharability: Unlimited + + - Mailboxes: Any, creates one per Actor per Thread (on demand) + + - Use cases: Testing + + - Driven by: The calling thread (duh) + +More dispatcher configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Configuring a ``PinnedDispatcher``: .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config -Note that it must be used with ``executor = "thread-pool-executor"``. +And then using it: -Event-based -^^^^^^^^^^^ +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher -The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a -`BlockingQueue `_. This dispatcher is highly configurable -and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool. +Mailboxes +--------- -The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. -the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor -that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific -and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to -design and implement your system in the most efficient way in regards to performance, throughput and latency. +An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``. +Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance. -It comes with many different predefined BlockingQueue configurations: +Builtin implementations +^^^^^^^^^^^^^^^^^^^^^^^ -* Bounded `LinkedBlockingQueue `_ -* Unbounded `LinkedBlockingQueue `_ -* Bounded `ArrayBlockingQueue `_ -* Unbounded `ArrayBlockingQueue `_ -* `SynchronousQueue `_ +Akka comes shipped with a number of default mailbox implementations: -When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's -thread as a way to slow him down and balance producer/consumer. +* UnboundedMailbox -Here is an example of a bounded mailbox: + - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue`` -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config + - Blocking: No -The standard :class:`Dispatcher` allows you to define the ``throughput`` it -should have, as shown above. This defines the number of messages for a specific -Actor the dispatcher should process in one single sweep; in other words, the -dispatcher will batch process up to ``throughput`` messages together when -having elected an actor to run. Setting this to a higher number will increase -throughput but lower fairness, and vice versa. If you don't specify it explicitly -then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`. + - Bounded: No -Browse the `ScalaDoc `_ or look at the code for all the options available. +* BoundedMailbox -Priority event-based -^^^^^^^^^^^^^^^^^^^^ + - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` -Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply -an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a -``akka.dispatch.PriorityGenerator`` (recommended). + - Blocking: Yes -Creating a Dispatcher with a mailbox using PriorityGenerator: + - Bounded: Yes -Config: +* UnboundedPriorityMailbox -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-dispatcher-config-java + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` -Priority mailbox: + - Blocking: Yes -.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - :include: imports-prio-mailbox,prio-mailbox + - Bounded: No -Usage: +* BoundedPriorityMailbox -.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - :include: imports-prio,prio-dispatcher + - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + - Blocking: Yes -Work-sharing event-based -^^^^^^^^^^^^^^^^^^^^^^^^^ + - Bounded: Yes -The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to -share this dispatcher and during execution time the different actors will steal messages from other actors if they -have less messages to process. -Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably -best described as "work donating" because the actor of which work is being stolen takes the initiative. -This can be a great way to improve throughput at the cost of a little higher latency. +* Durable mailboxes, see :ref:`durable-mailboxes`. -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config +Mailbox configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques `_ -Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala `_ +How to create a PriorityMailbox: -Making the Actor mailbox bounded --------------------------------- +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherTestBase.java#prio-mailbox -Global configuration -^^^^^^^^^^^^^^^^^^^^ +And then add it to the configuration: -You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under -``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers, -if not specified otherwise. +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config -.. code-block:: ruby - - akka { - actor { - default-dispatcher { - # If negative (or zero) then an unbounded mailbox is used (default) - # If positive then a bounded mailbox is used and the capacity is set to the number specified - mailbox-capacity = 1000 - } - } - } - -Per-instance based configuration -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -You can also do it on a specific dispatcher instance. - -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config - - -For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor. -Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). -When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") -if the message cannot be added to the mailbox within the time specified by the pushTimeout. +And then an example on how you would use it: +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher \ No newline at end of file diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index 6717ad96cf..32ecfa1705 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -6,15 +6,10 @@ package akka.docs.dispatcher import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec -import akka.actor.Props -import akka.actor.Actor import akka.event.Logging import akka.event.LoggingAdapter import akka.util.duration._ -import akka.actor.PoisonPill -import akka.dispatch.MessageDispatcherConfigurator -import akka.dispatch.MessageDispatcher -import akka.dispatch.DispatcherPrerequisites +import akka.actor.{ Props, Actor, PoisonPill } object DispatcherDocSpec { val config = """ @@ -33,8 +28,9 @@ object DispatcherDocSpec { # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } - # Throughput defines the number of messages that are processed in a batch before the - # thread is returned to the pool. Set to 1 for as fair as possible. + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. throughput = 100 } //#my-dispatcher-config @@ -54,8 +50,9 @@ object DispatcherDocSpec { # maximum number of threads to cap factor-based number to core-pool-size-max = 10 } - # Throughput defines the number of messages that are processed in a batch before the - # thread is returned to the pool. Set to 1 for as fair as possible. + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. throughput = 100 } //#my-thread-pool-dispatcher-config @@ -94,13 +91,14 @@ object DispatcherDocSpec { //#prio-dispatcher-config prio-dispatcher { - mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$PrioMailbox" + mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$MyPrioMailbox" } //#prio-dispatcher-config //#prio-dispatcher-config-java prio-dispatcher-java { - mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$PrioMailbox" + mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox" + //Other dispatcher configuration goes here } //#prio-dispatcher-config-java """ @@ -108,17 +106,24 @@ object DispatcherDocSpec { //#prio-mailbox import akka.dispatch.PriorityGenerator import akka.dispatch.UnboundedPriorityMailbox - import akka.dispatch.MailboxType - import akka.actor.ActorContext import com.typesafe.config.Config - // We create a new Priority dispatcher and seed it with the priority generator - class PrioMailbox(config: Config) extends UnboundedPriorityMailbox( - PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important - case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible - case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible - case PoisonPill ⇒ 1000 // PoisonPill when no other left - case otherwise ⇒ 50 // We default to 50 + // We inherit, in this case, from UnboundedPriorityMailbox + // and seed it with the priority generator + class MyPrioMailbox(config: Config) extends UnboundedPriorityMailbox( + // Create a new PriorityGenerator, lower prio means more important + PriorityGenerator { + // 'highpriority messages should be treated first if possible + case 'highpriority ⇒ 0 + + // 'lowpriority messages should be treated last if possible + case 'lowpriority ⇒ 2 + + // PoisonPill when no other left + case PoisonPill ⇒ 3 + + // We default to 1, which is in between high and low + case otherwise ⇒ 1 }) //#prio-mailbox @@ -127,6 +132,29 @@ object DispatcherDocSpec { case x ⇒ } } + + //#mailbox-implementation-example + case class MyUnboundedMailbox() extends akka.dispatch.MailboxType { + import akka.actor.ActorContext + import com.typesafe.config.Config + import java.util.concurrent.ConcurrentLinkedQueue + import akka.dispatch.{ + Envelope, + MessageQueue, + QueueBasedMessageQueue, + UnboundedMessageQueueSemantics + } + + // This constructor signature must exist, it will be called by Akka + def this(config: Config) = this() + + // The create method is called to create the MessageQueue + final override def create(owner: Option[ActorContext]): MessageQueue = + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final val queue = new ConcurrentLinkedQueue[Envelope]() + } + //#mailbox-implementation-example + } } class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { @@ -134,10 +162,11 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { import DispatcherDocSpec.MyActor "defining dispatcher" in { + val context = system //#defining-dispatcher import akka.actor.Props - val myActor1 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor1") - val myActor2 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor2") + val myActor = + context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1") //#defining-dispatcher } @@ -146,15 +175,18 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { } "defining pinned dispatcher" in { + val context = system //#defining-pinned-dispatcher - val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor") + val myActor = + context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2") //#defining-pinned-dispatcher } "defining priority dispatcher" in { //#prio-dispatcher - val a = system.actorOf( // We create a new Actor that just prints out what it processes + // We create a new Actor that just prints out what it processes + val a = system.actorOf( Props(new Actor { val log: LoggingAdapter = Logging(context.system, this) diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 94ae563e64..18169fb2ef 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -7,202 +7,166 @@ Dispatchers (Scala) .. contents:: :local: -The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. - -Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. - -The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM. +An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak. +All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used +to execute arbitrary code, for instance :ref:`futures-scala`. Default dispatcher ------------------ -For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. -The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher`` -section of the :ref:`configuration`. +Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. +The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. -If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors -for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. -See below for details on which ones are available and how they can be configured. +Setting the dispatcher for an Actor +----------------------------------- -.. warning:: - Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher - as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used. +So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is: -Setting the dispatcher ----------------------- +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-dispatcher -You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key -of the dispatcher settings. +.. note:: + The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration. + So in this example it's a top-level section, but you could for instance put it as a sub-section, + where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: imports,defining-dispatcher +And then you just need to configure that dispatcher in your configuration: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + +And here's another example that uses the "thread-pool-executor": + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config + +For more options, see the default-dispatcher section of the :ref:`configuration`. Types of dispatchers -------------------- There are 4 different types of message dispatchers: -* Thread-based (Pinned) -* Event-based -* Priority event-based -* Work-sharing (Balancing) +* Dispatcher -It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. + - Sharability: Unlimited -Example of a custom event-based dispatcher, which can be used with ``Props[MyActor].withDispatcher("my-dispatcher")`` -as in the example above: + - Mailboxes: Any, creates one per Actor -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + - Use cases: Default dispatcher, Bulkheading -Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See -:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override -the values for the ``default-dispatcher`` in your configuration. + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` -.. note:: +* PinnedDispatcher - It should be noted that the ``dispatcher-id`` used in :class:`Props` is in - fact an absolute path into the configuration object, i.e. you can declare a - dispatcher configuration nested within other configuration objects and refer - to it like so: ``"my.config.object.myAwesomeDispatcher"`` + - Sharability: None -There are two different executor services: + - Mailboxes: Any, creates one per Actor -* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for - ``default-dispatcher``. -* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``. + - Use cases: Bulkheading -Note that the pool size is configured differently for the two executor services. The configuration above -is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: + - Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator`` + by default a "thread-pool-executor" -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config +* BalancingDispatcher -Let's now walk through the different dispatchers in more detail. + - Sharability: Actors of the same type only -Thread-based -^^^^^^^^^^^^ + - Mailboxes: Any, creates one for all Actors -The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a -`LinkedBlockingQueue `_ -which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher -has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes -a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with -this dispatcher is that Actors do not block threads for each other. + - Use cases: Work-sharing -The ``PinnedDispatcher`` is configured like this: + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config +* CallingThreadDispatcher -Note that it must be used with ``executor = "thread-pool-executor"``. + - Sharability: Unlimited -Event-based -^^^^^^^^^^^ + - Mailboxes: Any, creates one per Actor per Thread (on demand) -The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a -`BlockingQueue `_. This dispatcher is highly configurable -and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool. + - Use cases: Testing -The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. -the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor -that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific -and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to -design and implement your system in the most efficient way in regards to performance, throughput and latency. + - Driven by: The calling thread (duh) -It comes with many different predefined BlockingQueue configurations: +More dispatcher configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -* Bounded `LinkedBlockingQueue `_ -* Unbounded `LinkedBlockingQueue `_ -* Bounded `ArrayBlockingQueue `_ -* Unbounded `ArrayBlockingQueue `_ -* `SynchronousQueue `_ +Configuring a ``PinnedDispatcher``: -When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's -thread as a way to slow him down and balance producer/consumer. +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config -Here is an example of a bounded mailbox: +And then using it: -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher -The standard :class:`Dispatcher` allows you to define the ``throughput`` it -should have, as shown above. This defines the number of messages for a specific -Actor the dispatcher should process in one single sweep; in other words, the -dispatcher will batch process up to ``throughput`` messages together when -having elected an actor to run. Setting this to a higher number will increase -throughput but lower fairness, and vice versa. If you don't specify it explicitly -then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`. +Mailboxes +--------- -Browse the `ScalaDoc `_ or look at the code for all the options available. +An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``. +Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance. -Priority event-based -^^^^^^^^^^^^^^^^^^^^ +Builtin implementations +^^^^^^^^^^^^^^^^^^^^^^^ -Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply -an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a -``akka.dispatch.PriorityGenerator`` (recommended). +Akka comes shipped with a number of default mailbox implementations: -Creating a Dispatcher with a mailbox using PriorityGenerator: +* UnboundedMailbox -Config: + - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue`` -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-dispatcher-config + - Blocking: No -Priority mailbox: + - Bounded: No -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-mailbox +* BoundedMailbox -Usage: + - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-dispatcher + - Blocking: Yes -Work-sharing event-based -^^^^^^^^^^^^^^^^^^^^^^^^^ + - Bounded: Yes -The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to -share this dispatcher and during execution time the different actors will steal messages from other actors if they -have less messages to process. -Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably -best described as "work donating" because the actor of which work is being stolen takes the initiative. -This can be a great way to improve throughput at the cost of a little higher latency. +* UnboundedPriorityMailbox -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` -Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques `_ -Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala `_ + - Blocking: Yes -Making the Actor mailbox bounded --------------------------------- + - Bounded: No -Global configuration -^^^^^^^^^^^^^^^^^^^^ +* BoundedPriorityMailbox -You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under -``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers, -if not specified otherwise. + - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` -.. code-block:: ruby + - Blocking: Yes - akka { - actor { - default-dispatcher { - # If negative (or zero) then an unbounded mailbox is used (default) - # If positive then a bounded mailbox is used and the capacity is set to the number specified - mailbox-capacity = 1000 - } - } - } + - Bounded: Yes -Per-instance based configuration -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +* Durable mailboxes, see :ref:`durable-mailboxes`. -You can also do it on a specific dispatcher instance. +Mailbox configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config +How to create a PriorityMailbox: +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-mailbox -For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor. -Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). -When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") -if the message cannot be added to the mailbox within the time specified by the pushTimeout. +And then add it to the configuration: +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config + +And then an example on how you would use it: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher + +Creating your own Mailbox type +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +An example is worth a thousand quacks: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example + +And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. \ No newline at end of file