DOC: Updated dispatcher chapter (Java). See #1471

* Aligned with scala chapter
* Impl priority dispatcher sample in java
* Removed newPinnedDispatcher methods that takes ActorRef. Updated docs for PinnedDispatcher
This commit is contained in:
Patrik Nordwall 2011-12-13 16:18:51 +01:00
parent 66e7155ef1
commit 5e2dff2356
6 changed files with 244 additions and 219 deletions

View file

@ -97,28 +97,6 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
} }
} }
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout
* <p/>
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: LocalActorRef) = actor match {
case null new PinnedDispatcher(prerequisites, null, "anon", MailboxType, settings.DispatcherDefaultShutdown)
case some new PinnedDispatcher(prerequisites, some.underlying, some.path.toString, MailboxType, settings.DispatcherDefaultShutdown)
}
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* If capacity is negative, it's Integer.MAX_VALUE
* <p/>
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
case null new PinnedDispatcher(prerequisites, null, "anon", mailboxType, settings.DispatcherDefaultShutdown)
case some new PinnedDispatcher(prerequisites, some.underlying, some.path.toString, mailboxType, settings.DispatcherDefaultShutdown)
}
/** /**
* Creates an thread based dispatcher serving a single actor through the same single thread. * Creates an thread based dispatcher serving a single actor through the same single thread.
* <p/> * <p/>

View file

@ -0,0 +1,5 @@
package akka.docs.dispatcher
import org.scalatest.junit.JUnitSuite
class DispatcherDocTest extends DispatcherDocTestBase with JUnitSuite

View file

@ -0,0 +1,131 @@
package akka.docs.dispatcher;
//#imports
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.MessageDispatcher;
//#imports
//#imports-prio
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.actor.Actors;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;
import akka.event.Logging;
import akka.event.LoggingAdapter;
//#imports-prio
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.Option;
import static org.junit.Assert.*;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.docs.actor.MyUntypedActor;
import akka.docs.actor.UntypedActorTestBase.MyActor;
import akka.testkit.AkkaSpec;
public class DispatcherDocTestBase {
ActorSystem system;
@Before
public void setUp() {
system = ActorSystem.create("MySystem",
ConfigFactory.parseString(DispatcherDocSpec.config()).withFallback(AkkaSpec.testConf()));
}
@After
public void tearDown() {
system.stop();
}
@Test
public void defineDispatcher() {
//#defining-dispatcher
MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher");
ActorRef myActor1 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher),
"myactor1");
ActorRef myActor2 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher),
"myactor2");
//#defining-dispatcher
}
@Test
public void definePinnedDispatcher() {
//#defining-pinned-dispatcher
String name = "myactor";
MessageDispatcher dispatcher = system.dispatcherFactory().newPinnedDispatcher(name);
ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher), name);
//#defining-pinned-dispatcher
}
@Test
public void priorityDispatcher() throws Exception {
//#prio-dispatcher
PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
@Override
public int gen(Object message) {
if (message.equals("highpriority"))
return 0; // 'highpriority messages should be treated first if possible
else if (message.equals("lowpriority"))
return 100; // 'lowpriority messages should be treated last if possible
else if (message.equals(Actors.poisonPill()))
return 1000; // PoisonPill when no other left
else
return 50; // We default to 50
}
};
// We create a new Priority dispatcher and seed it with the priority generator
MessageDispatcher dispatcher = system.dispatcherFactory()
.newDispatcher("foo", 5, new UnboundedPriorityMailbox(generator)).build();
ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes
new Props().withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new UntypedActor() {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
{
getSelf().tell("lowpriority");
getSelf().tell("lowpriority");
getSelf().tell("highpriority");
getSelf().tell("pigdog");
getSelf().tell("pigdog2");
getSelf().tell("pigdog3");
getSelf().tell("highpriority");
getSelf().tell(Actors.poisonPill());
}
public void onReceive(Object message) {
log.info(message.toString());
}
};
}
}).withDispatcher(dispatcher));
/*
Logs:
'highpriority
'highpriority
'pigdog
'pigdog2
'pigdog3
'lowpriority
'lowpriority
*/
//#prio-dispatcher
for (int i = 0; i < 10; i++) {
if (myActor.isTerminated())
break;
Thread.sleep(100);
}
}
}

