+act #3663 Package BalancingDispatcher for usage in router pool

* In fact, make it easy to define any dedicated dispatcher for a pool
This commit is contained in:
Patrik Nordwall 2013-10-16 13:02:35 +02:00
parent f6179da523
commit 80892762ad
22 changed files with 140 additions and 47 deletions

View file

@ -5,13 +5,14 @@ package akka.actor.dispatch
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.reflect.ClassTag
import com.typesafe.config.ConfigFactory
import akka.ConfigurationException
import akka.actor.{ Actor, ActorRef, Props }
import akka.dispatch.{ BalancingDispatcher, Dispatcher, Dispatchers, MessageDispatcher, PinnedDispatcher }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.routing.FromConfig
import akka.actor.Identify
import akka.actor.ActorIdentity
object DispatchersSpec {
val config = """
@ -37,7 +38,12 @@ object DispatchersSpec {
/echo2 {
dispatcher = myapp.mydispatcher
}
}
/pool1 {
router = random-pool
nr-of-instances = 3
pool-dispatcher.type = BalancingDispatcher
}
}
"""
class ThreadNameEcho extends Actor {
@ -173,6 +179,17 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend
system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher"), name = "echo2"))
}
"use pool-dispatcher router of deployment config" in {
val pool = system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "pool1")
pool ! Identify(None)
val routee = expectMsgType[ActorIdentity].ref.get
routee ! "what's the name?"
val Expected = """(DispatchersSpec-akka\.actor\.deployment\./pool1\.pool-dispatcher-[1-9][0-9]*)""".r
expectMsgPF(remaining) {
case Expected(x)
}
}
}
}

View file

@ -29,6 +29,9 @@ object ConfiguredLocalRoutingSpec {
/config {
router = random-pool
nr-of-instances = 4
pool-dispatcher {
type = BalancingDispatcher
}
}
/paths {
router = random-group
@ -100,7 +103,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
"be overridable in config" in {
val actor = system.actorOf(RoundRobinPool(12).props(routeeProps = Props[EchoProps]), "config")
routerConfig(actor) must be === RandomPool(4)
routerConfig(actor) must be === RandomPool(nrOfInstances = 4, usePoolDispatcher = true)
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
@ -120,7 +123,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
"be overridable in config even with explicit deployment" in {
val actor = system.actorOf(FromConfig.props(routeeProps = Props[EchoProps]).
withDeploy(Deploy(routerConfig = RoundRobinPool(12))), "config")
routerConfig(actor) must be === RandomPool(4)
routerConfig(actor) must be === RandomPool(nrOfInstances = 4, usePoolDispatcher = true)
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}

View file

@ -173,6 +173,14 @@ akka {
# precedence over nr-of-instances
paths = []
}
# To use a dedicated dispatcher for the routees of the pool you can
# define the dispatcher configuration inline with the property name
# 'pool-dispatcher' in the deployment section of the router.
# For example:
# pool-dispatcher {
# type = BalancingDispatcher
# }
# Routers with dynamically resizable number of routees; this feature is
# enabled by including (parts of) this section in the deployment

View file

@ -60,13 +60,15 @@ final class BroadcastRoutingLogic extends RoutingLogic {
final case class BroadcastPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[BroadcastPool] {
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config))
resizer = DefaultResizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API

View file

@ -276,13 +276,15 @@ final case class ConsistentHashingPool(
val virtualNodesFactor: Int = 0,
val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[ConsistentHashingPool] {
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config))
resizer = DefaultResizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API

View file

@ -61,13 +61,15 @@ final class RandomRoutingLogic extends RoutingLogic {
final case class RandomPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[RandomPool] {
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config))
resizer = DefaultResizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API

View file

@ -66,12 +66,14 @@ final class RoundRobinRoutingLogic extends RoutingLogic {
final case class RoundRobinPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[RoundRobinPool] {
def this(config: Config) =
this(nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config))
resizer = DefaultResizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API

View file

@ -181,11 +181,28 @@ trait Pool extends RouterConfig {
*/
def nrOfInstances: Int
/**
* Use a dedicated dispatcher for the routees of the pool.
* The dispatcher is defined in 'pool-dispatcher' configuration property in the
* deployment section of the router.
*/
def usePoolDispatcher: Boolean = false
/**
* INTERNAL API
*/
private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee =
ActorRefRoutee(context.actorOf(routeeProps))
ActorRefRoutee(context.actorOf(enrichWithPoolDispatcher(routeeProps, context)))
/**
* INTERNAL API
*/
private[akka] def enrichWithPoolDispatcher(routeeProps: Props, context: ActorContext): Props =
if (usePoolDispatcher && routeeProps.dispatcher == Dispatchers.DefaultDispatcherId)
routeeProps.withDispatcher("akka.actor.deployment." + context.self.path.elements.drop(1).mkString("/", "/", "")
+ ".pool-dispatcher")
else
routeeProps
/**
* Pool with dynamically resizable number of routees return the [[akka.routing.Resizer]]

View file

@ -95,14 +95,16 @@ final case class ScatterGatherFirstCompletedPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
within: FiniteDuration,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[ScatterGatherFirstCompletedPool] {
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
within = Duration(config.getMilliseconds("within"), TimeUnit.MILLISECONDS),
resizer = DefaultResizer.fromConfig(config))
resizer = DefaultResizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API

View file

@ -176,13 +176,15 @@ class SmallestMailboxRoutingLogic extends RoutingLogic {
final case class SmallestMailboxPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[SmallestMailboxPool] {
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config))
resizer = DefaultResizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API

View file

@ -130,12 +130,14 @@ final case class AdaptiveLoadBalancingPool(
metricsSelector: MetricsSelector = MixMetricsSelector,
override val nrOfInstances: Int = 0,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool {
def this(config: Config, dynamicAccess: DynamicAccess) =
this(nrOfInstances = config.getInt("nr-of-instances"),
metricsSelector = MetricsSelector.fromConfig(config, dynamicAccess))
metricsSelector = MetricsSelector.fromConfig(config, dynamicAccess),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API

View file

@ -180,7 +180,8 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti
*/
override private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = {
val name = "c" + childNameCounter.incrementAndGet
val ref = context.asInstanceOf[ActorCell].attachChild(routeeProps, name, systemService = false)
val ref = context.asInstanceOf[ActorCell].attachChild(
local.enrichWithPoolDispatcher(routeeProps, context), name, systemService = false)
ActorRefRoutee(ref)
}

View file

@ -4,22 +4,27 @@
package docs.jrouting;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import org.junit.Test;
import akka.testkit.JavaTestKit;
import akka.actor.ActorSystem;
//#imports1
import akka.actor.UntypedActor;
import akka.routing.ConsistentHashingRouter.ConsistentHashable;
import java.util.Map;
import java.util.HashMap;
import java.io.Serializable;
//#imports1
//#imports2
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.routing.ConsistentHashingPool;
import akka.routing.ConsistentHashingRouter;
import akka.routing.ConsistentHashingRouter.ConsistentHashMapper;
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
@ -120,7 +125,7 @@ public class ConsistentHashingRouterDocTest {
};
ActorRef cache = system.actorOf(
new ConsistentHashingRouter(10).withHashMapper(hashMapper).props(
new ConsistentHashingPool(10).withHashMapper(hashMapper).props(
Props.create(Cache.class)),
"cache");

View file

@ -317,12 +317,11 @@ public class RouterDocTest {
public void demonstrateDispatcher() {
//#dispatchers
Props props =
// head will run on "router-dispatcher" dispatcher
new RoundRobinPool(5).withDispatcher("router-dispatcher").props(
Props.create(Worker.class))
// Worker routees will run on "workers-dispatcher" dispatcher
.withDispatcher("workers-dispatcher");
ActorRef router = system.actorOf(props);
// head router actor will run on "router-dispatcher" dispatcher
// Worker routees will run on "pool-dispatcher" dispatcher
new RandomPool(5).withDispatcher("router-dispatcher").props(
Props.create(Worker.class));
ActorRef router = system.actorOf(props, "poolWithDispatcher");
//#dispatchers
}
@ -390,8 +389,8 @@ public class RouterDocTest {
public void demonstrateRemoteDeploy() {
//#remoteRoutees
Address[] addresses = {
new Address("akka", "remotesys", "otherhost", 1234),
AddressFromURIString.parse("akka://othersys@anotherhost:1234")};
new Address("akka.tcp", "remotesys", "otherhost", 1234),
AddressFromURIString.parse("akka.tcp://othersys@anotherhost:1234")};
ActorRef routerRemote = system.actorOf(
new RemoteRouterConfig(new RoundRobinPool(5), addresses).props(
Props.create(Echo.class)));

View file

@ -133,7 +133,7 @@ Once a connection has been established data can be sent to it from any actor in
Tcp.Write
The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event.
A ``ByteString`` (as explained in :ref:`this section <ByteString>`) models one or more chunks of immutable
A ``ByteString`` (as explained in :ref:`this section <bytestring_java>`) models one or more chunks of immutable
in-memory data with a maximum (total) size of 2 GB (2^31 bytes).
Tcp.WriteFile

View file

@ -85,7 +85,7 @@ nacked messages it may need to keep a buffer of pending messages.
the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control
not error handling. In other words, data may still be lost, even if every write is acknowledged.
.. _ByteString:
.. _bytestring_java:
ByteString
^^^^^^^^^^

View file

@ -627,16 +627,25 @@ The deployment section of the configuration is passed to the constructor.
Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the router will be taken from
``Props`` as described in :ref:`dispatchers-java`. For a pool it
The dispatcher for created children of the pool will be taken from
``Props`` as described in :ref:`dispatchers-scala`. For a pool it
makes sense to configure the ``BalancingDispatcher`` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
To make it easy to define the dispatcher of the routees of the pool you can
define the dispatcher inline in the deployment section of the config.
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-pool-dispatcher
That is the only thing you need to do enable a dedicated dispatcher for a
pool.
.. note::
If you provide a collection of actors to route to, then they will still use the same dispatcher
If you use a group of actors and route to their paths, then they will still use the same dispatcher
that was configured for them in their ``Props``, it is not possible to change an actors dispatcher
after it has been created.

View file

@ -140,8 +140,19 @@ akka.actor.deployment {
}
#//#config-resize-pool
#//#config-pool-dispatcher
akka.actor.deployment {
/poolWithDispatcher {
router = random-pool
nr-of-instances = 5
pool-dispatcher {
type = BalancingDispatcher
}
}
}
#//#config-pool-dispatcher
router-dispatcher {}
workers-dispatcher {}
"""
case class Work(payload: String)
@ -351,10 +362,10 @@ class RouterDocSpec extends AkkaSpec(RouterDocSpec.config) with ImplicitSender {
"demonstrate dispatcher" in {
//#dispatchers
val router: ActorRef = system.actorOf(
// head will run on "router-dispatcher" dispatcher
RoundRobinPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker])
// Worker routees will run on "workers-dispatcher" dispatcher
.withDispatcher("workers-dispatcher"))
// head router actor will run on "router-dispatcher" dispatcher
// Worker routees will run on "pool-dispatcher" dispatcher
RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]),
name = "poolWithDispatcher")
//#dispatchers
}
@ -410,8 +421,8 @@ class RouterDocSpec extends AkkaSpec(RouterDocSpec.config) with ImplicitSender {
import akka.actor.{ Address, AddressFromURIString }
import akka.remote.routing.RemoteRouterConfig
val addresses = Seq(
Address("akka", "remotesys", "otherhost", 1234),
AddressFromURIString("akka://othersys@anotherhost:1234"))
Address("akka.tcp", "remotesys", "otherhost", 1234),
AddressFromURIString("akka.tcp://othersys@anotherhost:1234"))
val routerRemote = system.actorOf(
RemoteRouterConfig(RoundRobinPool(5), addresses).props(Props[Echo]))
//#remoteRoutees

View file

@ -134,7 +134,7 @@ Once a connection has been established data can be sent to it from any actor in
Tcp.Write
The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event.
A ``ByteString`` (as explained in :ref:`this section <ByteString>`) models one or more chunks of immutable
A ``ByteString`` (as explained in :ref:`this section <bytestring_scala>`) models one or more chunks of immutable
in-memory data with a maximum (total) size of 2 GB (2^31 bytes).
Tcp.WriteFile

View file

@ -90,7 +90,7 @@ nacked messages it may need to keep a buffer of pending messages.
the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control
not error handling. In other words, data may still be lost, even if every write is acknowledged.
.. _ByteString:
.. _bytestring_scala:
ByteString
^^^^^^^^^^

View file

@ -626,16 +626,24 @@ The deployment section of the configuration is passed to the constructor.
Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the router will be taken from
The dispatcher for created children of the pool will be taken from
``Props`` as described in :ref:`dispatchers-scala`. For a pool it
makes sense to configure the ``BalancingDispatcher`` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
To make it easy to define the dispatcher of the routees of the pool you can
define the dispatcher inline in the deployment section of the config.
.. includecode:: code/docs/routing/RouterDocSpec.scala#config-pool-dispatcher
That is the only thing you need to do enable a dedicated dispatcher for a
pool.
.. note::
If you provide a collection of actors to route to, then they will still use the same dispatcher
If you use a group of actors and route to their paths, then they will still use the same dispatcher
that was configured for them in their ``Props``, it is not possible to change an actors dispatcher
after it has been created.

View file

@ -54,7 +54,8 @@ final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) exten
// attachChild means that the provider will treat this call as if possibly done out of the wrong
// context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal
// choice in a corner case (and hence not worth fixing).
val ref = context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false)
val ref = context.asInstanceOf[ActorCell].attachChild(
local.enrichWithPoolDispatcher(routeeProps, context).withDeploy(deploy), name, systemService = false)
ActorRefRoutee(ref)
}