diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java index 0906fd2168..cca0ebb577 100755 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java @@ -23,6 +23,8 @@ import com.typesafe.config.ConfigValueType; final class ConfigSubstitution extends AbstractConfigValue implements Unmergeable { + private static final long serialVersionUID = 1L; + // this is a list of String and SubstitutionExpression where the // SubstitutionExpression has to be resolved to values, then if there's more // than one piece everything is stringified and concatenated diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 881d036f05..7698d3f2f1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -361,8 +361,8 @@ case class UnboundedMailbox() extends MailboxType { def this(settings: ActorSystem.Settings, config: Config) = this() final override def create(owner: Option[ActorContext]): MessageQueue = - new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new ConcurrentLinkedQueue[Envelope]() + new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final def queue: Queue[Envelope] = this } } @@ -375,8 +375,8 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") final override def create(owner: Option[ActorContext]): MessageQueue = - new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new LinkedBlockingQueue[Envelope](capacity) + new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics { + final def queue: BlockingQueue[Envelope] = this final val pushTimeOut = BoundedMailbox.this.pushTimeOut } } @@ -386,8 +386,8 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat */ class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { final override def create(owner: Option[ActorContext]): MessageQueue = - new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new PriorityBlockingQueue[Envelope](11, cmp) + new PriorityBlockingQueue[Envelope](11, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final def queue: Queue[Envelope] = this } } @@ -400,8 +400,8 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") final override def create(owner: Option[ActorContext]): MessageQueue = - new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) + new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics { + final def queue: BlockingQueue[Envelope] = this final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut } } diff --git a/akka-docs/additional/third-party-integrations.rst b/akka-docs/additional/third-party-integrations.rst index 93b3b36d2e..fea3b0ec13 100644 --- a/akka-docs/additional/third-party-integrations.rst +++ b/akka-docs/additional/third-party-integrations.rst @@ -15,3 +15,9 @@ Scalatra has Akka integration. Read more here: ``_ +Gatling +------- + +Gatling is an Open Source Stress Tool. + +Read more here: ``_ \ No newline at end of file diff --git a/akka-docs/dev/team.rst b/akka-docs/dev/team.rst index 8c70e016d2..953848163c 100644 --- a/akka-docs/dev/team.rst +++ b/akka-docs/dev/team.rst @@ -9,24 +9,27 @@ Name Role Email =================== ========================== ==================================== Jonas Bonér Founder, Despot, Committer jonas AT jonasboner DOT com -Viktor Klang Bad cop, Committer viktor DOT klang AT gmail DOT com -Debasish Ghosh Committer dghosh AT acm DOT org -Ross McDonald Alumni rossajmcd AT gmail DOT com -Eckhart Hertzler Alumni -Mikael Högqvist Alumni -Tim Perrett Alumni -Jeanfrancois Arcand Alumni jfarcand AT apache DOT org -Martin Krasser Committer krasserm AT googlemail DOT com -Jan Van Besien Alumni -Michael Kober Alumni -Peter Vlugter Committer -Peter Veentjer Committer -Irmo Manie Committer -Heiko Seeberger Committer -Hiram Chirino Committer -Scott Clasen Committer +Viktor Klang Project Owner viktor DOT klang AT gmail DOT com Roland Kuhn Committer Patrik Nordwall Committer patrik DOT nordwall AT gmail DOT com Derek Williams Committer derek AT nebvin DOT ca Henrik Engström Committer +Peter Vlugter Committer +Martin Krasser Committer krasserm AT googlemail DOT com +Raymond Roestenburg Committer +Piotr Gabryanczyk Committer +Debasish Ghosh Alumni dghosh AT acm DOT org +Ross McDonald Alumni rossajmcd AT gmail DOT com +Eckhart Hertzler Alumni +Mikael Högqvist Alumni +Tim Perrett Alumni +Jeanfrancois Arcand Alumni jfarcand AT apache DOT org +Jan Van Besien Alumni +Michael Kober Alumni +Peter Veentjer Alumni +Irmo Manie Alumni +Heiko Seeberger Alumni +Hiram Chirino Alumni +Scott Clasen Alumni + =================== ========================== ==================================== \ No newline at end of file diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java index 1616e349bc..5c3bc7b8ad 100644 --- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java @@ -60,19 +60,17 @@ public class DispatcherDocTestBase { @Test public void defineDispatcher() { //#defining-dispatcher - ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), - "myactor1"); - ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), - "myactor2"); + ActorRef myActor = + system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), + "myactor3"); //#defining-dispatcher } @Test public void definePinnedDispatcher() { //#defining-pinned-dispatcher - String name = "myactor"; ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class) - .withDispatcher("myactor-dispatcher"), name); + .withDispatcher("my-pinned-dispatcher")); //#defining-pinned-dispatcher } @@ -80,11 +78,13 @@ public class DispatcherDocTestBase { public void priorityDispatcher() throws Exception { //#prio-dispatcher - ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes + // We create a new Actor that just prints out what it processes + ActorRef myActor = system.actorOf( new Props().withCreator(new UntypedActorFactory() { public UntypedActor create() { return new UntypedActor() { - LoggingAdapter log = Logging.getLogger(getContext().system(), this); + LoggingAdapter log = + Logging.getLogger(getContext().system(), this); { getSelf().tell("lowpriority"); getSelf().tell("lowpriority"); @@ -101,7 +101,7 @@ public class DispatcherDocTestBase { } }; } - }).withDispatcher("prio-dispatcher-java")); + }).withDispatcher("prio-dispatcher")); /* Logs: @@ -123,19 +123,20 @@ public class DispatcherDocTestBase { } //#prio-mailbox - public static class PrioMailbox extends UnboundedPriorityMailbox { - public PrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation - super(new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important + public static class MyPrioMailbox extends UnboundedPriorityMailbox { + public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation + // Create a new PriorityGenerator, lower prio means more important + super(new PriorityGenerator() { @Override public int gen(Object message) { if (message.equals("highpriority")) return 0; // 'highpriority messages should be treated first if possible else if (message.equals("lowpriority")) - return 100; // 'lowpriority messages should be treated last if possible + return 2; // 'lowpriority messages should be treated last if possible else if (message.equals(Actors.poisonPill())) - return 1000; // PoisonPill when no other left + return 3; // PoisonPill when no other left else - return 50; // We default to 50 + return 1; // By default they go between high and low prio } }); } diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index b7a16d9283..8940cce616 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -7,204 +7,167 @@ Dispatchers (Java) .. contents:: :local: -The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. - -Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. - -The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM. +An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak. +All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used +to execute arbitrary code, for instance :ref:`futures-java`. Default dispatcher ------------------ -For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. -The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher`` -section of the :ref:`configuration`. +Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. +The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. -If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors -for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. -See below for details on which ones are available and how they can be configured. +Setting the dispatcher for an Actor +----------------------------------- -.. warning:: - Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher - as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used. +So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is: -Setting the dispatcher ----------------------- +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher -You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key -of the dispatcher settings. +.. note:: + The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration. + So in this example it's a top-level section, but you could for instance put it as a sub-section, + where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` -.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - :include: imports,defining-dispatcher +And then you just need to configure that dispatcher in your configuration: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + +And here's another example that uses the "thread-pool-executor": + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config + +For more options, see the default-dispatcher section of the :ref:`configuration`. Types of dispatchers -------------------- There are 4 different types of message dispatchers: -* Thread-based (Pinned) -* Event-based -* Priority event-based -* Work-sharing (Balancing) +* Dispatcher -It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. + - Sharability: Unlimited -Example of a custom event-based dispatcher, which can be used with -``new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher")`` -as in the example above: + - Mailboxes: Any, creates one per Actor -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + - Use cases: Default dispatcher, Bulkheading -Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See -:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override -the values for the ``default-dispatcher`` in your configuration. + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` -.. note:: +* PinnedDispatcher - It should be noted that the ``dispatcher-id`` used in :class:`Props` is in - fact an absolute path into the configuration object, i.e. you can declare a - dispatcher configuration nested within other configuration objects and refer - to it like so: ``"my.config.object.myAwesomeDispatcher"`` + - Sharability: None -There are two different executor services: + - Mailboxes: Any, creates one per Actor -* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for - ``default-dispatcher``. -* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``. + - Use cases: Bulkheading -Note that the pool size is configured differently for the two executor services. The configuration above -is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: + - Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator`` + by default a "thread-pool-executor" -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config +* BalancingDispatcher -Let's now walk through the different dispatchers in more detail. + - Sharability: Actors of the same type only -Thread-based -^^^^^^^^^^^^ + - Mailboxes: Any, creates one for all Actors -The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a -`LinkedBlockingQueue `_ -which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher -has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes -a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with -this dispatcher is that Actors do not block threads for each other. + - Use cases: Work-sharing -The ``PinnedDispatcher`` is configured like this: + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` + +* CallingThreadDispatcher + + - Sharability: Unlimited + + - Mailboxes: Any, creates one per Actor per Thread (on demand) + + - Use cases: Testing + + - Driven by: The calling thread (duh) + +More dispatcher configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Configuring a ``PinnedDispatcher``: .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config -Note that it must be used with ``executor = "thread-pool-executor"``. +And then using it: -Event-based -^^^^^^^^^^^ +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher -The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a -`BlockingQueue `_. This dispatcher is highly configurable -and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool. +Mailboxes +--------- -The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. -the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor -that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific -and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to -design and implement your system in the most efficient way in regards to performance, throughput and latency. +An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``. +Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance. -It comes with many different predefined BlockingQueue configurations: +Builtin implementations +^^^^^^^^^^^^^^^^^^^^^^^ -* Bounded `LinkedBlockingQueue `_ -* Unbounded `LinkedBlockingQueue `_ -* Bounded `ArrayBlockingQueue `_ -* Unbounded `ArrayBlockingQueue `_ -* `SynchronousQueue `_ +Akka comes shipped with a number of default mailbox implementations: -When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's -thread as a way to slow him down and balance producer/consumer. +* UnboundedMailbox -Here is an example of a bounded mailbox: + - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue`` -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config + - Blocking: No -The standard :class:`Dispatcher` allows you to define the ``throughput`` it -should have, as shown above. This defines the number of messages for a specific -Actor the dispatcher should process in one single sweep; in other words, the -dispatcher will batch process up to ``throughput`` messages together when -having elected an actor to run. Setting this to a higher number will increase -throughput but lower fairness, and vice versa. If you don't specify it explicitly -then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`. + - Bounded: No -Browse the `ScalaDoc `_ or look at the code for all the options available. +* BoundedMailbox -Priority event-based -^^^^^^^^^^^^^^^^^^^^ + - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` -Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply -an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a -``akka.dispatch.PriorityGenerator`` (recommended). + - Blocking: Yes -Creating a Dispatcher with a mailbox using PriorityGenerator: + - Bounded: Yes -Config: +* UnboundedPriorityMailbox -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-dispatcher-config-java + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` -Priority mailbox: + - Blocking: Yes -.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - :include: imports-prio-mailbox,prio-mailbox + - Bounded: No -Usage: +* BoundedPriorityMailbox -.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java - :include: imports-prio,prio-dispatcher + - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + - Blocking: Yes -Work-sharing event-based -^^^^^^^^^^^^^^^^^^^^^^^^^ + - Bounded: Yes -The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to -share this dispatcher and during execution time the different actors will steal messages from other actors if they -have less messages to process. -Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably -best described as "work donating" because the actor of which work is being stolen takes the initiative. -This can be a great way to improve throughput at the cost of a little higher latency. +* Durable mailboxes, see :ref:`durable-mailboxes`. -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config +Mailbox configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques `_ -Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala `_ +How to create a PriorityMailbox: -Making the Actor mailbox bounded --------------------------------- +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-mailbox -Global configuration -^^^^^^^^^^^^^^^^^^^^ +And then add it to the configuration: -You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under -``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers, -if not specified otherwise. +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config -.. code-block:: ruby +And then an example on how you would use it: - akka { - actor { - default-dispatcher { - # If negative (or zero) then an unbounded mailbox is used (default) - # If positive then a bounded mailbox is used and the capacity is set to the number specified - mailbox-capacity = 1000 - } - } - } +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher -Per-instance based configuration -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. note:: -You can also do it on a specific dispatcher instance. - -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config - - -For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor. -Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). -When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") -if the message cannot be added to the mailbox within the time specified by the pushTimeout. + Make sure to include a constructor which takes + ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` + arguments, as this constructor is invoked reflectively to construct your + mailbox type. The config passed in as second argument is that section from + the configuration which describes the dispatcher using this mailbox type; the + mailbox type will be instantiated once for each dispatcher using it. diff --git a/akka-docs/project/other-doc.rst b/akka-docs/project/other-doc.rst index 769f4b457e..cc6fc1a3e8 100644 --- a/akka-docs/project/other-doc.rst +++ b/akka-docs/project/other-doc.rst @@ -19,6 +19,12 @@ be found here: Release Versions ================ +1.3.1 +--- + +- Akka 1.3.1 - http://akka.io/docs/akka/1.3.1/ (or in `PDF format `__) +- Akka Modules 1.3.1 - http://akka.io/docs/akka-modules/1.3.1/ (or in `PDF format `__) + 1.2 --- diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index bb3831ea4a..1452d72088 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -6,16 +6,10 @@ package akka.docs.dispatcher import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec -import akka.actor.Props -import akka.actor.Actor import akka.event.Logging import akka.event.LoggingAdapter import akka.util.duration._ -import akka.actor.PoisonPill -import akka.dispatch.MessageDispatcherConfigurator -import akka.dispatch.MessageDispatcher -import akka.dispatch.DispatcherPrerequisites -import akka.actor.ActorSystem +import akka.actor.{ Props, Actor, PoisonPill, ActorSystem } object DispatcherDocSpec { val config = """ @@ -34,8 +28,9 @@ object DispatcherDocSpec { # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } - # Throughput defines the number of messages that are processed in a batch before the - # thread is returned to the pool. Set to 1 for as fair as possible. + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. throughput = 100 } //#my-dispatcher-config @@ -55,8 +50,9 @@ object DispatcherDocSpec { # maximum number of threads to cap factor-based number to core-pool-size-max = 10 } - # Throughput defines the number of messages that are processed in a batch before the - # thread is returned to the pool. Set to 1 for as fair as possible. + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. throughput = 100 } //#my-thread-pool-dispatcher-config @@ -95,13 +91,14 @@ object DispatcherDocSpec { //#prio-dispatcher-config prio-dispatcher { - mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$PrioMailbox" + mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$MyPrioMailbox" } //#prio-dispatcher-config //#prio-dispatcher-config-java prio-dispatcher-java { - mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$PrioMailbox" + mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox" + //Other dispatcher configuration goes here } //#prio-dispatcher-config-java """ @@ -109,17 +106,24 @@ object DispatcherDocSpec { //#prio-mailbox import akka.dispatch.PriorityGenerator import akka.dispatch.UnboundedPriorityMailbox - import akka.dispatch.MailboxType - import akka.actor.ActorContext import com.typesafe.config.Config - // We create a new Priority dispatcher and seed it with the priority generator - class PrioMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox( - PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important - case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible - case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible - case PoisonPill ⇒ 1000 // PoisonPill when no other left - case otherwise ⇒ 50 // We default to 50 + // We inherit, in this case, from UnboundedPriorityMailbox + // and seed it with the priority generator + class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox( + // Create a new PriorityGenerator, lower prio means more important + PriorityGenerator { + // 'highpriority messages should be treated first if possible + case 'highpriority ⇒ 0 + + // 'lowpriority messages should be treated last if possible + case 'lowpriority ⇒ 2 + + // PoisonPill when no other left + case PoisonPill ⇒ 3 + + // We default to 1, which is in between high and low + case otherwise ⇒ 1 }) //#prio-mailbox @@ -128,6 +132,29 @@ object DispatcherDocSpec { case x ⇒ } } + + //#mailbox-implementation-example + case class MyUnboundedMailbox() extends akka.dispatch.MailboxType { + import akka.actor.ActorContext + import com.typesafe.config.Config + import java.util.concurrent.ConcurrentLinkedQueue + import akka.dispatch.{ + Envelope, + MessageQueue, + QueueBasedMessageQueue, + UnboundedMessageQueueSemantics + } + + // This constructor signature must exist, it will be called by Akka + def this(settings: ActorSystem.Settings, config: Config) = this() + + // The create method is called to create the MessageQueue + final override def create(owner: Option[ActorContext]): MessageQueue = + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final val queue = new ConcurrentLinkedQueue[Envelope]() + } + //#mailbox-implementation-example + } } class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { @@ -135,10 +162,11 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { import DispatcherDocSpec.MyActor "defining dispatcher" in { + val context = system //#defining-dispatcher import akka.actor.Props - val myActor1 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor1") - val myActor2 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor2") + val myActor = + context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1") //#defining-dispatcher } @@ -147,15 +175,18 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { } "defining pinned dispatcher" in { + val context = system //#defining-pinned-dispatcher - val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor") + val myActor = + context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2") //#defining-pinned-dispatcher } "defining priority dispatcher" in { //#prio-dispatcher - val a = system.actorOf( // We create a new Actor that just prints out what it processes + // We create a new Actor that just prints out what it processes + val a = system.actorOf( Props(new Actor { val log: LoggingAdapter = Logging(context.system, this) diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 94ae563e64..1dd050684e 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -7,202 +7,176 @@ Dispatchers (Scala) .. contents:: :local: -The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs. - -Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread. - -The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM. +An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak. +All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used +to execute arbitrary code, for instance :ref:`futures-scala`. Default dispatcher ------------------ -For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. -The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher`` -section of the :ref:`configuration`. +Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. +The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. -If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors -for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. -See below for details on which ones are available and how they can be configured. +Setting the dispatcher for an Actor +----------------------------------- -.. warning:: - Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher - as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used. +So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is: -Setting the dispatcher ----------------------- +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-dispatcher -You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key -of the dispatcher settings. +.. note:: + The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration. + So in this example it's a top-level section, but you could for instance put it as a sub-section, + where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: imports,defining-dispatcher +And then you just need to configure that dispatcher in your configuration: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + +And here's another example that uses the "thread-pool-executor": + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config + +For more options, see the default-dispatcher section of the :ref:`configuration`. Types of dispatchers -------------------- There are 4 different types of message dispatchers: -* Thread-based (Pinned) -* Event-based -* Priority event-based -* Work-sharing (Balancing) +* Dispatcher -It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. + - Sharability: Unlimited -Example of a custom event-based dispatcher, which can be used with ``Props[MyActor].withDispatcher("my-dispatcher")`` -as in the example above: + - Mailboxes: Any, creates one per Actor -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config + - Use cases: Default dispatcher, Bulkheading -Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See -:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override -the values for the ``default-dispatcher`` in your configuration. + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` + +* PinnedDispatcher + + - Sharability: None + + - Mailboxes: Any, creates one per Actor + + - Use cases: Bulkheading + + - Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator`` + by default a "thread-pool-executor" + +* BalancingDispatcher + + - Sharability: Actors of the same type only + + - Mailboxes: Any, creates one for all Actors + + - Use cases: Work-sharing + + - Driven by: ``java.util.concurrent.ExecutorService`` + specify using "executor" using "fork-join-executor", + "thread-pool-executor" or the FQCN of + an ``akka.dispatcher.ExecutorServiceConfigurator`` + +* CallingThreadDispatcher + + - Sharability: Unlimited + + - Mailboxes: Any, creates one per Actor per Thread (on demand) + + - Use cases: Testing + + - Driven by: The calling thread (duh) + +More dispatcher configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Configuring a ``PinnedDispatcher``: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config + +And then using it: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher + +Mailboxes +--------- + +An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``. +Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance. + +Builtin implementations +^^^^^^^^^^^^^^^^^^^^^^^ + +Akka comes shipped with a number of default mailbox implementations: + +* UnboundedMailbox + + - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue`` + + - Blocking: No + + - Bounded: No + +* BoundedMailbox + + - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` + + - Blocking: Yes + + - Bounded: Yes + +* UnboundedPriorityMailbox + + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` + + - Blocking: Yes + + - Bounded: No + +* BoundedPriorityMailbox + + - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + + - Blocking: Yes + + - Bounded: Yes + +* Durable mailboxes, see :ref:`durable-mailboxes`. + +Mailbox configuration examples +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +How to create a PriorityMailbox: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-mailbox + +And then add it to the configuration: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config + +And then an example on how you would use it: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher + +Creating your own Mailbox type +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +An example is worth a thousand quacks: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example + +And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. .. note:: - It should be noted that the ``dispatcher-id`` used in :class:`Props` is in - fact an absolute path into the configuration object, i.e. you can declare a - dispatcher configuration nested within other configuration objects and refer - to it like so: ``"my.config.object.myAwesomeDispatcher"`` - -There are two different executor services: - -* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for - ``default-dispatcher``. -* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``. - -Note that the pool size is configured differently for the two executor services. The configuration above -is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config - -Let's now walk through the different dispatchers in more detail. - -Thread-based -^^^^^^^^^^^^ - -The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a -`LinkedBlockingQueue `_ -which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher -has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes -a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with -this dispatcher is that Actors do not block threads for each other. - -The ``PinnedDispatcher`` is configured like this: - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config - -Note that it must be used with ``executor = "thread-pool-executor"``. - -Event-based -^^^^^^^^^^^ - -The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a -`BlockingQueue `_. This dispatcher is highly configurable -and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool. - -The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. -the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor -that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific -and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to -design and implement your system in the most efficient way in regards to performance, throughput and latency. - -It comes with many different predefined BlockingQueue configurations: - -* Bounded `LinkedBlockingQueue `_ -* Unbounded `LinkedBlockingQueue `_ -* Bounded `ArrayBlockingQueue `_ -* Unbounded `ArrayBlockingQueue `_ -* `SynchronousQueue `_ - -When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's -thread as a way to slow him down and balance producer/consumer. - -Here is an example of a bounded mailbox: - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config - -The standard :class:`Dispatcher` allows you to define the ``throughput`` it -should have, as shown above. This defines the number of messages for a specific -Actor the dispatcher should process in one single sweep; in other words, the -dispatcher will batch process up to ``throughput`` messages together when -having elected an actor to run. Setting this to a higher number will increase -throughput but lower fairness, and vice versa. If you don't specify it explicitly -then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`. - -Browse the `ScalaDoc `_ or look at the code for all the options available. - -Priority event-based -^^^^^^^^^^^^^^^^^^^^ - -Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply -an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a -``akka.dispatch.PriorityGenerator`` (recommended). - -Creating a Dispatcher with a mailbox using PriorityGenerator: - -Config: - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-dispatcher-config - -Priority mailbox: - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-mailbox - -Usage: - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala - :include: prio-dispatcher - -Work-sharing event-based -^^^^^^^^^^^^^^^^^^^^^^^^^ - -The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to -share this dispatcher and during execution time the different actors will steal messages from other actors if they -have less messages to process. -Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably -best described as "work donating" because the actor of which work is being stolen takes the initiative. -This can be a great way to improve throughput at the cost of a little higher latency. - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config - -Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques `_ -Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala `_ - -Making the Actor mailbox bounded --------------------------------- - -Global configuration -^^^^^^^^^^^^^^^^^^^^ - -You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under -``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers, -if not specified otherwise. - -.. code-block:: ruby - - akka { - actor { - default-dispatcher { - # If negative (or zero) then an unbounded mailbox is used (default) - # If positive then a bounded mailbox is used and the capacity is set to the number specified - mailbox-capacity = 1000 - } - } - } - -Per-instance based configuration -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -You can also do it on a specific dispatcher instance. - -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config - - -For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor. -Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). -When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") -if the message cannot be added to the mailbox within the time specified by the pushTimeout. + Make sure to include a constructor which takes + ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` + arguments, as this constructor is invoked reflectively to construct your + mailbox type. The config passed in as second argument is that section from + the configuration which describes the dispatcher using this mailbox type; the + mailbox type will be instantiated once for each dispatcher using it. diff --git a/akka-docs/scala/extending-akka.rst b/akka-docs/scala/extending-akka.rst index 7627326767..f0beea996e 100644 --- a/akka-docs/scala/extending-akka.rst +++ b/akka-docs/scala/extending-akka.rst @@ -9,6 +9,17 @@ .. contents:: :local: +If you want to add features to Akka, there is a very elegant, but powerful mechanism for doing so. +It's called Akka Extensions and is comprised of 2 basic components: an ``Extension`` and an ``ExtensionId``. + +Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka. +You can choose to have your Extension loaded on-demand or at ``ActorSystem`` creation time through the Akka configuration. +Details on how to make that happens are below, in the "Loading from Configuration" section. + +.. warning:: + + Since an extension is a way to hook into Akka itself, the implementor of the extension needs to + ensure the thread safety of his/her extension. Building an Extension =====================