View file

@ -1,212 +1,140 @@
.. _dispatchers-java: .. _dispatchers-java:
Dispatchers (Java) Dispatchers (Java)
================== ===================
.. sidebar:: Contents .. sidebar:: Contents
.. contents:: :local: .. contents:: :local:
The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs.
Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread.
The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM. The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM.
Default dispatcher Default dispatcher
------------------ ------------------
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. The default dispatcher used is "GlobalDispatcher" which also is retrievable in ``akka.dispatch.Dispatchers.globalDispatcher``. For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created.
The Dispatcher specified in the :ref:`configuration` as "default-dispatcher" is as ``Dispatchers.defaultGlobalDispatcher``. The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher``
section of the :ref:`configuration`.
The "GlobalDispatcher" is not configurable but will use default parameters given by Akka itself. If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors
for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher.
But if you feel that you are starting to contend on the single dispatcher (the 'Executor' and its queue) or want to group a specific set of Actors for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. See below for details on which ones are available and how they can be configured. See below for details on which ones are available and how they can be configured.
Setting the dispatcher Setting the dispatcher
---------------------- ----------------------
Normally you set the dispatcher from within the Actor itself. The dispatcher is defined by the 'dispatcher: MessageDispatcher' member field in 'ActorRef'. You specify the dispatcher to use when creating an actor.
.. code-block:: java .. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
:include: imports,defining-dispatcher
class MyActor extends UntypedActor {
public MyActor() {
getContext().setDispatcher(..); // set the dispatcher
}
...
}
You can also set the dispatcher for an Actor **before** it has been started:
.. code-block:: java
actorRef.setDispatcher(dispatcher);
Types of dispatchers Types of dispatchers
-------------------- --------------------
There are six different types of message dispatchers: There are 4 different types of message dispatchers:
* Thread-based * Thread-based (Pinned)
* Event-based * Event-based
* Priority event-based * Priority event-based
* Work-stealing event-based * Work-sharing (Balancing)
Factory methods for all of these, including global versions of some of them, are in the 'akka.dispatch.Dispatchers' object. It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
Example of a custom event-based dispatcher, which can be fetched with ``system.dispatcherFactory().lookup("my-dispatcher")``
as in the example above:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined.
.. warning::
Factory methods for creating dispatchers programmatically are available in ``akka.dispatch.Dispatchers``, i.e.
``dispatcherFactory`` of the ``ActorSystem``. These methods will probably be changed or removed before
2.0 final release, because dispatchers need to be defined by configuration to work in a clustered setup.
Let's now walk through the different dispatchers in more detail. Let's now walk through the different dispatchers in more detail.
Thread-based Thread-based
^^^^^^^^^^^^ ^^^^^^^^^^^^
The 'PinnedDispatcher' binds a dedicated OS thread to each specific Actor. The messages are posted to a 'LinkedBlockingQueue' which feeds the messages to the dispatcher one by one. A 'PinnedDispatcher' cannot be shared between actors. This dispatcher has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other. The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a
`LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher
has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other.
.. code-block:: java The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this:
Dispatcher dispatcher = Dispatchers.newPinnedDispatcher(actorRef); .. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher
It would normally by used from within the actor like this:
.. code-block:: java
class MyActor extends UntypedActor {
public MyActor() {
getContext().setDispatcher(Dispatchers.newPinnedDispatcher(getContext()));
}
...
}
Event-based Event-based
^^^^^^^^^^^ ^^^^^^^^^^^
The 'Dispatcher' binds a set of Actors to a thread pool backed up by a 'BlockingQueue'. This dispatcher is highly configurable and supports a fluent configuration API to configure the 'BlockingQueue' (type of queue, max items etc.) as well as the thread pool. The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
`BlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html>`_. This dispatcher is highly configurable
and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool.
The event-driven dispatchers **must be shared** between multiple Typed Actors and/or Actors. One best practice is to let each top-level Actor, e.g. the Actors you define in the declarative supervisor config, to get their own dispatcher but reuse the dispatcher for each new Actor that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to design and implement your system in the most efficient way in regards to performance, throughput and latency. The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g.
the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor
that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific
and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to
design and implement your system in the most efficient way in regards to performance, throughput and latency.
It comes with many different predefined BlockingQueue configurations: It comes with many different predefined BlockingQueue configurations:
* Bounded LinkedBlockingQueue * Bounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Unbounded LinkedBlockingQueue * Unbounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Bounded ArrayBlockingQueue * Bounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* Unbounded ArrayBlockingQueue * Unbounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* SynchronousQueue * `SynchronousQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html>`_
You can also set the rejection policy that should be used, e.g. what should be done if the dispatcher (e.g. the Actor) can't keep up and the mailbox is growing up to the limit defined. You can choose between four different rejection policies: When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's
thread as a way to slow him down and balance producer/consumer.
* java.util.concurrent.ThreadPoolExecutor.CallerRuns - will run the message processing in the caller's thread as a way to slow him down and balance producer/consumer Here is an example of a bounded mailbox:
* java.util.concurrent.ThreadPoolExecutor.AbortPolicy - rejected messages by throwing a 'RejectedExecutionException'
* java.util.concurrent.ThreadPoolExecutor.DiscardPolicy - discards the message (throws it away)
* java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy - discards the oldest message in the mailbox (throws it away)
You cane read more about these policies `here <http://java.sun.com/javase/6/docs/api/index.html?java/util/concurrent/RejectedExecutionHandler.html>`_. .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
Here is an example:
.. code-block:: java
import akka.actor.Actor;
import akka.dispatch.Dispatchers;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
class MyActor extends UntypedActor {
public MyActor() {
getContext().setDispatcher(Dispatchers.newDispatcher(name)
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy())
.build());
}
...
}
The standard :class:`Dispatcher` allows you to define the ``throughput`` it The standard :class:`Dispatcher` allows you to define the ``throughput`` it
should have, as shown above. This defines the number of messages for a specific should have, as shown above. This defines the number of messages for a specific
Actor the dispatcher should process in one single sweep; in other words, the Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will bunch up to ``throughput`` message invocations together when dispatcher will batch process up to ``throughput`` messages together when
having elected an actor to run. Setting this to a higher number will increase having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it explicitly throughput but lower fairness, and vice versa. If you don't specify it explicitly
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`. then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.
Browse the :ref:`scaladoc` or look at the code for all the options available. Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options available.
Priority event-based Priority event-based
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^
Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply either Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a java.util.Comparator[MessageInvocation] or use a akka.dispatch.PriorityGenerator (recommended): an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
``akka.dispatch.PriorityGenerator`` (recommended).
Creating a Dispatcher with a priority mailbox using PriorityGenerator: Creating a Dispatcher using PriorityGenerator:
.. code-block:: java .. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
:include: imports-prio,prio-dispatcher
package some.pkg;
import akka.actor.*; Work-sharing event-based
import akka.dispatch.*;
public class Main {
// A simple Actor that just prints the messages it processes
public static class MyActor extends UntypedActor {
public MyActor() {
self.tell("lowpriority");
getSelf().tell("lowpriority");
getSelf().tell("highpriority");
getSelf().tell("pigdog");
getSelf().tell("pigdog2");
getSelf().tell("pigdog3");
getSelf().tell("highpriority");
}
public void onReceive(Object message) throws Exception {
System.out.println(message);
}
}
public static void main(String[] args) {
// Create a new PriorityGenerator, lower prio means more important
PriorityGenerator gen = new PriorityGenerator() {
public int gen(Object message) {
if (message.equals("highpriority")) return 0; // "highpriority" messages should be treated first if possible
else if (message.equals("lowpriority")) return 100; // "lowpriority" messages should be treated last if possible
else return 50; // We default to 50
}
};
// We create an instance of the actor that will print out the messages it processes
// We create a new Priority dispatcher and seed it with the priority generator
ActorRef ref = Actors.actorOf(new Props(MyActor.class).withDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen))));
}
}
Prints:
highpriority
highpriority
pigdog
pigdog2
pigdog3
lowpriority
lowpriority
Work-stealing event-based
^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^
The 'BalancingDispatcher' is a variation of the 'Dispatcher' in which Actors of the same type can be set up to share this dispatcher and during execution time the different actors will steal messages from other actors if they have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency. The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to
share this dispatcher and during execution time the different actors will steal messages from other actors if they
have less messages to process.
Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
best described as "work donating" because the actor of which work is being stolen takes the initiative.
This can be a great way to improve throughput at the cost of a little higher latency.
Normally the way you use it is to define a static field to hold the dispatcher and then set in in the Actor explicitly. .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config
.. code-block:: java
class MyActor extends UntypedActor {
public static MessageDispatcher dispatcher = Dispatchers.newBalancingDispatcher(name).build();
public MyActor() {
getContext().setDispatcher(dispatcher);
}
...
}
Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_ Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_
Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_ Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_
@ -217,14 +145,18 @@ Making the Actor mailbox bounded
Global configuration Global configuration
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^
You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the configuration file under 'default-dispatcher'. This will set it globally. You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under
``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers,
if not specified otherwise.
.. code-block:: ruby .. code-block:: ruby
actor { akka {
default-dispatcher { actor {
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) default-dispatcher {
# If positive then a bounded mailbox is used and the capacity is set to the number specified task-queue-size = 1000 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set to the number specified
}
} }
} }
@ -233,33 +165,11 @@ Per-instance based configuration
You can also do it on a specific dispatcher instance. You can also do it on a specific dispatcher instance.
For the 'Dispatcher' and the 'ExecutorBasedWorkStealingDispatcher' you can do it through their constructor .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
.. code-block:: java
class MyActor extends UntypedActor { For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor.
public MyActor() { Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds).
int capacity = 100; When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
Duration pushTimeout = new FiniteDuration(10, TimeUnit.SECONDS); if the message cannot be added to the mailbox within the time specified by the pushTimeout.
MailboxType mailboxCapacity = new BoundedMailbox(false, capacity, pushTimeout);
MessageDispatcher dispatcher =
Dispatchers.newDispatcher(name, throughput, mailboxCapacity).build();
getContext().setDispatcher(dispatcher);
}
...
}
For the 'PinnedDispatcher', it is non-shareable between actors, and associates a dedicated Thread with the actor.
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") if the message cannot be added to the mailbox within the time specified by the pushTimeout.
.. code-block:: java
class MyActor extends UntypedActor {
public MyActor() {
int mailboxCapacity = 100;
Duration pushTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
getContext().setDispatcher(Dispatchers.newPinnedDispatcher(getContext(), mailboxCapacity, pushTimeout));
}
...
}

View file

@ -26,14 +26,6 @@ object DispatcherDocSpec {
} }
//#my-dispatcher-config //#my-dispatcher-config
//#my-pinned-config
my-pinned-dispatcher {
type = Dispatcher
core-pool-size-min = 1
core-pool-size-max = 1
}
//#my-pinned-config
//#my-bounded-config //#my-bounded-config
my-dispatcher-bounded-queue { my-dispatcher-bounded-queue {
type = Dispatcher type = Dispatcher
@ -76,6 +68,14 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher-bounded-queue") val dispatcher = system.dispatcherFactory.lookup("my-dispatcher-bounded-queue")
} }
"defining pinned dispatcher" in {
//#defining-pinned-dispatcher
val name = "myactor"
val dispatcher = system.dispatcherFactory.newPinnedDispatcher(name)
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name)
//#defining-pinned-dispatcher
}
"defining priority dispatcher" in { "defining priority dispatcher" in {
//#prio-dispatcher //#prio-dispatcher
val gen = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important val gen = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important

View file

@ -6,7 +6,7 @@ Dispatchers (Scala)
.. sidebar:: Contents .. sidebar:: Contents
.. contents:: :local: .. contents:: :local:
The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs.
Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread.
@ -29,7 +29,7 @@ Setting the dispatcher
You specify the dispatcher to use when creating an actor. You specify the dispatcher to use when creating an actor.
.. includecode:: code/DispatcherDocSpec.scala .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
:include: imports,defining-dispatcher :include: imports,defining-dispatcher
Types of dispatchers Types of dispatchers
@ -40,14 +40,14 @@ There are 4 different types of message dispatchers:
* Thread-based (Pinned) * Thread-based (Pinned)
* Event-based * Event-based
* Priority event-based * Priority event-based
* Work-stealing (Balancing) * Work-sharing (Balancing)
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
Example of a custom event-based dispatcher, which can be fetched with ``system.dispatcherFactory.lookup("my-dispatcher")`` Example of a custom event-based dispatcher, which can be fetched with ``system.dispatcherFactory.lookup("my-dispatcher")``
as in the example above: as in the example above:
.. includecode:: code/DispatcherDocSpec.scala#my-dispatcher-config .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined.
@ -69,11 +69,9 @@ has worse performance and scalability than the event-based dispatcher but works
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other. this dispatcher is that Actors do not block threads for each other.
FIXME PN: Is this the way to configure a PinnedDispatcher, and then why "A ``PinnedDispatcher`` cannot be shared between actors." The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this:
The ``PinnedDispatcher`` is configured as a event-based dispatcher with with core pool size of 1. .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher
.. includecode:: code/DispatcherDocSpec.scala#my-pinned-config
Event-based Event-based
^^^^^^^^^^^ ^^^^^^^^^^^
@ -101,7 +99,7 @@ thread as a way to slow him down and balance producer/consumer.
Here is an example of a bounded mailbox: Here is an example of a bounded mailbox:
.. includecode:: code/DispatcherDocSpec.scala#my-bounded-config .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
The standard :class:`Dispatcher` allows you to define the ``throughput`` it The standard :class:`Dispatcher` allows you to define the ``throughput`` it
should have, as shown above. This defines the number of messages for a specific should have, as shown above. This defines the number of messages for a specific
@ -118,20 +116,23 @@ Priority event-based
Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
``akka.dispatch.PriorityGenerator`` (recommended): ``akka.dispatch.PriorityGenerator`` (recommended).
Creating a Dispatcher using PriorityGenerator: Creating a Dispatcher using PriorityGenerator:
.. includecode:: code/DispatcherDocSpec.scala#prio-dispatcher .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher
Work-stealing event-based Work-sharing event-based
^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^
The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to
share this dispatcher and during execution time the different actors will steal messages from other actors if they share this dispatcher and during execution time the different actors will steal messages from other actors if they
have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency. have less messages to process.
Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
best described as "work donating" because the actor of which work is being stolen takes the initiative.
This can be a great way to improve throughput at the cost of a little higher latency.
.. includecode:: code/DispatcherDocSpec.scala#my-balancing-config .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config
Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_ Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_
Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_ Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_
@ -162,7 +163,7 @@ Per-instance based configuration
You can also do it on a specific dispatcher instance. You can also do it on a specific dispatcher instance.
.. includecode:: code/DispatcherDocSpec.scala#my-bounded-config .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor. For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor.