!act #2927 Add BalancingPool and deprecate BalancingDispatcher

This commit is contained in:
Patrik Nordwall 2014-01-10 17:14:10 +01:00
parent bfbee94fec
commit d7aab50da0
26 changed files with 482 additions and 206 deletions

View file

@ -23,7 +23,7 @@ object ActorConfigurationVerificationSpec {
val config = """
balancing-dispatcher {
type = BalancingDispatcher
type = "akka.dispatch.BalancingDispatcherConfigurator"
throughput = 1
}
pinned-dispatcher {

View file

@ -34,18 +34,18 @@ object ActorMailboxSpec {
}
balancing-dispatcher {
type = BalancingDispatcher
type = "akka.dispatch.BalancingDispatcherConfigurator"
}
balancing-bounded-dispatcher {
type = BalancingDispatcher
type = "akka.dispatch.BalancingDispatcherConfigurator"
mailbox-push-timeout-time = 10s
mailbox-capacity = 1000
mailbox-type = "akka.dispatch.BoundedMailbox"
}
requiring-balancing-bounded-dispatcher {
type = BalancingDispatcher
type = "akka.dispatch.BalancingDispatcherConfigurator"
mailbox-requirement = "akka.actor.ActorMailboxSpec$MCBoundedMessageQueueSemantics"
}

View file

@ -194,6 +194,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
mapping("round-robin-group") should be(classOf[akka.routing.RoundRobinGroup].getName)
mapping("random-pool") should be(classOf[akka.routing.RandomPool].getName)
mapping("random-group") should be(classOf[akka.routing.RandomGroup].getName)
mapping("balancing-pool") should be(classOf[akka.routing.BalancingPool].getName)
mapping("smallest-mailbox-pool") should be(classOf[akka.routing.SmallestMailboxPool].getName)
mapping("broadcast-pool") should be(classOf[akka.routing.BroadcastPool].getName)
mapping("broadcast-group") should be(classOf[akka.routing.BroadcastGroup].getName)

View file

@ -26,7 +26,7 @@ object TypedActorSpec {
val config = """
pooled-dispatcher {
type = BalancingDispatcher
type = "akka.dispatch.BalancingDispatcherConfigurator"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 60

View file

@ -12,7 +12,7 @@ import akka.testkit.AkkaSpec
object BalancingDispatcherSpec {
val config = """
pooled-dispatcher {
type = BalancingDispatcher
type = "akka.dispatch.BalancingDispatcherConfigurator"
throughput = 1
}
"""

View file

@ -28,7 +28,7 @@ object DispatchersSpec {
type = PinnedDispatcher
}
balancing-dispatcher {
type = BalancingDispatcher
type = "akka.dispatch.BalancingDispatcherConfigurator"
}
}
akka.actor.deployment {
@ -41,7 +41,10 @@ object DispatchersSpec {
/pool1 {
router = random-pool
nr-of-instances = 3
pool-dispatcher.type = BalancingDispatcher
pool-dispatcher {
fork-join-executor.parallelism-min = 3
fork-join-executor.parallelism-max = 3
}
}
}
"""
@ -71,7 +74,6 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend
def ofType[T <: MessageDispatcher: ClassTag]: (MessageDispatcher) Boolean = _.getClass == implicitly[ClassTag[T]].runtimeClass
def typesAndValidators: Map[String, (MessageDispatcher) Boolean] = Map(
"BalancingDispatcher" -> ofType[BalancingDispatcher],
"PinnedDispatcher" -> ofType[PinnedDispatcher],
"Dispatcher" -> ofType[Dispatcher])

View file

@ -29,9 +29,6 @@ object ResizerSpec {
}
}
}
bal-disp {
type = BalancingDispatcher
}
"""
class TestActor extends Actor {

View file

@ -0,0 +1,88 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import language.postfixOps
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{ Props, Actor }
import akka.testkit.{ TestLatch, ImplicitSender, AkkaSpec }
import akka.actor.ActorRef
import org.scalatest.BeforeAndAfterEach
object BalancingSpec {
val counter = new AtomicInteger(1)
class Worker(latch: TestLatch) extends Actor {
lazy val id = counter.getAndIncrement()
def receive = {
case msg
if (id == 1) Thread.sleep(10) // dispatch to other routees
else Await.ready(latch, 1.minute)
sender ! id
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BalancingSpec extends AkkaSpec(
"""
akka.actor.deployment {
/balancingPool-2 {
router = balancing-pool
nr-of-instances = 10
pool-dispatcher {
attempt-teamwork = on
}
}
}
""") with ImplicitSender with BeforeAndAfterEach {
import BalancingSpec._
val poolSize = 10
override def beforeEach(): Unit = {
counter.set(1)
}
def test(pool: ActorRef, latch: TestLatch): Unit = {
val iterationCount = 100
for (i 1 to iterationCount) {
pool ! "hit-" + i
}
// all but one worker are blocked
val replies1 = receiveN(iterationCount - poolSize + 1)
expectNoMsg(1.second)
// all replies from the unblocked worker so far
replies1.toSet should be(Set(1))
latch.countDown()
val replies2 = receiveN(poolSize - 1)
// the remaining replies come from the blocked
replies2.toSet should be((2 to poolSize).toSet)
expectNoMsg(500.millis)
}
"balancing pool" must {
"deliver messages in a balancing fashion when defined programatically" in {
val latch = TestLatch(1)
val pool = system.actorOf(BalancingPool(poolSize).props(routeeProps =
Props(classOf[Worker], latch)), name = "balancingPool-1")
test(pool, latch)
}
"deliver messages in a balancing fashion when defined in config" in {
val latch = TestLatch(1)
val pool = system.actorOf(BalancingPool(1).props(routeeProps =
Props(classOf[Worker], latch)), name = "balancingPool-2")
test(pool, latch)
}
}
}

View file

@ -30,7 +30,8 @@ object ConfiguredLocalRoutingSpec {
router = random-pool
nr-of-instances = 4
pool-dispatcher {
type = BalancingDispatcher
fork-join-executor.parallelism-min = 4
fork-join-executor.parallelism-max = 4
}
}
/paths {

View file

@ -27,9 +27,6 @@ object ResizerSpec {
}
}
}
bal-disp {
type = BalancingDispatcher
}
"""
class TestActor extends Actor {

View file

@ -102,6 +102,7 @@ akka {
round-robin-group = "akka.routing.RoundRobinGroup"
random-pool = "akka.routing.RandomPool"
random-group = "akka.routing.RandomGroup"
balancing-pool = "akka.routing.BalancingPool"
smallest-mailbox-pool = "akka.routing.SmallestMailboxPool"
broadcast-pool = "akka.routing.BroadcastPool"
broadcast-group = "akka.routing.BroadcastGroup"
@ -173,7 +174,8 @@ akka {
# 'pool-dispatcher' in the deployment section of the router.
# For example:
# pool-dispatcher {
# type = BalancingDispatcher
# fork-join-executor.parallelism-min = 5
# fork-join-executor.parallelism-max = 5
# }
# Routers with dynamically resizable number of routees; this feature is
@ -227,12 +229,11 @@ akka {
default-dispatcher {
# Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are
# of the same type), PinnedDispatcher, or a FQCN to a class inheriting
# Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
# MessageDispatcherConfigurator with a public constructor with
# both com.typesafe.config.Config parameter and
# akka.dispatch.DispatcherPrerequisites parameters.
# PinnedDispatcher must be used toghether with executor=thread-pool-executor.
# PinnedDispatcher must be used together with executor=thread-pool-executor.
type = "Dispatcher"
# Which kind of ExecutorService to use for this dispatcher

View file

@ -28,6 +28,7 @@ import scala.concurrent.duration.FiniteDuration
* @see akka.dispatch.BalancingDispatcher
* @see akka.dispatch.Dispatchers
*/
@deprecated("Use BalancingPool instead of BalancingDispatcher", "2.3")
class BalancingDispatcher(
_configurator: MessageDispatcherConfigurator,
_id: String,

View file

@ -83,7 +83,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
* using this dispatcher, because the details can only be checked by trying
* to instantiate it, which might be undesirable when just checking.
*/
def hasDispatcher(id: String): Boolean = cachingConfig.hasPath(id)
def hasDispatcher(id: String): Boolean = dispatcherConfigurators.containsKey(id) || cachingConfig.hasPath(id)
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
dispatcherConfigurators.get(id) match {
@ -104,17 +104,40 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
}
}
//INTERNAL API
/**
* Register a [[MessageDispatcherConfigurator]] that will be
* used by [[#lookup]] and [[#hasDispatcher]] instead of looking
* up the configurator from the system configuration.
* This enables dynamic addition of dispatchers, as used by the
* [[akka.routing.BalancingPool]].
*
* A configurator for a certain id can only be registered once, i.e.
* it can not be replaced. It is safe to call this method multiple times,
* but only the first registration will be used. This method returns `true` if
* the specified configurator was successfully registered.
*/
def registerConfigurator(id: String, configurator: MessageDispatcherConfigurator): Boolean =
dispatcherConfigurators.putIfAbsent(id, configurator) == null
/**
* INTERNAL API
*/
private[akka] def config(id: String): Config = {
config(id, settings.config.getConfig(id))
}
/**
* INTERNAL API
*/
private[akka] def config(id: String, appConfig: Config): Config = {
import scala.collection.JavaConverters._
def simpleName = id.substring(id.lastIndexOf('.') + 1)
idConfig(id)
.withFallback(settings.config.getConfig(id))
.withFallback(appConfig)
.withFallback(ConfigFactory.parseMap(Map("name" -> simpleName).asJava))
.withFallback(defaultDispatcherConfig)
}
//INTERNAL API
private def idConfig(id: String): Config = {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(Map("id" -> id).asJava)
@ -134,8 +157,6 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
*/
private[akka] def from(cfg: Config): MessageDispatcher = configuratorFrom(cfg).dispatcher()
private[akka] def isBalancingDispatcher(id: String): Boolean = settings.config.hasPath(id) && config(id).getString("type") == "BalancingDispatcher"
/**
* INTERNAL API
*
@ -150,9 +171,13 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
cfg.getString("type") match {
case "Dispatcher" new DispatcherConfigurator(cfg, prerequisites)
case "BalancingDispatcher" new BalancingDispatcherConfigurator(cfg, prerequisites)
case "PinnedDispatcher" new PinnedDispatcherConfigurator(cfg, prerequisites)
case "Dispatcher" new DispatcherConfigurator(cfg, prerequisites)
case "BalancingDispatcher"
// FIXME remove this case in 2.4
throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
"During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
classOf[BalancingDispatcherConfigurator].getName)
case "PinnedDispatcher" new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn
val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({

View file

@ -564,7 +564,7 @@ object UnboundedMailbox {
/**
* SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producersingle consumer, unbounded MailboxType,
* the only drawback is that you can't have multiple consumers,
* which rules out using it with BalancingDispatcher for instance.
* which rules out using it with BalancingPool (BalancingDispatcher) for instance.
*/
case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with ProducesMessageQueue[NodeMessageQueue] {

View file

@ -0,0 +1,149 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import scala.collection.immutable
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.ActorContext
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.dispatch.BalancingDispatcherConfigurator
import akka.dispatch.Dispatchers
/**
* INTERNAL API
*/
private[akka] object BalancingRoutingLogic {
def apply(): BalancingRoutingLogic = new BalancingRoutingLogic
}
/**
* INTERNAL API
* Selects the first routee, balancing will be done by the dispatcher.
*/
@SerialVersionUID(1L)
private[akka] final class BalancingRoutingLogic extends RoutingLogic {
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
if (routees.isEmpty) NoRoutee
else routees.head
}
/**
* A router pool that will try to redistribute work from busy routees to idle routees.
* All routees share the same mailbox.
*
* Although the technique used in this implementation is commonly known as "work stealing", the
* actual implementation is probably best described as "work donating" because the actor of which
* work is being stolen takes the initiative.
*
* The configuration parameter trumps the constructor arguments. This means that
* if you provide `nrOfInstances` during instantiation they will be ignored if
* the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param nrOfInstances initial number of routees in the pool
*
* @param supervisorStrategy strategy for supervising the routees, see 'Supervision Setup'
*
* @param routerDispatcher dispatcher to use for the router head actor, which handles
* supervision, death watch and router management messages
*/
@SerialVersionUID(1L)
final case class BalancingPool(
override val nrOfInstances: Int,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends Pool {
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"))
/**
* Java API
* @param nr initial number of routees in the pool
*/
def this(nr: Int) = this(nrOfInstances = nr)
override def createRouter(system: ActorSystem): Router = new Router(BalancingRoutingLogic())
/**
* Setting the supervisor strategy to be used for the head Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): BalancingPool = copy(supervisorStrategy = strategy)
/**
* Setting the dispatcher to be used for the router head actor, which handles
* supervision, death watch and router management messages.
*/
def withDispatcher(dispatcherId: String): BalancingPool = copy(routerDispatcher = dispatcherId)
/**
* INTERNAL API
*/
override private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = {
val deployPath = context.self.path.elements.drop(1).mkString("/", "/", "")
val dispatcherId = s"BalancingPool-$deployPath"
def dispatchers = context.system.dispatchers
if (!dispatchers.hasDispatcher(dispatcherId)) {
// dynamically create the config and register the dispatcher configurator for the
// dispatcher of this pool
val deployDispatcherConfigPath = s"akka.actor.deployment.$deployPath.pool-dispatcher"
val systemConfig = context.system.settings.config
val dispatcherConfig = context.system.dispatchers.config(dispatcherId,
// use the user defined 'pool-dispatcher' config as fallback, if any
if (systemConfig.hasPath(deployDispatcherConfigPath)) systemConfig.getConfig(deployDispatcherConfigPath)
else ConfigFactory.empty)
dispatchers.registerConfigurator(dispatcherId, new BalancingDispatcherConfigurator(dispatcherConfig,
dispatchers.prerequisites))
}
val routeePropsWithDispatcher = routeeProps.withDispatcher(dispatcherId)
ActorRefRoutee(context.actorOf(routeePropsWithDispatcher))
}
/**
* Uses the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one.
*/
override def withFallback(other: RouterConfig): RouterConfig =
if (other == NoRouter) this // NoRouter is the default, hence neutral
else {
other match {
case p: Pool
if ((this.supervisorStrategy eq Pool.defaultSupervisorStrategy)
&& (p.supervisorStrategy ne Pool.defaultSupervisorStrategy))
this.withSupervisorStrategy(p.supervisorStrategy)
else this
case _ this
}
}
/**
* Resizer cannot be used together with BalancingPool
*/
override val resizer: Option[Resizer] = None
}

View file

@ -20,7 +20,6 @@ import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.annotation.tailrec
import akka.event.Logging.Warning
import akka.dispatch.{ MailboxType, MessageDispatcher }
import akka.dispatch.BalancingDispatcher
/**
* Sending this message to a router will make it send back its currently used routees.

View file

@ -202,99 +202,111 @@ public class RouterDocTest {
ActorRef router8 =
getContext().actorOf(new RandomGroup(paths).props(), "router8");
//#random-group-2
//#smallest-mailbox-pool-1
//#balancing-pool-1
ActorRef router9 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router9");
//#smallest-mailbox-pool-1
//#balancing-pool-1
//#smallest-mailbox-pool-2
//#balancing-pool-2
ActorRef router10 =
getContext().actorOf(new SmallestMailboxPool(5).props(
Props.create(Worker.class)), "router10");
//#smallest-mailbox-pool-2
//#balancing-pool-2
//#broadcast-pool-1
//#smallest-mailbox-pool-1
ActorRef router11 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router11");
//#smallest-mailbox-pool-1
//#smallest-mailbox-pool-2
ActorRef router12 =
getContext().actorOf(new SmallestMailboxPool(5).props(
Props.create(Worker.class)), "router12");
//#smallest-mailbox-pool-2
//#broadcast-pool-1
ActorRef router13 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router13");
//#broadcast-pool-1
//#broadcast-pool-2
ActorRef router12 =
ActorRef router14 =
getContext().actorOf(new BroadcastPool(5).props(Props.create(Worker.class)),
"router12");
"router14");
//#broadcast-pool-2
//#broadcast-group-1
ActorRef router13 =
getContext().actorOf(FromConfig.getInstance().props(), "router13");
ActorRef router15 =
getContext().actorOf(FromConfig.getInstance().props(), "router15");
//#broadcast-group-1
//#broadcast-group-2
ActorRef router14 =
getContext().actorOf(new BroadcastGroup(paths).props(), "router14");
ActorRef router16 =
getContext().actorOf(new BroadcastGroup(paths).props(), "router16");
//#broadcast-group-2
//#scatter-gather-pool-1
ActorRef router15 =
ActorRef router17 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router15");
Props.create(Worker.class)), "router17");
//#scatter-gather-pool-1
//#scatter-gather-pool-2
FiniteDuration within = FiniteDuration.create(10, TimeUnit.SECONDS);
ActorRef router16 =
ActorRef router18 =
getContext().actorOf(new ScatterGatherFirstCompletedPool(5, within).props(
Props.create(Worker.class)), "router16");
Props.create(Worker.class)), "router18");
//#scatter-gather-pool-2
//#scatter-gather-group-1
ActorRef router17 =
getContext().actorOf(FromConfig.getInstance().props(), "router17");
ActorRef router19 =
getContext().actorOf(FromConfig.getInstance().props(), "router19");
//#scatter-gather-group-1
//#scatter-gather-group-2
FiniteDuration within2 = FiniteDuration.create(10, TimeUnit.SECONDS);
ActorRef router18 =
ActorRef router20 =
getContext().actorOf(new ScatterGatherFirstCompletedGroup(paths, within2).props(),
"router18");
"router20");
//#scatter-gather-group-2
//#consistent-hashing-pool-1
ActorRef router19 =
ActorRef router21 =
getContext().actorOf(FromConfig.getInstance().props(Props.create(Worker.class)),
"router19");
"router21");
//#consistent-hashing-pool-1
//#consistent-hashing-pool-2
ActorRef router20 =
ActorRef router22 =
getContext().actorOf(new ConsistentHashingPool(5).props(
Props.create(Worker.class)), "router20");
Props.create(Worker.class)), "router22");
//#consistent-hashing-pool-2
//#consistent-hashing-group-1
ActorRef router21 =
getContext().actorOf(FromConfig.getInstance().props(), "router21");
ActorRef router23 =
getContext().actorOf(FromConfig.getInstance().props(), "router23");
//#consistent-hashing-group-1
//#consistent-hashing-group-2
ActorRef router22 =
getContext().actorOf(new ConsistentHashingGroup(paths).props(), "router22");
ActorRef router24 =
getContext().actorOf(new ConsistentHashingGroup(paths).props(), "router24");
//#consistent-hashing-group-2
//#resize-pool-1
ActorRef router23 =
ActorRef router25 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router23");
Props.create(Worker.class)), "router25");
//#resize-pool-1
//#resize-pool-2
DefaultResizer resizer = new DefaultResizer(2, 15);
ActorRef router24 =
ActorRef router26 =
getContext().actorOf(new RoundRobinPool(5).withResizer(resizer).props(
Props.create(Worker.class)), "router24");
Props.create(Worker.class)), "router26");
//#resize-pool-2
public void onReceive(Object msg) {}

View file

@ -87,27 +87,6 @@ There are 4 different types of message dispatchers:
- Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator``
by default a "thread-pool-executor"
* BalancingDispatcher
- This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.
- All the actors share a single Mailbox that they get their messages from.
- It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
- Sharability: Actors of the same type only
- Mailboxes: Any, creates one for all Actors
- Use cases: Work-sharing
- Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
- Note that you can **not** use a ``BalancingDispatcher`` as a **Router Dispatcher**. (You can however use it for the **Routees**)
* CallingThreadDispatcher
- This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads,

View file

@ -4,8 +4,8 @@ Mailboxes
#########
An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingDispatcher``
all actors with the same ``BalancingDispatcher`` will share a single instance.
Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingPool``
all routees will share a single mailbox instance.
Mailbox Selection
=================

View file

@ -252,6 +252,31 @@ RandomGroup defined in code:
.. includecode:: code/docs/jrouting/RouterDocTest.java
:include: paths,random-group-2
.. _balancing-pool-java:
BalancingPool
-------------
A Router that will try to redistribute work from busy routees to idle routees.
All routees share the same mailbox.
BalancingPool defined in configuration:
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-balancing-pool
.. includecode:: code/docs/jrouting/RouterDocTest.java#balancing-pool-1
BalancingPool defined in code:
.. includecode:: code/docs/jrouting/RouterDocTest.java#balancing-pool-2
Addition configuration for the balancing dispatcher, which is used by the pool,
can be configured in the ``pool-dispatcher`` section of the router deployment
configuration.
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-balancing-pool2
There is no Group variant of the BalancingPool.
SmallestMailboxPool
-------------------
@ -628,11 +653,7 @@ Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the pool will be taken from
``Props`` as described in :ref:`dispatchers-scala`. For a pool it
makes sense to configure the ``BalancingDispatcher`` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
``Props`` as described in :ref:`dispatchers-scala`.
To make it easy to define the dispatcher of the routees of the pool you can
define the dispatcher inline in the deployment section of the config.
@ -642,7 +663,6 @@ define the dispatcher inline in the deployment section of the config.
That is the only thing you need to do enable a dedicated dispatcher for a
pool.
.. note::
If you use a group of actors and route to their paths, then they will still use the same dispatcher
@ -662,28 +682,6 @@ implement the method in a suitable way.
.. note::
It is not allowed to configure the ``routerDispatcher`` to be a
:class:`BalancingDispatcher` since the messages meant for the special
router actor cannot be processed by any other actor.
At first glance there seems to be an overlap between the
:class:`BalancingDispatcher` and Routers, but they complement each other.
The balancing dispatcher is in charge of running the actors while the routers
are in charge of deciding which message goes where. A router can also have
children that span multiple actor systems, even remote ones, but a dispatcher
lives inside a single actor system.
When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher`
there are some configuration settings to take into account.
- There can only be ``nr-of-instances`` messages being processed at the same
time no matter how many threads are configured for the
:class:`BalancingDispatcher`.
- Having ``throughput`` set to a low number makes no sense since you will only
be handing off to another actor that processes the same :class:`MailBox`
as yourself, which can be costly. Either the message just got into the
mailbox and you can receive it as well as anybody else, or everybody else
is busy and you are the only one available to receive the message.
- Resizing the number of routees only introduce inertia, since resizing
is performed at specified intervals, but work stealing is instantaneous.
:class:`akka.dispatch.BalancingDispatcherConfigurator` since the messages meant
for the special router actor cannot be processed by any other actor.

View file

@ -134,3 +134,13 @@ The following, previously deprecated, features have been removed:
* DefaultScheduler superseded by LightArrayRevolverScheduler
BalancingDispatcher is Deprecated
=================================
Use ``BalancingPool`` instead of ``BalancingDispatcher``. See :ref:`documentation for Scala <balancing-pool-scala>` and
:ref:`documentation for Java <balancing-pool-java>`.
During a migration period you can still use BalancingDispatcher by specifying the full class name in the dispatcher configuration::
type = "akka.dispatch.BalancingDispatcherConfigurator"

View file

@ -114,17 +114,6 @@ object DispatcherDocSpec {
}
//#my-bounded-config
//#my-balancing-config
my-balancing-dispatcher {
type = BalancingDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-factor = 8.0
max-pool-size-factor = 16.0
}
}
//#my-balancing-config
//#prio-dispatcher-config
prio-dispatcher {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
@ -342,10 +331,6 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
}
}
"defining balancing dispatcher" in {
val dispatcher = system.dispatchers.lookup("my-balancing-dispatcher")
}
"require custom mailbox on dispatcher" in {
val myActor = system.actorOf(Props[MyActor].withDispatcher(
"custom-dispatcher"))

View file

@ -20,6 +20,7 @@ import akka.routing.DefaultResizer
import akka.routing.ScatterGatherFirstCompletedGroup
import akka.routing.RandomGroup
import akka.routing.ScatterGatherFirstCompletedPool
import akka.routing.BalancingPool
object RouterDocSpec {
@ -59,10 +60,31 @@ akka.actor.deployment {
}
}
#//#config-random-group
#//#config-balancing-pool
akka.actor.deployment {
/parent/router9 {
router = balancing-pool
nr-of-instances = 5
}
}
#//#config-balancing-pool
#//#config-balancing-pool2
akka.actor.deployment {
/parent/router9b {
router = balancing-pool
nr-of-instances = 5
pool-dispatcher {
attempt-teamwork = off
}
}
}
#//#config-balancing-pool2
#//#config-smallest-mailbox-pool
akka.actor.deployment {
/parent/router9 {
/parent/router11 {
router = smallest-mailbox-pool
nr-of-instances = 5
}
@ -71,7 +93,7 @@ akka.actor.deployment {
#//#config-broadcast-pool
akka.actor.deployment {
/parent/router11 {
/parent/router13 {
router = broadcast-pool
nr-of-instances = 5
}
@ -80,7 +102,7 @@ akka.actor.deployment {
#//#config-broadcast-group
akka.actor.deployment {
/parent/router13 {
/parent/router15 {
router = broadcast-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
}
@ -89,7 +111,7 @@ akka.actor.deployment {
#//#config-scatter-gather-pool
akka.actor.deployment {
/parent/router15 {
/parent/router17 {
router = scatter-gather-pool
nr-of-instances = 5
within = 10 seconds
@ -99,7 +121,7 @@ akka.actor.deployment {
#//#config-scatter-gather-group
akka.actor.deployment {
/parent/router17 {
/parent/router19 {
router = scatter-gather-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
within = 10 seconds
@ -109,7 +131,7 @@ akka.actor.deployment {
#//#config-consistent-hashing-pool
akka.actor.deployment {
/parent/router19 {
/parent/router21 {
router = consistent-hashing-pool
nr-of-instances = 5
virtual-nodes-factor = 10
@ -119,7 +141,7 @@ akka.actor.deployment {
#//#config-consistent-hashing-group
akka.actor.deployment {
/parent/router21 {
/parent/router23 {
router = consistent-hashing-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
virtual-nodes-factor = 10
@ -129,7 +151,7 @@ akka.actor.deployment {
#//#config-resize-pool
akka.actor.deployment {
/parent/router23 {
/parent/router25 {
router = round-robin-pool
resizer {
lower-bound = 2
@ -146,7 +168,8 @@ akka.actor.deployment {
router = random-pool
nr-of-instances = 5
pool-dispatcher {
type = BalancingDispatcher
fork-join-executor.parallelism-min = 5
fork-join-executor.parallelism-max = 5
}
}
}
@ -249,89 +272,99 @@ router-dispatcher {}
context.actorOf(RandomGroup(paths).props(), "router8")
//#random-group-2
//#smallest-mailbox-pool-1
//#balancing-pool-1
val router9: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router9")
//#balancing-pool-1
//#balancing-pool-2
val router10: ActorRef =
context.actorOf(BalancingPool(5).props(Props[Worker]), "router10")
//#balancing-pool-2
//#smallest-mailbox-pool-1
val router11: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router11")
//#smallest-mailbox-pool-1
//#smallest-mailbox-pool-2
val router10: ActorRef =
context.actorOf(SmallestMailboxPool(5).props(Props[Worker]), "router10")
val router12: ActorRef =
context.actorOf(SmallestMailboxPool(5).props(Props[Worker]), "router12")
//#smallest-mailbox-pool-2
//#broadcast-pool-1
val router11: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router11")
val router13: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router13")
//#broadcast-pool-1
//#broadcast-pool-2
val router12: ActorRef =
context.actorOf(BroadcastPool(5).props(Props[Worker]), "router12")
val router14: ActorRef =
context.actorOf(BroadcastPool(5).props(Props[Worker]), "router14")
//#broadcast-pool-2
//#broadcast-group-1
val router13: ActorRef =
context.actorOf(FromConfig.props(), "router13")
val router15: ActorRef =
context.actorOf(FromConfig.props(), "router15")
//#broadcast-group-1
//#broadcast-group-2
val router14: ActorRef =
context.actorOf(BroadcastGroup(paths).props(), "router14")
val router16: ActorRef =
context.actorOf(BroadcastGroup(paths).props(), "router16")
//#broadcast-group-2
//#scatter-gather-pool-1
val router15: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router15")
val router17: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router17")
//#scatter-gather-pool-1
//#scatter-gather-pool-2
val router16: ActorRef =
val router18: ActorRef =
context.actorOf(ScatterGatherFirstCompletedPool(5, within = 10.seconds).
props(Props[Worker]), "router16")
props(Props[Worker]), "router18")
//#scatter-gather-pool-2
//#scatter-gather-group-1
val router17: ActorRef =
context.actorOf(FromConfig.props(), "router17")
val router19: ActorRef =
context.actorOf(FromConfig.props(), "router19")
//#scatter-gather-group-1
//#scatter-gather-group-2
val router18: ActorRef =
val router20: ActorRef =
context.actorOf(ScatterGatherFirstCompletedGroup(paths,
within = 10.seconds).props(), "router18")
within = 10.seconds).props(), "router20")
//#scatter-gather-group-2
//#consistent-hashing-pool-1
val router19: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router19")
val router21: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router21")
//#consistent-hashing-pool-1
//#consistent-hashing-pool-2
val router20: ActorRef =
val router22: ActorRef =
context.actorOf(ConsistentHashingPool(5).props(Props[Worker]),
"router20")
"router22")
//#consistent-hashing-pool-2
//#consistent-hashing-group-1
val router21: ActorRef =
context.actorOf(FromConfig.props(), "router21")
val router23: ActorRef =
context.actorOf(FromConfig.props(), "router23")
//#consistent-hashing-group-1
//#consistent-hashing-group-2
val router22: ActorRef =
context.actorOf(ConsistentHashingGroup(paths).props(), "router22")
val router24: ActorRef =
context.actorOf(ConsistentHashingGroup(paths).props(), "router24")
//#consistent-hashing-group-2
//#resize-pool-1
val router23: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router23")
val router25: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router25")
//#resize-pool-1
//#resize-pool-2
val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
val router24: ActorRef =
val router26: ActorRef =
context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]),
"router24")
"router26")
//#resize-pool-2
def receive = {

View file

@ -4,8 +4,8 @@ Mailboxes
#########
An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingDispatcher``
all actors with the same ``BalancingDispatcher`` will share a single instance.
Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingPool``
all routees will share a single mailbox instance.
Mailbox Selection
=================

View file

@ -251,6 +251,31 @@ RandomGroup defined in code:
.. includecode:: code/docs/routing/RouterDocSpec.scala
:include: paths,random-group-2
.. _balancing-pool-scala:
BalancingPool
-------------
A Router that will try to redistribute work from busy routees to idle routees.
All routees share the same mailbox.
BalancingPool defined in configuration:
.. includecode:: code/docs/routing/RouterDocSpec.scala#config-balancing-pool
.. includecode:: code/docs/routing/RouterDocSpec.scala#balancing-pool-1
BalancingPool defined in code:
.. includecode:: code/docs/routing/RouterDocSpec.scala#balancing-pool-2
Addition configuration for the balancing dispatcher, which is used by the pool,
can be configured in the ``pool-dispatcher`` section of the router deployment
configuration.
.. includecode:: code/docs/routing/RouterDocSpec.scala#config-balancing-pool2
There is no Group variant of the BalancingPool.
SmallestMailboxPool
-------------------
@ -515,7 +540,7 @@ and when you receive the ``Routees`` reply you know that the preceeding change h
Dynamically Resizable Pool
^^^^^^^^^^^^^^^^^^^^^^^^^^
All pools can be used with a fixed number of routees or with a resize strategy to adjust the number
Most pools can be used with a fixed number of routees or with a resize strategy to adjust the number
of routees dynamically.
Pool with resizer defined in configuration:
@ -627,11 +652,7 @@ Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the pool will be taken from
``Props`` as described in :ref:`dispatchers-scala`. For a pool it
makes sense to configure the ``BalancingDispatcher`` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
``Props`` as described in :ref:`dispatchers-scala`.
To make it easy to define the dispatcher of the routees of the pool you can
define the dispatcher inline in the deployment section of the config.
@ -660,28 +681,5 @@ implement the method in a suitable way.
.. note::
It is not allowed to configure the ``routerDispatcher`` to be a
:class:`BalancingDispatcher` since the messages meant for the special
router actor cannot be processed by any other actor.
At first glance there seems to be an overlap between the
:class:`BalancingDispatcher` and Routers, but they complement each other.
The balancing dispatcher is in charge of running the actors while the routers
are in charge of deciding which message goes where. A router can also have
children that span multiple actor systems, even remote ones, but a dispatcher
lives inside a single actor system.
When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher`
there are some configuration settings to take into account.
- There can only be ``nr-of-instances`` messages being processed at the same
time no matter how many threads are configured for the
:class:`BalancingDispatcher`.
- Having ``throughput`` set to a low number makes no sense since you will only
be handing off to another actor that processes the same :class:`MailBox`
as yourself, which can be costly. Either the message just got into the
mailbox and you can receive it as well as anybody else, or everybody else
is busy and you are the only one available to receive the message.
- Resizing the number of routees only introduce inertia, since resizing
is performed at specified intervals, but work stealing is instantaneous.
:class:`akka.dispatch.BalancingDispatcherConfigurator` since the messages meant
for the special router actor cannot be processed by any other actor.

View file

@ -37,7 +37,7 @@ final class RemoteSettings(val config: Config) {
}
}
@deprecated("Use the RemoteLifecycleEventsLogLevel field instead.")
@deprecated("Use the RemoteLifecycleEventsLogLevel field instead.", "2.3")
def LogRemoteLifecycleEvents: Boolean = RemoteLifecycleEventsLogLevel >= Logging.ErrorLevel
val Dispatcher: String = getString("akka.remote.use-dispatcher")