Merge branch 'wip-1804-router-create-race-∂π'

This commit is contained in:
Roland 2012-02-10 14:38:51 +01:00
commit 6786e45dc7
11 changed files with 288 additions and 77 deletions

View file

@ -50,6 +50,8 @@ object ActorModelSpec {
case object Restart extends ActorModelMessage
case object DoubleStop extends ActorModelMessage
case class ThrowException(e: Throwable) extends ActorModelMessage
val Ping = "Ping"
@ -86,6 +88,7 @@ object ActorModelSpec {
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Interrupt ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!")
case ThrowException(e: Throwable) ack; busy.switchOff(); throw e
case DoubleStop ack; context.stop(self); context.stop(self); busy.switchOff
}
}
@ -190,13 +193,13 @@ object ActorModelSpec {
}
def assertRef(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
suspensions: Long = statsFor(actorRef).suspensions.get(),
resumes: Long = statsFor(actorRef).resumes.get(),
registers: Long = statsFor(actorRef).registers.get(),
unregisters: Long = statsFor(actorRef).unregisters.get(),
msgsReceived: Long = statsFor(actorRef).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(),
restarts: Long = statsFor(actorRef).restarts.get())(implicit system: ActorSystem) {
suspensions: Long = statsFor(actorRef, dispatcher).suspensions.get(),
resumes: Long = statsFor(actorRef, dispatcher).resumes.get(),
registers: Long = statsFor(actorRef, dispatcher).registers.get(),
unregisters: Long = statsFor(actorRef, dispatcher).unregisters.get(),
msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(),
restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) {
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher))
val deadline = System.currentTimeMillis + 1000
try {
@ -426,6 +429,14 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
assert(f5.value.isEmpty)
}
}
"not double-deregister" in {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id)
a ! DoubleStop
awaitCond(statsFor(a, dispatcher).registers.get == 1)
awaitCond(statsFor(a, dispatcher).unregisters.get == 1)
}
}
}

View file

@ -24,6 +24,9 @@ object ResizerSpec {
}
}
}
bal-disp {
type = BalancingDispatcher
}
"""
class TestActor extends Actor {
@ -133,7 +136,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
pressureThreshold = 0,
messagesPerResize = 1)
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))).withDispatcher("bal-disp"))
val latch1 = new TestLatch(1)
router ! (latch1, busy)
@ -179,10 +182,10 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = {
(10 millis).dilated.sleep
(100 millis).dilated.sleep
for (m 0 until loops) {
router.!((t, latch, count))
(10 millis).dilated.sleep
(100 millis).dilated.sleep
}
}
@ -198,7 +201,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
// a whole bunch should max it out
val count2 = new AtomicInteger
val latch2 = TestLatch(10)
loop(10, 200, latch2, count2)
loop(10, 500, latch2, count2)
Await.ready(latch2, TestLatch.DefaultTimeout)
count2.get must be(10)
@ -238,7 +241,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
// let it cool down
for (m 0 to 3) {
router ! 1
(200 millis).dilated.sleep
(500 millis).dilated.sleep
}
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be < (z)

View file

@ -15,6 +15,7 @@ import com.typesafe.config.ConfigFactory
import akka.pattern.ask
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
import akka.dispatch.Dispatchers
object RoutingSpec {
@ -51,6 +52,7 @@ object RoutingSpec {
case (sender, message) Nil
}
}
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
}
@ -539,6 +541,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
//#crRouter
case class VoteCountRouter() extends RouterConfig {
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
//#crRoute
def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {
val democratActor = routeeProvider.context.actorOf(Props(new DemocratActor()), "d")

View file

@ -20,6 +20,7 @@ import akka.event.Logging.Debug
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.{ ExecutionException, Callable, TimeoutException }
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater }
import akka.pattern.AskTimeoutException
object Await {
@ -823,8 +824,9 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case Left(e) throw e
case Right(r) r
case Left(e: AskTimeoutException) throw new AskTimeoutException(e.getMessage, e) // to get meaningful stack trace
case Left(e) throw e
case Right(r) r
}
def value: Option[Either[Throwable, T]] = getState match {

View file

@ -189,7 +189,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
final def processAllSystemMessages() {
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {
while ((nextMessage ne null) && !isClosed) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
actor systemInvoke nextMessage
nextMessage = nextMessage.next

View file

@ -9,15 +9,14 @@ import akka.util.duration._
import akka.config.ConfigurationException
import akka.pattern.pipe
import akka.pattern.AskSupport
import com.typesafe.config.Config
import scala.collection.JavaConversions.iterableAsScalaIterable
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import akka.jsr166y.ThreadLocalRandom
import akka.util.Unsafe
import akka.dispatch.Dispatchers
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -26,25 +25,88 @@ import akka.jsr166y.ThreadLocalRandom
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
extends LocalActorRef(
_system,
_props.copy(creator = () _props.routerConfig.createActor()),
_props.copy(creator = () _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher),
_supervisor,
_path) {
private val routeeProps = _props.copy(routerConfig = NoRouter)
private val resizeProgress = new AtomicBoolean
/*
* CAUTION: RoutedActorRef is PROBLEMATIC
* ======================================
*
* We are constructing/assembling the children outside of the scope of the
* Router actor, inserting them in its childrenRef list, which is not at all
* synchronized. This is done exactly once at start-up, all other accesses
* are done from the Router actor. This means that the only thing which is
* really hairy is making sure that the Router does not touch its childrenRefs
* before we are done with them: lock the monitor of the actor cell (hence the
* override of newActorCell) and use that to block the Router constructor for
* as long as it takes to setup the RoutedActorRef itself.
*/
override def newActorCell(
system: ActorSystemImpl,
ref: InternalActorRef,
props: Props,
supervisor: InternalActorRef,
receiveTimeout: Option[Duration]): ActorCell =
{
val cell = super.newActorCell(system, ref, props, supervisor, receiveTimeout)
Unsafe.instance.monitorEnter(cell)
cell
}
private[akka] val routerConfig = _props.routerConfig
private[akka] val routeeProps = _props.copy(routerConfig = NoRouter)
private[akka] val resizeInProgress = new AtomicBoolean
private val resizeCounter = new AtomicLong
@volatile
private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute
def routees = _routees
@volatile
private var _routeeProvider: RouteeProvider = _
def routeeProvider = _routeeProvider
val route =
try {
_routeeProvider = routerConfig.createRouteeProvider(actorContext)
val r = routerConfig.createRoute(routeeProps, routeeProvider)
// initial resize, before message send
routerConfig.resizer foreach { r
if (r.isTimeForResize(resizeCounter.getAndIncrement()))
r.resize(routeeProps, routeeProvider)
}
r
} finally {
assert(Thread.holdsLock(actorContext))
Unsafe.instance.monitorExit(actorContext) // unblock Routers constructor
}
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + routerConfig + " did not register routees!")
/*
* end of construction
*/
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
case _: AutoReceivedMessage Destination(this, this) :: Nil
case Terminated(_) Destination(this, this) :: Nil
case CurrentRoutees
sender ! RouterRoutees(_routees)
Nil
case _
if (route.isDefinedAt(sender, message)) route(sender, message)
else Nil
}
/**
* Adds the routees to existing routees.
* Adds death watch of the routees so that they are removed when terminated.
* Not thread safe, but intended to be called from protected points, such as
* `RouterConfig.createRoute` and `Resizer.resize`
*/
private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]) {
private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]): Unit = {
_routees = _routees ++ newRoutees
// subscribe to Terminated messages for all route destinations, to be handled by Router actor
newRoutees foreach underlying.watch
@ -56,34 +118,11 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
* Not thread safe, but intended to be called from protected points, such as
* `Resizer.resize`
*/
private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]) {
private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]): Unit = {
_routees = _routees diff abandonedRoutees
abandonedRoutees foreach underlying.unwatch
}
private val routeeProvider = _props.routerConfig.createRouteeProvider(actorContext)
val route = _props.routerConfig.createRoute(routeeProps, routeeProvider)
// initial resize, before message send
resize()
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
case _: AutoReceivedMessage Nil
case Terminated(_) Nil
case CurrentRoutees
sender ! RouterRoutees(_routees)
Nil
case _
if (route.isDefinedAt(sender, message)) route(sender, message)
else Nil
}
if (_props.routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!")
_routees match {
case x _routees = x // volatile write to publish the route before sending messages
}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = {
resize()
@ -95,20 +134,15 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
}
applyRoute(s, message) match {
case Nil super.!(message)(s)
case refs refs foreach (p p.recipient.!(msg)(p.sender))
case Destination(_, x) :: Nil if x eq this super.!(message)(s)
case refs refs foreach (p p.recipient.!(msg)(p.sender))
}
}
def resize() {
for (r _props.routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) {
try {
r.resize(routeeProps, routeeProvider)
} finally {
resizeProgress.set(false)
}
}
def resize(): Unit = {
for (r routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true))
super.!(Router.Resize)
}
}
}
@ -139,6 +173,11 @@ trait RouterConfig {
def createActor(): Router = new Router {}
/**
* Dispatcher ID to use for running the head actor, i.e. the [[akka.routing.Router]].
*/
def routerDispatcher: String
/**
* Overridable merge strategy, by default completely prefers this (i.e. no merge).
*/
@ -246,13 +285,20 @@ trait CustomRoute {
*/
trait Router extends Actor {
val ref = self match {
case x: RoutedActorRef x
case _ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef")
// make sure that we synchronize properly to get the childrenRefs into our CPU cache
val ref = context.synchronized {
self match {
case x: RoutedActorRef x
case _ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef")
}
}
final def receive = ({
case Router.Resize
try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider))
finally assert(ref.resizeInProgress.getAndSet(false))
case Terminated(child)
ref.removeRoutees(IndexedSeq(child))
if (ref.routees.isEmpty) context.stop(self)
@ -264,6 +310,10 @@ trait Router extends Actor {
}
}
private object Router {
case object Resize
}
/**
* Used to broadcast a message to all connections in a router; only the
* contained message will be forwarded, i.e. the `Broadcast(...)`
@ -302,6 +352,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case object NoRouter extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
def routerDispatcher: String = ""
override def withFallback(other: RouterConfig): RouterConfig = other
}
@ -311,13 +362,17 @@ case object NoRouter extends RouterConfig {
case object FromConfig extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
/**
* Java API: Router configuration which has no default, i.e. external configuration is required.
*
* This can be used when the dispatcher to be used for the head Router needs to be configured
* (defaults to default-dispatcher).
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class FromConfig() extends RouterConfig {
case class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
}
@ -348,7 +403,8 @@ object RoundRobinRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with RoundRobinLike {
/**
@ -374,6 +430,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait RoundRobinLike { this: RouterConfig
@ -428,7 +489,8 @@ object RandomRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with RandomLike {
/**
@ -454,6 +516,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait RandomLike { this: RouterConfig
@ -514,7 +581,8 @@ object SmallestMailboxRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with SmallestMailboxLike {
/**
@ -540,6 +608,11 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait SmallestMailboxLike { this: RouterConfig
@ -659,7 +732,8 @@ object BroadcastRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with BroadcastLike {
/**
@ -686,6 +760,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait BroadcastLike { this: RouterConfig
@ -732,7 +810,8 @@ object ScatterGatherFirstCompletedRouter {
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None)
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with ScatterGatherFirstCompletedLike {
if (within <= Duration.Zero) throw new IllegalArgumentException(
@ -761,6 +840,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Java API
*/
def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w)
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait ScatterGatherFirstCompletedLike { this: RouterConfig
@ -795,14 +879,22 @@ trait Resizer {
* for the initial resize and continues with 1 for the first message. Make sure to perform
* initial resize before first message (messageCounter == 0), because there is no guarantee
* that resize will be done when concurrent messages are in play.
*
* CAUTION: this method is invoked from the thread which tries to send a
* message to the pool, i.e. the ActorRef.!() method, hence it may be called
* concurrently.
*/
def isTimeForResize(messageCounter: Long): Boolean
/**
* Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize`
* returns true and no other resize is in progress.
* Create and register more routees with `routeeProvider.registerRoutees(newRoutees)
* or remove routees with `routeeProvider.unregisterRoutees(abandonedRoutees)` and
* sending [[akka.actor.PoisonPill]] to them.
*
* This method is invoked only in the context of the Router actor in order to safely
* create/stop children.
*/
def resize(props: Props, routeeProvider: RouteeProvider)
}

View file

@ -17,6 +17,7 @@ import akka.util.Duration;
import akka.util.Timeout;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.dispatch.Dispatchers;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import static akka.pattern.Patterns.ask;
@ -39,6 +40,19 @@ public class CustomRouterDocTestBase {
system.shutdown();
}
public static class MyActor extends UntypedActor {
@Override public void onReceive(Object o) {}
}
@Test
public void demonstrateDispatchers() {
//#dispatchers
final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withDispatcher("head")) // head router runs on "head" dispatcher
.withDispatcher("workers")); // MyActor workers run on "workers" dispatcher
//#dispatchers
}
//#crTest
@Test
public void countVotesAsIntendedNotAsInFlorida() {
@ -106,6 +120,10 @@ public class CustomRouterDocTestBase {
//#crRouter
public static class VoteCountRouter extends CustomRouterConfig {
@Override public String routerDispatcher() {
return Dispatchers.DefaultDispatcherId();
}
//#crRoute
@Override
public CustomRoute createCustomRoute(Props props, RouteeProvider routeeProvider) {

View file

@ -8,11 +8,6 @@ Routing (Java)
.. contents:: :local:
Akka-core includes some building blocks to build more complex message flow handlers, they are listed and explained below:
Router
------
A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'.
@ -249,6 +244,16 @@ This is an example of how to programatically create a resizable router:
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value
will be used instead of any programmatically sent parameters.*
.. note::
Resizing is triggered by sending messages to the actor pool, but it is not
completed synchronously; instead a message is sent to the “head”
:class:`Router` to perform the size change. Thus you cannot rely on resizing
to instantaneously create new workers when all others are busy, because the
message just sent will be queued to the mailbox of a busy actor. To remedy
this, configure the pool to use a balancing dispatcher, see `Configuring
Dispatchers`_ for more information.
Custom Router
^^^^^^^^^^^^^
@ -312,3 +317,23 @@ A router with dynamically resizable number of routees is implemented by providin
in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration
of how to write your own resize strategy.
Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the router will be taken from
:class:`Props` as described in :ref:`dispatchers-java`. For a dynamic pool it
makes sense to configure the :class:`BalancingDispatcher` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
The “head” router, of couse, cannot run on the same balancing dispatcher,
because it does not process the same messages, hence this special actor does
not use the dispatcher configured in :class:`Props`, but takes the
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
the actor systems default dispatcher. All standard routers allow setting this
property in their constructor or factory method, custom routers have to
implement the method in a suitable way.
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#dispatchers

View file

@ -0,0 +1,29 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.routing
import RouterDocSpec.MyActor
import akka.actor.{ Props, Actor }
import akka.testkit.AkkaSpec
import akka.routing.RoundRobinRouter
object RouterDocSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
class RouterDocSpec extends AkkaSpec {
import RouterDocSpec._
//#dispatchers
val router = system.actorOf(Props[MyActor]
.withRouter(RoundRobinRouter(5, routerDispatcher = "router")) // head will run on "router" dispatcher
.withDispatcher("workers")) // MyActor workers will run on "workers" dispatcher
//#dispatchers
}

View file

@ -8,11 +8,6 @@ Routing (Scala)
.. contents:: :local:
Akka-core includes some building blocks to build more complex message flow handlers, they are listed and explained below:
Router
------
A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'.
@ -250,6 +245,16 @@ This is an example of how to programatically create a resizable router:
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value
will be used instead of any programmatically sent parameters.*
.. note::
Resizing is triggered by sending messages to the actor pool, but it is not
completed synchronously; instead a message is sent to the “head”
:class:`Router` to perform the size change. Thus you cannot rely on resizing
to instantaneously create new workers when all others are busy, because the
message just sent will be queued to the mailbox of a busy actor. To remedy
this, configure the pool to use a balancing dispatcher, see `Configuring
Dispatchers`_ for more information.
Custom Router
^^^^^^^^^^^^^
@ -311,3 +316,23 @@ A router with dynamically resizable number of routees is implemented by providin
in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration
of how to write your own resize strategy.
Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the router will be taken from
:class:`Props` as described in :ref:`dispatchers-scala`. For a dynamic pool it
makes sense to configure the :class:`BalancingDispatcher` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
The “head” router, of couse, cannot run on the same balancing dispatcher,
because it does not process the same messages, hence this special actor does
not use the dispatcher configured in :class:`Props`, but takes the
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
the actor systems default dispatcher. All standard routers allow setting this
property in their constructor or factory method, custom routers have to
implement the method in a suitable way.
.. includecode:: code/akka/docs/routing/RouterDocSpec.scala#dispatchers

View file

@ -30,6 +30,8 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) exte
override def createActor(): Router = local.createActor()
override def routerDispatcher: String = local.routerDispatcher
override def resizer: Option[Resizer] = local.resizer
override def withFallback(other: RouterConfig): RouterConfig = other match {