diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index 4ced9cd03c..62dc857b27 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -5,13 +5,14 @@ package akka.actor.dispatch import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.reflect.ClassTag - import com.typesafe.config.ConfigFactory - import akka.ConfigurationException import akka.actor.{ Actor, ActorRef, Props } import akka.dispatch.{ BalancingDispatcher, Dispatcher, Dispatchers, MessageDispatcher, PinnedDispatcher } import akka.testkit.{ AkkaSpec, ImplicitSender } +import akka.routing.FromConfig +import akka.actor.Identify +import akka.actor.ActorIdentity object DispatchersSpec { val config = """ @@ -37,7 +38,12 @@ object DispatchersSpec { /echo2 { dispatcher = myapp.mydispatcher } - } + /pool1 { + router = random-pool + nr-of-instances = 3 + pool-dispatcher.type = BalancingDispatcher + } + } """ class ThreadNameEcho extends Actor { @@ -173,6 +179,17 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher"), name = "echo2")) } + "use pool-dispatcher router of deployment config" in { + val pool = system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "pool1") + pool ! Identify(None) + val routee = expectMsgType[ActorIdentity].ref.get + routee ! "what's the name?" + val Expected = """(DispatchersSpec-akka\.actor\.deployment\./pool1\.pool-dispatcher-[1-9][0-9]*)""".r + expectMsgPF(remaining) { + case Expected(x) ⇒ + } + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 80b4ad64be..4e9a240aa9 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -29,6 +29,9 @@ object ConfiguredLocalRoutingSpec { /config { router = random-pool nr-of-instances = 4 + pool-dispatcher { + type = BalancingDispatcher + } } /paths { router = random-group @@ -100,7 +103,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con "be overridable in config" in { val actor = system.actorOf(RoundRobinPool(12).props(routeeProps = Props[EchoProps]), "config") - routerConfig(actor) must be === RandomPool(4) + routerConfig(actor) must be === RandomPool(nrOfInstances = 4, usePoolDispatcher = true) Await.result(gracefulStop(actor, 3 seconds), 3 seconds) } @@ -120,7 +123,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con "be overridable in config even with explicit deployment" in { val actor = system.actorOf(FromConfig.props(routeeProps = Props[EchoProps]). withDeploy(Deploy(routerConfig = RoundRobinPool(12))), "config") - routerConfig(actor) must be === RandomPool(4) + routerConfig(actor) must be === RandomPool(nrOfInstances = 4, usePoolDispatcher = true) Await.result(gracefulStop(actor, 3 seconds), 3 seconds) } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 43ab67a4b4..b90149e36e 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -173,6 +173,14 @@ akka { # precedence over nr-of-instances paths = [] } + + # To use a dedicated dispatcher for the routees of the pool you can + # define the dispatcher configuration inline with the property name + # 'pool-dispatcher' in the deployment section of the router. + # For example: + # pool-dispatcher { + # type = BalancingDispatcher + # } # Routers with dynamically resizable number of routees; this feature is # enabled by including (parts of) this section in the deployment diff --git a/akka-actor/src/main/scala/akka/routing/Broadcast.scala b/akka-actor/src/main/scala/akka/routing/Broadcast.scala index ff723c585c..bae2a05faa 100644 --- a/akka-actor/src/main/scala/akka/routing/Broadcast.scala +++ b/akka-actor/src/main/scala/akka/routing/Broadcast.scala @@ -60,13 +60,15 @@ final class BroadcastRoutingLogic extends RoutingLogic { final case class BroadcastPool( override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, - override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) extends Pool with PoolOverrideUnsetConfig[BroadcastPool] { def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config)) + resizer = DefaultResizer.fromConfig(config), + usePoolDispatcher = config.hasPath("pool-dispatcher")) /** * Java API diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 6ed724ad4f..298383a79c 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -276,13 +276,15 @@ final case class ConsistentHashingPool( val virtualNodesFactor: Int = 0, val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, - override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) extends Pool with PoolOverrideUnsetConfig[ConsistentHashingPool] { def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config)) + resizer = DefaultResizer.fromConfig(config), + usePoolDispatcher = config.hasPath("pool-dispatcher")) /** * Java API diff --git a/akka-actor/src/main/scala/akka/routing/Random.scala b/akka-actor/src/main/scala/akka/routing/Random.scala index eb0e64f380..9dc339d6e1 100644 --- a/akka-actor/src/main/scala/akka/routing/Random.scala +++ b/akka-actor/src/main/scala/akka/routing/Random.scala @@ -61,13 +61,15 @@ final class RandomRoutingLogic extends RoutingLogic { final case class RandomPool( override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, - override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) extends Pool with PoolOverrideUnsetConfig[RandomPool] { def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config)) + resizer = DefaultResizer.fromConfig(config), + usePoolDispatcher = config.hasPath("pool-dispatcher")) /** * Java API diff --git a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala index ceaadc701b..5dd1846f43 100644 --- a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala +++ b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala @@ -66,12 +66,14 @@ final class RoundRobinRoutingLogic extends RoutingLogic { final case class RoundRobinPool( override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, - override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) extends Pool with PoolOverrideUnsetConfig[RoundRobinPool] { def this(config: Config) = this(nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config)) + resizer = DefaultResizer.fromConfig(config), + usePoolDispatcher = config.hasPath("pool-dispatcher")) /** * Java API diff --git a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala index 56446c35cd..cc28349a42 100644 --- a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala +++ b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala @@ -181,11 +181,28 @@ trait Pool extends RouterConfig { */ def nrOfInstances: Int + /** + * Use a dedicated dispatcher for the routees of the pool. + * The dispatcher is defined in 'pool-dispatcher' configuration property in the + * deployment section of the router. + */ + def usePoolDispatcher: Boolean = false + /** * INTERNAL API */ private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = - ActorRefRoutee(context.actorOf(routeeProps)) + ActorRefRoutee(context.actorOf(enrichWithPoolDispatcher(routeeProps, context))) + + /** + * INTERNAL API + */ + private[akka] def enrichWithPoolDispatcher(routeeProps: Props, context: ActorContext): Props = + if (usePoolDispatcher && routeeProps.dispatcher == Dispatchers.DefaultDispatcherId) + routeeProps.withDispatcher("akka.actor.deployment." + context.self.path.elements.drop(1).mkString("/", "/", "") + + ".pool-dispatcher") + else + routeeProps /** * Pool with dynamically resizable number of routees return the [[akka.routing.Resizer]] diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index 8db5b0a76e..e4bfd89c36 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -95,14 +95,16 @@ final case class ScatterGatherFirstCompletedPool( override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, within: FiniteDuration, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, - override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) extends Pool with PoolOverrideUnsetConfig[ScatterGatherFirstCompletedPool] { def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), within = Duration(config.getMilliseconds("within"), TimeUnit.MILLISECONDS), - resizer = DefaultResizer.fromConfig(config)) + resizer = DefaultResizer.fromConfig(config), + usePoolDispatcher = config.hasPath("pool-dispatcher")) /** * Java API diff --git a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala index 45d8a9bb41..28547c2bac 100644 --- a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala +++ b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala @@ -176,13 +176,15 @@ class SmallestMailboxRoutingLogic extends RoutingLogic { final case class SmallestMailboxPool( override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, - override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) extends Pool with PoolOverrideUnsetConfig[SmallestMailboxPool] { def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config)) + resizer = DefaultResizer.fromConfig(config), + usePoolDispatcher = config.hasPath("pool-dispatcher")) /** * Java API diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala index a4fafe3a7f..289100008d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala @@ -130,12 +130,14 @@ final case class AdaptiveLoadBalancingPool( metricsSelector: MetricsSelector = MixMetricsSelector, override val nrOfInstances: Int = 0, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, - override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) extends Pool { def this(config: Config, dynamicAccess: DynamicAccess) = this(nrOfInstances = config.getInt("nr-of-instances"), - metricsSelector = MetricsSelector.fromConfig(config, dynamicAccess)) + metricsSelector = MetricsSelector.fromConfig(config, dynamicAccess), + usePoolDispatcher = config.hasPath("pool-dispatcher")) /** * Java API diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index cc30d1a6f7..b20032f2c9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -193,7 +193,8 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti */ override private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = { val name = "c" + childNameCounter.incrementAndGet - val ref = context.asInstanceOf[ActorCell].attachChild(routeeProps, name, systemService = false) + val ref = context.asInstanceOf[ActorCell].attachChild( + local.enrichWithPoolDispatcher(routeeProps, context), name, systemService = false) ActorRefRoutee(ref) } diff --git a/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java index 2d14959ee4..935e86b29a 100644 --- a/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java @@ -4,22 +4,27 @@ package docs.jrouting; import akka.testkit.AkkaJUnitActorSystemResource; + import org.junit.ClassRule; import org.junit.Test; + import akka.testkit.JavaTestKit; import akka.actor.ActorSystem; //#imports1 import akka.actor.UntypedActor; import akka.routing.ConsistentHashingRouter.ConsistentHashable; + import java.util.Map; import java.util.HashMap; import java.io.Serializable; //#imports1 + //#imports2 import akka.actor.Props; import akka.actor.ActorRef; +import akka.routing.ConsistentHashingPool; import akka.routing.ConsistentHashingRouter; import akka.routing.ConsistentHashingRouter.ConsistentHashMapper; import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; @@ -120,7 +125,7 @@ public class ConsistentHashingRouterDocTest { }; ActorRef cache = system.actorOf( - new ConsistentHashingRouter(10).withHashMapper(hashMapper).props( + new ConsistentHashingPool(10).withHashMapper(hashMapper).props( Props.create(Cache.class)), "cache"); diff --git a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java index 857b4f1d2e..6d85759cb6 100644 --- a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java @@ -317,12 +317,11 @@ public class RouterDocTest { public void demonstrateDispatcher() { //#dispatchers Props props = - // “head” will run on "router-dispatcher" dispatcher - new RoundRobinPool(5).withDispatcher("router-dispatcher").props( - Props.create(Worker.class)) - // Worker routees will run on "workers-dispatcher" dispatcher - .withDispatcher("workers-dispatcher"); - ActorRef router = system.actorOf(props); + // “head” router actor will run on "router-dispatcher" dispatcher + // Worker routees will run on "pool-dispatcher" dispatcher + new RandomPool(5).withDispatcher("router-dispatcher").props( + Props.create(Worker.class)); + ActorRef router = system.actorOf(props, "poolWithDispatcher"); //#dispatchers } @@ -390,8 +389,8 @@ public class RouterDocTest { public void demonstrateRemoteDeploy() { //#remoteRoutees Address[] addresses = { - new Address("akka", "remotesys", "otherhost", 1234), - AddressFromURIString.parse("akka://othersys@anotherhost:1234")}; + new Address("akka.tcp", "remotesys", "otherhost", 1234), + AddressFromURIString.parse("akka.tcp://othersys@anotherhost:1234")}; ActorRef routerRemote = system.actorOf( new RemoteRouterConfig(new RoundRobinPool(5), addresses).props( Props.create(Echo.class))); diff --git a/akka-docs/rst/java/io-tcp.rst b/akka-docs/rst/java/io-tcp.rst index bc271c777a..b1e49bdf3a 100644 --- a/akka-docs/rst/java/io-tcp.rst +++ b/akka-docs/rst/java/io-tcp.rst @@ -133,7 +133,7 @@ Once a connection has been established data can be sent to it from any actor in Tcp.Write The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event. - A ``ByteString`` (as explained in :ref:`this section `) models one or more chunks of immutable + A ``ByteString`` (as explained in :ref:`this section `) models one or more chunks of immutable in-memory data with a maximum (total) size of 2 GB (2^31 bytes). Tcp.WriteFile diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index 1cbb9fbe9d..40efd3810b 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -85,7 +85,7 @@ nacked messages it may need to keep a buffer of pending messages. the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control not error handling. In other words, data may still be lost, even if every write is acknowledged. -.. _ByteString: +.. _bytestring_java: ByteString ^^^^^^^^^^ diff --git a/akka-docs/rst/java/routing.rst b/akka-docs/rst/java/routing.rst index 928595bda0..54ebc7faf6 100644 --- a/akka-docs/rst/java/routing.rst +++ b/akka-docs/rst/java/routing.rst @@ -627,16 +627,25 @@ The deployment section of the configuration is passed to the constructor. Configuring Dispatchers ^^^^^^^^^^^^^^^^^^^^^^^ -The dispatcher for created children of the router will be taken from -``Props`` as described in :ref:`dispatchers-java`. For a pool it +The dispatcher for created children of the pool will be taken from +``Props`` as described in :ref:`dispatchers-scala`. For a pool it makes sense to configure the ``BalancingDispatcher`` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by stealing it from their siblings. +To make it easy to define the dispatcher of the routees of the pool you can +define the dispatcher inline in the deployment section of the config. + +.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-pool-dispatcher + +That is the only thing you need to do enable a dedicated dispatcher for a +pool. + + .. note:: - If you provide a collection of actors to route to, then they will still use the same dispatcher + If you use a group of actors and route to their paths, then they will still use the same dispatcher that was configured for them in their ``Props``, it is not possible to change an actors dispatcher after it has been created. diff --git a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala index df4a6fb138..be4dce4799 100644 --- a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala @@ -140,8 +140,19 @@ akka.actor.deployment { } #//#config-resize-pool +#//#config-pool-dispatcher +akka.actor.deployment { + /poolWithDispatcher { + router = random-pool + nr-of-instances = 5 + pool-dispatcher { + type = BalancingDispatcher + } + } +} +#//#config-pool-dispatcher + router-dispatcher {} -workers-dispatcher {} """ case class Work(payload: String) @@ -351,10 +362,10 @@ class RouterDocSpec extends AkkaSpec(RouterDocSpec.config) with ImplicitSender { "demonstrate dispatcher" in { //#dispatchers val router: ActorRef = system.actorOf( - // “head” will run on "router-dispatcher" dispatcher - RoundRobinPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]) - // Worker routees will run on "workers-dispatcher" dispatcher - .withDispatcher("workers-dispatcher")) + // “head” router actor will run on "router-dispatcher" dispatcher + // Worker routees will run on "pool-dispatcher" dispatcher + RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]), + name = "poolWithDispatcher") //#dispatchers } @@ -410,8 +421,8 @@ class RouterDocSpec extends AkkaSpec(RouterDocSpec.config) with ImplicitSender { import akka.actor.{ Address, AddressFromURIString } import akka.remote.routing.RemoteRouterConfig val addresses = Seq( - Address("akka", "remotesys", "otherhost", 1234), - AddressFromURIString("akka://othersys@anotherhost:1234")) + Address("akka.tcp", "remotesys", "otherhost", 1234), + AddressFromURIString("akka.tcp://othersys@anotherhost:1234")) val routerRemote = system.actorOf( RemoteRouterConfig(RoundRobinPool(5), addresses).props(Props[Echo])) //#remoteRoutees diff --git a/akka-docs/rst/scala/io-tcp.rst b/akka-docs/rst/scala/io-tcp.rst index c898dfb129..1508785e86 100644 --- a/akka-docs/rst/scala/io-tcp.rst +++ b/akka-docs/rst/scala/io-tcp.rst @@ -134,7 +134,7 @@ Once a connection has been established data can be sent to it from any actor in Tcp.Write The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event. - A ``ByteString`` (as explained in :ref:`this section `) models one or more chunks of immutable + A ``ByteString`` (as explained in :ref:`this section `) models one or more chunks of immutable in-memory data with a maximum (total) size of 2 GB (2^31 bytes). Tcp.WriteFile diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index 88e1e64d10..60b94f3e28 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -90,7 +90,7 @@ nacked messages it may need to keep a buffer of pending messages. the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control not error handling. In other words, data may still be lost, even if every write is acknowledged. -.. _ByteString: +.. _bytestring_scala: ByteString ^^^^^^^^^^ diff --git a/akka-docs/rst/scala/routing.rst b/akka-docs/rst/scala/routing.rst index 451a7ff45e..83e02ff79e 100644 --- a/akka-docs/rst/scala/routing.rst +++ b/akka-docs/rst/scala/routing.rst @@ -626,16 +626,24 @@ The deployment section of the configuration is passed to the constructor. Configuring Dispatchers ^^^^^^^^^^^^^^^^^^^^^^^ -The dispatcher for created children of the router will be taken from +The dispatcher for created children of the pool will be taken from ``Props`` as described in :ref:`dispatchers-scala`. For a pool it makes sense to configure the ``BalancingDispatcher`` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by stealing it from their siblings. +To make it easy to define the dispatcher of the routees of the pool you can +define the dispatcher inline in the deployment section of the config. + +.. includecode:: code/docs/routing/RouterDocSpec.scala#config-pool-dispatcher + +That is the only thing you need to do enable a dedicated dispatcher for a +pool. + .. note:: - If you provide a collection of actors to route to, then they will still use the same dispatcher + If you use a group of actors and route to their paths, then they will still use the same dispatcher that was configured for them in their ``Props``, it is not possible to change an actors dispatcher after it has been created. diff --git a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala index ed26b80951..c02692fc41 100644 --- a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala @@ -54,7 +54,8 @@ final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) exten // attachChild means that the provider will treat this call as if possibly done out of the wrong // context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal // choice in a corner case (and hence not worth fixing). - val ref = context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) + val ref = context.asInstanceOf[ActorCell].attachChild( + local.enrichWithPoolDispatcher(routeeProps, context).withDeploy(deploy), name, systemService = false) ActorRefRoutee(ref) }