diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 3cc54ea6bb..b2265367c7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -334,7 +334,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte mustStop(t) } - "be able to use work-stealing dispatcher" in { + "be able to use balancing dispatcher" in { val props = Props( timeout = Timeout(6600), dispatcher = system.dispatcherFactory.newBalancingDispatcher("pooled-dispatcher") diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index e6ae3c0457..471cd957c0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -10,8 +10,18 @@ import akka.testkit.AkkaSpec import scala.collection.JavaConverters._ import com.typesafe.config.ConfigFactory +object DispatchersSpec { + val config = """ + myapp { + mydispatcher { + throughput = 17 + } + } + """ +} + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DispatchersSpec extends AkkaSpec { +class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) { val df = system.dispatcherFactory import df._ @@ -34,14 +44,6 @@ class DispatchersSpec extends AkkaSpec { val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher") - val dispatcherConf = ConfigFactory.parseString(""" - myapp { - mydispatcher { - throughput = 17 - } - } - """) - lazy val allDispatchers: Map[String, Option[MessageDispatcher]] = { validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t).asJava).withFallback(defaultDispatcherConfig)))).toMap } @@ -59,15 +61,20 @@ class DispatchersSpec extends AkkaSpec { } "use defined properties when newFromConfig" in { - val dispatcher = newFromConfig("myapp.mydispatcher", defaultGlobalDispatcher, dispatcherConf) + val dispatcher = newFromConfig("myapp.mydispatcher") dispatcher.throughput must be(17) } "use specific name when newFromConfig" in { - val dispatcher = newFromConfig("myapp.mydispatcher", defaultGlobalDispatcher, dispatcherConf) + val dispatcher = newFromConfig("myapp.mydispatcher") dispatcher.name must be("mydispatcher") } + "use default dispatcher when not configured" in { + val dispatcher = newFromConfig("myapp.other-dispatcher") + dispatcher must be === defaultGlobalDispatcher + } + "throw IllegalArgumentException if type does not exist" in { intercept[IllegalArgumentException] { from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist").asJava).withFallback(defaultDispatcherConfig)) @@ -81,6 +88,13 @@ class DispatchersSpec extends AkkaSpec { assert(typesAndValidators.forall(tuple ⇒ tuple._2(allDispatchers(tuple._1).get))) } + "provide lookup of dispatchers by key" in { + val d1 = lookup("myapp.mydispatcher") + val d2 = lookup("myapp.mydispatcher") + d1 must be === d2 + d1.name must be("mydispatcher") + } + } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 8d96834092..0c759f8079 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -103,7 +103,7 @@ akka { type = "Dispatcher" # Must be one of the following # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type), # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor - name = "DefaultDispatcher" # Optional, will be a generated UUID if omitted + name = "DefaultDispatcher" # Name used in log messages and thread names. keep-alive-time = 60s # Keep alive time for threads core-pool-size-min = 8 # minimum number of threads to cap factor-based core number to core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) @@ -115,7 +115,8 @@ akka { task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded) task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) allow-core-timeout = on # Allow core threads to time out - throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness + throughput = 5 # Throughput defines the number of messages that are processed in a batch before the + # thread is returned to the pool. Set to 1 for as fair as possible. throughput-deadline-time = 0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) # If positive then a bounded mailbox is used and the capacity is set using the property diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index ddae9654c4..cdcb056372 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -4,16 +4,19 @@ package akka.dispatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.ConcurrentHashMap + import akka.actor.LocalActorRef import akka.actor.newUuid import akka.util.{ Duration, ReflectiveAccess } -import java.util.concurrent.TimeUnit import akka.actor.ActorSystem import akka.event.EventStream import akka.actor.Scheduler import akka.actor.ActorSystem.Settings import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import akka.config.ConfigurationException trait DispatcherPrerequisites { def eventStream: EventStream @@ -27,6 +30,10 @@ case class DefaultDispatcherPrerequisites( val scheduler: Scheduler) extends DispatcherPrerequisites /** + * It is recommended to define the dispatcher in configuration to allow for tuning + * for different environments. Use the `lookup` or `newFromConfig` method to create + * a dispatcher as specified in configuration. + * * Scala API. Dispatcher factory. *
* Example usage: @@ -62,9 +69,33 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher") - // TODO PN Shouldn't we fail hard if default-dispatcher is wrong? - lazy val defaultGlobalDispatcher = - from(defaultDispatcherConfig) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build + lazy val defaultGlobalDispatcher: MessageDispatcher = + from(defaultDispatcherConfig) getOrElse { + throw new ConfigurationException("Wrong configuration [akka.actor.default-dispatcher]") + } + + // FIXME: Dispatchers registered here are are not removed, see ticket #1494 + private val dispatchers = new ConcurrentHashMap[String, MessageDispatcher] + + /** + * Returns a dispatcher as specified in configuration, or if not defined it uses + * the default dispatcher. The same dispatcher instance is returned for subsequent + * lookups. + */ + def lookup(key: String): MessageDispatcher = { + dispatchers.get(key) match { + case null ⇒ + // It doesn't matter if we create a dispatcher that isn't used due to concurrent lookup. + // That shouldn't happen often and in case it does the actual ExecutorService isn't + // created until used, i.e. cheap. + val newDispatcher = newFromConfig(key) + dispatchers.putIfAbsent(key, newDispatcher) match { + case null ⇒ newDispatcher + case existing ⇒ existing + } + case existing ⇒ existing + } + } /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -133,7 +164,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc new Dispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** - * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ @@ -142,7 +173,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** - * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ @@ -152,7 +183,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** - * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ @@ -162,7 +193,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** - * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ @@ -170,6 +201,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) + /** * Creates a new dispatcher as specified in configuration * or if not defined it uses the supplied dispatcher. @@ -183,7 +215,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc case true ⇒ val conf = cfg.getConfig(key) val confWithName = conf.withFallback(ConfigFactory.parseMap(Map("name" -> simpleName).asJava)) - from(confWithName).getOrElse(default) + from(confWithName).getOrElse(throw new ConfigurationException("Wrong configuration [%s]".format(key))) } } diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst index 96467312bd..795445387c 100644 --- a/akka-docs/general/configuration.rst +++ b/akka-docs/general/configuration.rst @@ -28,10 +28,10 @@ configuration for each actor system, and grab the specific configuration when in :: myapp1 { - akka.logLevel = WARNING + akka.loglevel = WARNING } myapp2 { - akka.logLevel = ERROR + akka.loglevel = ERROR } .. code-block:: scala @@ -120,7 +120,7 @@ A custom ``application.conf`` might look like this:: actor { default-dispatcher { - throughput = 10 # Throughput for default Dispatcher, set to 1 for complete fairness + throughput = 10 # Throughput for default Dispatcher, set to 1 for as fair as possible } } diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java index 756618eef5..e043245be1 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java @@ -73,7 +73,7 @@ public class UntypedActorTestBase { public void propsActorOf() { ActorSystem system = ActorSystem.create("MySystem"); //#creating-props - MessageDispatcher dispatcher = system.dispatcherFactory().newFromConfig("my-dispatcher"); + MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher"); ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher), "myactor"); //#creating-props diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index 744f439c91..84db3e4415 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -187,7 +187,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "creating actor with Props" in { //#creating-props import akka.actor.Props - val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher") + val dispatcher = system.dispatcherFactory.lookup("my-dispatcher") val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") //#creating-props diff --git a/akka-docs/scala/code/DispatcherDocSpec.scala b/akka-docs/scala/code/DispatcherDocSpec.scala new file mode 100644 index 0000000000..f132a5e90f --- /dev/null +++ b/akka-docs/scala/code/DispatcherDocSpec.scala @@ -0,0 +1,127 @@ +package akka.docs.dispatcher + +import org.scalatest.{ BeforeAndAfterAll, WordSpec } +import org.scalatest.matchers.MustMatchers +import akka.testkit.AkkaSpec +import akka.dispatch.PriorityGenerator +import akka.actor.Props +import akka.actor.Actor +import akka.dispatch.UnboundedPriorityMailbox +import akka.event.Logging +import akka.event.LoggingAdapter +import akka.util.duration._ +import akka.actor.PoisonPill + +object DispatcherDocSpec { + val config = """ + //#my-dispatcher-config + my-dispatcher { + type = Dispatcher # Dispatcher is the name of the event-based dispatcher + core-pool-size-min = 2 # minimum number of threads to cap factor-based core number to + core-pool-size-factor = 2.0 # No of core threads ... ceil(available processors * factor) + core-pool-size-max = 10 # maximum number of threads to cap factor-based number to + throughput = 100 # Throughput defines the number of messages that are processed in a batch before the + # thread is returned to the pool. Set to 1 for as fair as possible. + } + //#my-dispatcher-config + + //#my-pinned-config + my-pinned-dispatcher { + type = Dispatcher + core-pool-size-min = 1 + core-pool-size-max = 1 + } + //#my-pinned-config + + //#my-bounded-config + my-dispatcher-bounded-queue { + type = Dispatcher + core-pool-size-factor = 8.0 + max-pool-size-factor = 16.0 + task-queue-size = 100 # Specifies the bounded capacity of the task queue + task-queue-type = "array" # Specifies which type of task queue will be used, can be "array" or "linked" (default) + throughput = 3 + } + //#my-bounded-config + + //#my-balancing-config + my-balancing-dispatcher { + type = BalancingDispatcher + } + //#my-balancing-config + """ + + class MyActor extends Actor { + def receive = { + case x ⇒ + } + } +} + +class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { + + import DispatcherDocSpec.MyActor + + "defining dispatcher" in { + //#defining-dispatcher + import akka.actor.Props + val dispatcher = system.dispatcherFactory.lookup("my-dispatcher") + val myActor1 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor1") + val myActor2 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor2") + //#defining-dispatcher + } + + "defining dispatcher with bounded queue" in { + val dispatcher = system.dispatcherFactory.lookup("my-dispatcher-bounded-queue") + } + + "defining priority dispatcher" in { + //#prio-dispatcher + val gen = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important + case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible + case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible + case PoisonPill ⇒ 1000 // PoisonPill when no other left + case otherwise ⇒ 50 // We default to 50 + } + + // We create a new Priority dispatcher and seed it with the priority generator + val dispatcher = system.dispatcherFactory.newDispatcher("foo", 5, UnboundedPriorityMailbox(gen)).build + + val a = system.actorOf( // We create a new Actor that just prints out what it processes + Props(new Actor { + val log: LoggingAdapter = Logging(context.system, this) + + self ! 'lowpriority + self ! 'lowpriority + self ! 'highpriority + self ! 'pigdog + self ! 'pigdog2 + self ! 'pigdog3 + self ! 'highpriority + self ! PoisonPill + + def receive = { + case x ⇒ log.info(x.toString) + } + }).withDispatcher(dispatcher)) + + /* + Logs: + 'highpriority + 'highpriority + 'pigdog + 'pigdog2 + 'pigdog3 + 'lowpriority + 'lowpriority + */ + //#prio-dispatcher + + awaitCond(a.isTerminated, 5 seconds) + } + + "defining balancing dispatcher" in { + val dispatcher = system.dispatcherFactory.lookup("my-balancing-dispatcher") + } + +} diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index c93567e490..c2c03c16ae 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -16,107 +16,97 @@ The event-based Actors currently consume ~600 bytes per Actor which means that y Default dispatcher ------------------ -For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. The dispatcher used is this one: +For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. +The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher`` +section of the :ref:`configuration`. -.. code-block:: scala - - Dispatchers.globalDispatcher - -But if you feel that you are starting to contend on the single dispatcher (the 'Executor' and its queue) or want to group a specific set of Actors for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. See below for details on which ones are available and how they can be configured. +If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors +for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. +See below for details on which ones are available and how they can be configured. Setting the dispatcher ---------------------- -Normally you set the dispatcher from within the Actor itself. The dispatcher is defined by the 'dispatcher: MessageDispatcher' member field in 'ActorRef'. +You specify the dispatcher to use when creating an actor. -.. code-block:: scala - - class MyActor extends Actor { - self.dispatcher = ... // set the dispatcher - ... - } - -You can also set the dispatcher for an Actor **before** it has been started: - -.. code-block:: scala - - actorRef.dispatcher = dispatcher +.. includecode:: code/DispatcherDocSpec.scala + :include: imports,defining-dispatcher Types of dispatchers -------------------- -There are six different types of message dispatchers: +There are 4 different types of message dispatchers: -* Thread-based +* Thread-based (Pinned) * Event-based * Priority event-based -* Work-stealing +* Work-stealing (Balancing) -Factory methods for all of these, including global versions of some of them, are in the 'akka.dispatch.Dispatchers' object. +It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments. + +Example of a custom event-based dispatcher, which can be fetched with ``system.dispatcherFactory.lookup("my-dispatcher")`` +as in the example above: + +.. includecode:: code/DispatcherDocSpec.scala#my-dispatcher-config + +Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. + +.. warning:: + + Factory methods for creating dispatchers programmatically are available in ``akka.dispatch.Dispatchers``, i.e. + ``dispatcherFactory`` of the ``ActorSystem``. These methods will probably be changed or removed before + 2.0 final release, because dispatchers need to be defined by configuration to work in a clustered setup. Let's now walk through the different dispatchers in more detail. Thread-based ^^^^^^^^^^^^ -The 'PinnedDispatcher' binds a dedicated OS thread to each specific Actor. The messages are posted to a 'LinkedBlockingQueue' which feeds the messages to the dispatcher one by one. A 'PinnedDispatcher' cannot be shared between actors. This dispatcher has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other. +The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a +`LinkedBlockingQueue