Merge branch 'master' of github.com:jboner/akka
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
commit
b5d1785e05
13 changed files with 291 additions and 178 deletions
|
|
@ -334,7 +334,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
|
|||
mustStop(t)
|
||||
}
|
||||
|
||||
"be able to use work-stealing dispatcher" in {
|
||||
"be able to use balancing dispatcher" in {
|
||||
val props = Props(
|
||||
timeout = Timeout(6600),
|
||||
dispatcher = system.dispatcherFactory.newBalancingDispatcher("pooled-dispatcher")
|
||||
|
|
|
|||
|
|
@ -10,8 +10,18 @@ import akka.testkit.AkkaSpec
|
|||
import scala.collection.JavaConverters._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object DispatchersSpec {
|
||||
val config = """
|
||||
myapp {
|
||||
mydispatcher {
|
||||
throughput = 17
|
||||
}
|
||||
}
|
||||
"""
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class DispatchersSpec extends AkkaSpec {
|
||||
class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) {
|
||||
|
||||
val df = system.dispatcherFactory
|
||||
import df._
|
||||
|
|
@ -34,14 +44,6 @@ class DispatchersSpec extends AkkaSpec {
|
|||
|
||||
val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher")
|
||||
|
||||
val dispatcherConf = ConfigFactory.parseString("""
|
||||
myapp {
|
||||
mydispatcher {
|
||||
throughput = 17
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
lazy val allDispatchers: Map[String, Option[MessageDispatcher]] = {
|
||||
validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t).asJava).withFallback(defaultDispatcherConfig)))).toMap
|
||||
}
|
||||
|
|
@ -59,15 +61,20 @@ class DispatchersSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"use defined properties when newFromConfig" in {
|
||||
val dispatcher = newFromConfig("myapp.mydispatcher", defaultGlobalDispatcher, dispatcherConf)
|
||||
val dispatcher = newFromConfig("myapp.mydispatcher")
|
||||
dispatcher.throughput must be(17)
|
||||
}
|
||||
|
||||
"use specific name when newFromConfig" in {
|
||||
val dispatcher = newFromConfig("myapp.mydispatcher", defaultGlobalDispatcher, dispatcherConf)
|
||||
val dispatcher = newFromConfig("myapp.mydispatcher")
|
||||
dispatcher.name must be("mydispatcher")
|
||||
}
|
||||
|
||||
"use default dispatcher when not configured" in {
|
||||
val dispatcher = newFromConfig("myapp.other-dispatcher")
|
||||
dispatcher must be === defaultGlobalDispatcher
|
||||
}
|
||||
|
||||
"throw IllegalArgumentException if type does not exist" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist").asJava).withFallback(defaultDispatcherConfig))
|
||||
|
|
@ -81,6 +88,13 @@ class DispatchersSpec extends AkkaSpec {
|
|||
assert(typesAndValidators.forall(tuple ⇒ tuple._2(allDispatchers(tuple._1).get)))
|
||||
}
|
||||
|
||||
"provide lookup of dispatchers by key" in {
|
||||
val d1 = lookup("myapp.mydispatcher")
|
||||
val d2 = lookup("myapp.mydispatcher")
|
||||
d1 must be === d2
|
||||
d1.name must be("mydispatcher")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ akka {
|
|||
type = "Dispatcher" # Must be one of the following
|
||||
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
|
||||
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
||||
name = "DefaultDispatcher" # Optional, will be a generated UUID if omitted
|
||||
name = "DefaultDispatcher" # Name used in log messages and thread names.
|
||||
keep-alive-time = 60s # Keep alive time for threads
|
||||
core-pool-size-min = 8 # minimum number of threads to cap factor-based core number to
|
||||
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
|
||||
|
|
@ -115,7 +115,8 @@ akka {
|
|||
task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded)
|
||||
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
|
||||
allow-core-timeout = on # Allow core threads to time out
|
||||
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
||||
throughput = 5 # Throughput defines the number of messages that are processed in a batch before the
|
||||
# thread is returned to the pool. Set to 1 for as fair as possible.
|
||||
throughput-deadline-time = 0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
|
||||
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
||||
# If positive then a bounded mailbox is used and the capacity is set using the property
|
||||
|
|
|
|||
|
|
@ -4,16 +4,19 @@
|
|||
|
||||
package akka.dispatch
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import akka.actor.LocalActorRef
|
||||
import akka.actor.newUuid
|
||||
import akka.util.{ Duration, ReflectiveAccess }
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.EventStream
|
||||
import akka.actor.Scheduler
|
||||
import akka.actor.ActorSystem.Settings
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.config.ConfigurationException
|
||||
|
||||
trait DispatcherPrerequisites {
|
||||
def eventStream: EventStream
|
||||
|
|
@ -27,6 +30,10 @@ case class DefaultDispatcherPrerequisites(
|
|||
val scheduler: Scheduler) extends DispatcherPrerequisites
|
||||
|
||||
/**
|
||||
* It is recommended to define the dispatcher in configuration to allow for tuning
|
||||
* for different environments. Use the `lookup` or `newFromConfig` method to create
|
||||
* a dispatcher as specified in configuration.
|
||||
*
|
||||
* Scala API. Dispatcher factory.
|
||||
* <p/>
|
||||
* Example usage:
|
||||
|
|
@ -62,9 +69,33 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
|
||||
val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher")
|
||||
|
||||
// TODO PN Shouldn't we fail hard if default-dispatcher is wrong?
|
||||
lazy val defaultGlobalDispatcher =
|
||||
from(defaultDispatcherConfig) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build
|
||||
lazy val defaultGlobalDispatcher: MessageDispatcher =
|
||||
from(defaultDispatcherConfig) getOrElse {
|
||||
throw new ConfigurationException("Wrong configuration [akka.actor.default-dispatcher]")
|
||||
}
|
||||
|
||||
// FIXME: Dispatchers registered here are are not removed, see ticket #1494
|
||||
private val dispatchers = new ConcurrentHashMap[String, MessageDispatcher]
|
||||
|
||||
/**
|
||||
* Returns a dispatcher as specified in configuration, or if not defined it uses
|
||||
* the default dispatcher. The same dispatcher instance is returned for subsequent
|
||||
* lookups.
|
||||
*/
|
||||
def lookup(key: String): MessageDispatcher = {
|
||||
dispatchers.get(key) match {
|
||||
case null ⇒
|
||||
// It doesn't matter if we create a dispatcher that isn't used due to concurrent lookup.
|
||||
// That shouldn't happen often and in case it does the actual ExecutorService isn't
|
||||
// created until used, i.e. cheap.
|
||||
val newDispatcher = newFromConfig(key)
|
||||
dispatchers.putIfAbsent(key, newDispatcher) match {
|
||||
case null ⇒ newDispatcher
|
||||
case existing ⇒ existing
|
||||
}
|
||||
case existing ⇒ existing
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||
|
|
@ -133,7 +164,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
new Dispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||
|
||||
/**
|
||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||
* Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool.
|
||||
* <p/>
|
||||
* Has a fluent builder interface for configuring its semantics.
|
||||
*/
|
||||
|
|
@ -142,7 +173,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||
|
||||
/**
|
||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||
* Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool.
|
||||
* <p/>
|
||||
* Has a fluent builder interface for configuring its semantics.
|
||||
*/
|
||||
|
|
@ -152,7 +183,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||
|
||||
/**
|
||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||
* Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool.
|
||||
* <p/>
|
||||
* Has a fluent builder interface for configuring its semantics.
|
||||
*/
|
||||
|
|
@ -162,7 +193,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||
|
||||
/**
|
||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||
* Creates a executor-based event-driven dispatcher, with work-sharing, serving multiple (millions) of actors through a thread pool.
|
||||
* <p/>
|
||||
* Has a fluent builder interface for configuring its semantics.
|
||||
*/
|
||||
|
|
@ -170,6 +201,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||
new BalancingDispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType,
|
||||
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||
|
||||
/**
|
||||
* Creates a new dispatcher as specified in configuration
|
||||
* or if not defined it uses the supplied dispatcher.
|
||||
|
|
@ -183,7 +215,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
case true ⇒
|
||||
val conf = cfg.getConfig(key)
|
||||
val confWithName = conf.withFallback(ConfigFactory.parseMap(Map("name" -> simpleName).asJava))
|
||||
from(confWithName).getOrElse(default)
|
||||
from(confWithName).getOrElse(throw new ConfigurationException("Wrong configuration [%s]".format(key)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class PinnedDispatcher(
|
|||
_shutdownTimeout) {
|
||||
|
||||
@volatile
|
||||
protected[akka] var owner: ActorCell = _actor
|
||||
private var owner: ActorCell = _actor
|
||||
|
||||
//Relies on an external lock provided by MessageDispatcher.attach
|
||||
protected[akka] override def register(actorCell: ActorCell) = {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import akka.actor.Actor
|
|||
import akka.actor.Props
|
||||
import akka.util.duration._
|
||||
|
||||
|
||||
//#imports1
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
|
|
@ -37,8 +36,7 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
50 milliseconds,
|
||||
new Runnable {
|
||||
def run = testActor ! "foo"
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
//#schedule-one-off-runnable
|
||||
|
||||
|
|
@ -57,9 +55,9 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
//to the tickActor after 0ms repeating every 50ms
|
||||
val cancellable =
|
||||
system.scheduler.schedule(0 milliseconds,
|
||||
50 milliseconds,
|
||||
tickActor,
|
||||
Tick)
|
||||
50 milliseconds,
|
||||
tickActor,
|
||||
Tick)
|
||||
|
||||
//This cancels further Ticks to be sent
|
||||
cancellable.cancel()
|
||||
|
|
|
|||
|
|
@ -28,10 +28,10 @@ configuration for each actor system, and grab the specific configuration when in
|
|||
::
|
||||
|
||||
myapp1 {
|
||||
akka.logLevel = WARNING
|
||||
akka.loglevel = WARNING
|
||||
}
|
||||
myapp2 {
|
||||
akka.logLevel = ERROR
|
||||
akka.loglevel = ERROR
|
||||
}
|
||||
|
||||
.. code-block:: scala
|
||||
|
|
@ -120,7 +120,7 @@ A custom ``application.conf`` might look like this::
|
|||
|
||||
actor {
|
||||
default-dispatcher {
|
||||
throughput = 10 # Throughput for default Dispatcher, set to 1 for complete fairness
|
||||
throughput = 10 # Throughput for default Dispatcher, set to 1 for as fair as possible
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -74,8 +74,10 @@ public class UntypedActorTestBase {
|
|||
public void propsActorOf() {
|
||||
ActorSystem system = ActorSystem.create("MySystem");
|
||||
//#creating-props
|
||||
MessageDispatcher dispatcher = system.dispatcherFactory().newFromConfig("my-dispatcher");
|
||||
ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher), "myactor");
|
||||
MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher");
|
||||
ActorRef myActor = system.actorOf(
|
||||
new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher),
|
||||
"myactor");
|
||||
//#creating-props
|
||||
myActor.tell("test");
|
||||
system.stop();
|
||||
|
|
|
|||
|
|
@ -20,8 +20,9 @@ import org.junit.Test;
|
|||
import scala.Option;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
|
||||
|
|
@ -30,11 +31,11 @@ public class LoggingDocTestBase {
|
|||
@Test
|
||||
public void useLoggingActor() {
|
||||
ActorSystem system = ActorSystem.create("MySystem");
|
||||
ActorRef myActor = system.actorOf(new UntypedActorFactory() {
|
||||
ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new MyActor();
|
||||
}
|
||||
});
|
||||
}));
|
||||
myActor.tell("test");
|
||||
system.stop();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
"creating actor with Props" in {
|
||||
//#creating-props
|
||||
import akka.actor.Props
|
||||
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher")
|
||||
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
|
||||
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
|
||||
//#creating-props
|
||||
|
||||
|
|
|
|||
127
akka-docs/scala/code/DispatcherDocSpec.scala
Normal file
127
akka-docs/scala/code/DispatcherDocSpec.scala
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
package akka.docs.dispatcher
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.dispatch.PriorityGenerator
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.dispatch.UnboundedPriorityMailbox
|
||||
import akka.event.Logging
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.util.duration._
|
||||
import akka.actor.PoisonPill
|
||||
|
||||
object DispatcherDocSpec {
|
||||
val config = """
|
||||
//#my-dispatcher-config
|
||||
my-dispatcher {
|
||||
type = Dispatcher # Dispatcher is the name of the event-based dispatcher
|
||||
core-pool-size-min = 2 # minimum number of threads to cap factor-based core number to
|
||||
core-pool-size-factor = 2.0 # No of core threads ... ceil(available processors * factor)
|
||||
core-pool-size-max = 10 # maximum number of threads to cap factor-based number to
|
||||
throughput = 100 # Throughput defines the number of messages that are processed in a batch before the
|
||||
# thread is returned to the pool. Set to 1 for as fair as possible.
|
||||
}
|
||||
//#my-dispatcher-config
|
||||
|
||||
//#my-pinned-config
|
||||
my-pinned-dispatcher {
|
||||
type = Dispatcher
|
||||
core-pool-size-min = 1
|
||||
core-pool-size-max = 1
|
||||
}
|
||||
//#my-pinned-config
|
||||
|
||||
//#my-bounded-config
|
||||
my-dispatcher-bounded-queue {
|
||||
type = Dispatcher
|
||||
core-pool-size-factor = 8.0
|
||||
max-pool-size-factor = 16.0
|
||||
task-queue-size = 100 # Specifies the bounded capacity of the task queue
|
||||
task-queue-type = "array" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
|
||||
throughput = 3
|
||||
}
|
||||
//#my-bounded-config
|
||||
|
||||
//#my-balancing-config
|
||||
my-balancing-dispatcher {
|
||||
type = BalancingDispatcher
|
||||
}
|
||||
//#my-balancing-config
|
||||
"""
|
||||
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case x ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
||||
|
||||
import DispatcherDocSpec.MyActor
|
||||
|
||||
"defining dispatcher" in {
|
||||
//#defining-dispatcher
|
||||
import akka.actor.Props
|
||||
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
|
||||
val myActor1 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor1")
|
||||
val myActor2 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor2")
|
||||
//#defining-dispatcher
|
||||
}
|
||||
|
||||
"defining dispatcher with bounded queue" in {
|
||||
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher-bounded-queue")
|
||||
}
|
||||
|
||||
"defining priority dispatcher" in {
|
||||
//#prio-dispatcher
|
||||
val gen = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
||||
case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible
|
||||
case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible
|
||||
case PoisonPill ⇒ 1000 // PoisonPill when no other left
|
||||
case otherwise ⇒ 50 // We default to 50
|
||||
}
|
||||
|
||||
// We create a new Priority dispatcher and seed it with the priority generator
|
||||
val dispatcher = system.dispatcherFactory.newDispatcher("foo", 5, UnboundedPriorityMailbox(gen)).build
|
||||
|
||||
val a = system.actorOf( // We create a new Actor that just prints out what it processes
|
||||
Props(new Actor {
|
||||
val log: LoggingAdapter = Logging(context.system, this)
|
||||
|
||||
self ! 'lowpriority
|
||||
self ! 'lowpriority
|
||||
self ! 'highpriority
|
||||
self ! 'pigdog
|
||||
self ! 'pigdog2
|
||||
self ! 'pigdog3
|
||||
self ! 'highpriority
|
||||
self ! PoisonPill
|
||||
|
||||
def receive = {
|
||||
case x ⇒ log.info(x.toString)
|
||||
}
|
||||
}).withDispatcher(dispatcher))
|
||||
|
||||
/*
|
||||
Logs:
|
||||
'highpriority
|
||||
'highpriority
|
||||
'pigdog
|
||||
'pigdog2
|
||||
'pigdog3
|
||||
'lowpriority
|
||||
'lowpriority
|
||||
*/
|
||||
//#prio-dispatcher
|
||||
|
||||
awaitCond(a.isTerminated, 5 seconds)
|
||||
}
|
||||
|
||||
"defining balancing dispatcher" in {
|
||||
val dispatcher = system.dispatcherFactory.lookup("my-balancing-dispatcher")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -16,107 +16,97 @@ The event-based Actors currently consume ~600 bytes per Actor which means that y
|
|||
Default dispatcher
|
||||
------------------
|
||||
|
||||
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created. The dispatcher used is this one:
|
||||
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created.
|
||||
The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher``
|
||||
section of the :ref:`configuration`.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
Dispatchers.globalDispatcher
|
||||
|
||||
But if you feel that you are starting to contend on the single dispatcher (the 'Executor' and its queue) or want to group a specific set of Actors for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher. See below for details on which ones are available and how they can be configured.
|
||||
If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors
|
||||
for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher.
|
||||
See below for details on which ones are available and how they can be configured.
|
||||
|
||||
Setting the dispatcher
|
||||
----------------------
|
||||
|
||||
Normally you set the dispatcher from within the Actor itself. The dispatcher is defined by the 'dispatcher: MessageDispatcher' member field in 'ActorRef'.
|
||||
You specify the dispatcher to use when creating an actor.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
class MyActor extends Actor {
|
||||
self.dispatcher = ... // set the dispatcher
|
||||
...
|
||||
}
|
||||
|
||||
You can also set the dispatcher for an Actor **before** it has been started:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
actorRef.dispatcher = dispatcher
|
||||
.. includecode:: code/DispatcherDocSpec.scala
|
||||
:include: imports,defining-dispatcher
|
||||
|
||||
Types of dispatchers
|
||||
--------------------
|
||||
|
||||
There are six different types of message dispatchers:
|
||||
There are 4 different types of message dispatchers:
|
||||
|
||||
* Thread-based
|
||||
* Thread-based (Pinned)
|
||||
* Event-based
|
||||
* Priority event-based
|
||||
* Work-stealing
|
||||
* Work-stealing (Balancing)
|
||||
|
||||
Factory methods for all of these, including global versions of some of them, are in the 'akka.dispatch.Dispatchers' object.
|
||||
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
|
||||
|
||||
Example of a custom event-based dispatcher, which can be fetched with ``system.dispatcherFactory.lookup("my-dispatcher")``
|
||||
as in the example above:
|
||||
|
||||
.. includecode:: code/DispatcherDocSpec.scala#my-dispatcher-config
|
||||
|
||||
Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined.
|
||||
|
||||
.. warning::
|
||||
|
||||
Factory methods for creating dispatchers programmatically are available in ``akka.dispatch.Dispatchers``, i.e.
|
||||
``dispatcherFactory`` of the ``ActorSystem``. These methods will probably be changed or removed before
|
||||
2.0 final release, because dispatchers need to be defined by configuration to work in a clustered setup.
|
||||
|
||||
Let's now walk through the different dispatchers in more detail.
|
||||
|
||||
Thread-based
|
||||
^^^^^^^^^^^^
|
||||
|
||||
The 'PinnedDispatcher' binds a dedicated OS thread to each specific Actor. The messages are posted to a 'LinkedBlockingQueue' which feeds the messages to the dispatcher one by one. A 'PinnedDispatcher' cannot be shared between actors. This dispatcher has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other.
|
||||
The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a
|
||||
`LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
|
||||
which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher
|
||||
has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes
|
||||
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
|
||||
this dispatcher is that Actors do not block threads for each other.
|
||||
|
||||
It would normally by used from within the actor like this:
|
||||
FIXME PN: Is this the way to configure a PinnedDispatcher, and then why "A ``PinnedDispatcher`` cannot be shared between actors."
|
||||
|
||||
.. code-block:: scala
|
||||
The ``PinnedDispatcher`` is configured as a event-based dispatcher with with core pool size of 1.
|
||||
|
||||
class MyActor extends Actor {
|
||||
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
|
||||
...
|
||||
}
|
||||
.. includecode:: code/DispatcherDocSpec.scala#my-pinned-config
|
||||
|
||||
Event-based
|
||||
^^^^^^^^^^^
|
||||
|
||||
The 'Dispatcher' binds a set of Actors to a thread pool backed up by a 'BlockingQueue'. This dispatcher is highly configurable and supports a fluent configuration API to configure the 'BlockingQueue' (type of queue, max items etc.) as well as the thread pool.
|
||||
The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
|
||||
`BlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html>`_. This dispatcher is highly configurable
|
||||
and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool.
|
||||
|
||||
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. the Actors you define in the declarative supervisor config, to get their own dispatcher but reuse the dispatcher for each new Actor that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to design and implement your system in the most efficient way in regards to performance, throughput and latency.
|
||||
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g.
|
||||
the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor
|
||||
that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific
|
||||
and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to
|
||||
design and implement your system in the most efficient way in regards to performance, throughput and latency.
|
||||
|
||||
It comes with many different predefined BlockingQueue configurations:
|
||||
|
||||
* Bounded LinkedBlockingQueue
|
||||
* Unbounded LinkedBlockingQueue
|
||||
* Bounded ArrayBlockingQueue
|
||||
* Unbounded ArrayBlockingQueue
|
||||
* SynchronousQueue
|
||||
* Bounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
|
||||
* Unbounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
|
||||
* Bounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
|
||||
* Unbounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
|
||||
* `SynchronousQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html>`_
|
||||
|
||||
You can also set the rejection policy that should be used, e.g. what should be done if the dispatcher (e.g. the Actor) can't keep up and the mailbox is growing up to the limit defined. You can choose between four different rejection policies:
|
||||
When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's
|
||||
thread as a way to slow him down and balance producer/consumer.
|
||||
|
||||
* java.util.concurrent.ThreadPoolExecutor.CallerRuns - will run the message processing in the caller's thread as a way to slow him down and balance producer/consumer
|
||||
* java.util.concurrent.ThreadPoolExecutor.AbortPolicy - rejected messages by throwing a 'RejectedExecutionException'
|
||||
* java.util.concurrent.ThreadPoolExecutor.DiscardPolicy - discards the message (throws it away)
|
||||
* java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy - discards the oldest message in the mailbox (throws it away)
|
||||
Here is an example of a bounded mailbox:
|
||||
|
||||
You can read more about these policies `here <http://java.sun.com/javase/6/docs/api/index.html?java/util/concurrent/RejectedExecutionHandler.html>`_.
|
||||
|
||||
Here is an example:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.dispatch.Dispatchers
|
||||
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
|
||||
|
||||
class MyActor extends Actor {
|
||||
self.dispatcher = Dispatchers.newDispatcher(name, throughput = 15)
|
||||
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
|
||||
.setCorePoolSize(16)
|
||||
.setMaxPoolSize(128)
|
||||
.setKeepAliveTimeInMillis(60000)
|
||||
.setRejectionPolicy(new CallerRunsPolicy)
|
||||
.build
|
||||
...
|
||||
}
|
||||
.. includecode:: code/DispatcherDocSpec.scala#my-bounded-config
|
||||
|
||||
The standard :class:`Dispatcher` allows you to define the ``throughput`` it
|
||||
should have, as shown above. This defines the number of messages for a specific
|
||||
Actor the dispatcher should process in one single sweep; in other words, the
|
||||
dispatcher will bunch up to ``throughput`` message invocations together when
|
||||
dispatcher will batch process up to ``throughput`` messages together when
|
||||
having elected an actor to run. Setting this to a higher number will increase
|
||||
throughput but lower fairness, and vice versa. If you don't specify it explicitly
|
||||
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.
|
||||
|
|
@ -127,62 +117,21 @@ Priority event-based
|
|||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply
|
||||
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a java.util.Comparator[MessageInvocation] or use a akka.dispatch.PriorityGenerator (recommended):
|
||||
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
|
||||
``akka.dispatch.PriorityGenerator`` (recommended):
|
||||
|
||||
Creating a Dispatcher using PriorityGenerator:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import akka.dispatch._
|
||||
import akka.actor._
|
||||
|
||||
val gen = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
||||
case 'highpriority => 0 // 'highpriority messages should be treated first if possible
|
||||
case 'lowpriority => 100 // 'lowpriority messages should be treated last if possible
|
||||
case otherwise => 50 // We default to 50
|
||||
}
|
||||
|
||||
val a = Actor.actorOf( // We create a new Actor that just prints out what it processes
|
||||
Props(new Actor {
|
||||
self ! 'lowpriority
|
||||
self ! 'lowpriority
|
||||
self ! 'highpriority
|
||||
self ! 'pigdog
|
||||
self ! 'pigdog2
|
||||
self ! 'pigdog3
|
||||
self ! 'highpriority
|
||||
def receive = {
|
||||
case x => println(x)
|
||||
}
|
||||
}).withDispatcher(new Dispatcher("foo", 5, UnboundedPriorityMailbox(gen)))) // We create a new Priority dispatcher and seed it with the priority generator
|
||||
|
||||
Prints:
|
||||
|
||||
'highpriority
|
||||
'highpriority
|
||||
'pigdog
|
||||
'pigdog2
|
||||
'pigdog3
|
||||
'lowpriority
|
||||
'lowpriority
|
||||
.. includecode:: code/DispatcherDocSpec.scala#prio-dispatcher
|
||||
|
||||
Work-stealing event-based
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The 'BalancingDispatcher' is a variation of the 'Dispatcher' in which Actors of the same type can be set up to share this dispatcher and during execution time the different actors will steal messages from other actors if they have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency.
|
||||
The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to
|
||||
share this dispatcher and during execution time the different actors will steal messages from other actors if they
|
||||
have less messages to process. This can be a great way to improve throughput at the cost of a little higher latency.
|
||||
|
||||
Normally the way you use it is to create an Actor companion object to hold the dispatcher and then set in in the Actor explicitly.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
object MyActor {
|
||||
val dispatcher = Dispatchers.newBalancingDispatcher(name).build
|
||||
}
|
||||
|
||||
class MyActor extends Actor {
|
||||
self.dispatcher = MyActor.dispatcher
|
||||
...
|
||||
}
|
||||
.. includecode:: code/DispatcherDocSpec.scala#my-balancing-config
|
||||
|
||||
Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_
|
||||
Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_
|
||||
|
|
@ -193,14 +142,18 @@ Making the Actor mailbox bounded
|
|||
Global configuration
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the configuration file under 'default-dispatcher'. This will set it globally.
|
||||
You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under
|
||||
``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers,
|
||||
if not specified otherwise.
|
||||
|
||||
.. code-block:: ruby
|
||||
|
||||
actor {
|
||||
default-dispatcher {
|
||||
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
||||
# If positive then a bounded mailbox is used and the capacity is set to the number specified
|
||||
akka {
|
||||
actor {
|
||||
default-dispatcher {
|
||||
task-queue-size = 1000 # If negative (or zero) then an unbounded mailbox is used (default)
|
||||
# If positive then a bounded mailbox is used and the capacity is set to the number specified
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -209,26 +162,11 @@ Per-instance based configuration
|
|||
|
||||
You can also do it on a specific dispatcher instance.
|
||||
|
||||
For the 'Dispatcher' and the 'ExecutorBasedWorkStealingDispatcher' you can do it through their constructor
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
class MyActor extends Actor {
|
||||
val mailboxCapacity = BoundedMailbox(capacity = 100)
|
||||
self.dispatcher = Dispatchers.newDispatcher(name, throughput, mailboxCapacity).build
|
||||
...
|
||||
}
|
||||
|
||||
For the 'PinnedDispatcher', it is non-shareable between actors, and associates a dedicated Thread with the actor.
|
||||
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds). When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") if the message cannot be added to the mailbox within the time specified by the pushTimeout.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
class MyActor extends Actor {
|
||||
import akka.util.duration._
|
||||
self.dispatcher = Dispatchers.newPinnedDispatcher(self, mailboxCapacity = 100,
|
||||
pushTimeOut = 10 seconds)
|
||||
...
|
||||
}
|
||||
.. includecode:: code/DispatcherDocSpec.scala#my-bounded-config
|
||||
|
||||
|
||||
For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor.
|
||||
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds).
|
||||
When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
|
||||
if the message cannot be added to the mailbox within the time specified by the pushTimeout.
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ The Typed Actors are implemented through `Typed Actors <http://en.wikipedia.org/
|
|||
|
||||
If you are using the `Spring Framework <http://springsource.org>`_ then take a look at Akka's `Spring integration <spring-integration>`_.
|
||||
|
||||
**WARNING:** Do not configure to use a ``WorkStealingDispatcher`` with your ``TypedActors``, it just isn't safe with how ``TypedActors`` currently are implemented. This limitation will most likely be removed in the future.
|
||||
**WARNING:** Do not configure to use a ``BalancingDispatcher`` with your ``TypedActors``, it just isn't safe with how ``TypedActors`` currently are implemented. This limitation will most likely be removed in the future.
|
||||
|
||||
Creating Typed Actors
|
||||
---------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue