fix some more comments and make Router dispatcher configurable

- assert locking balance when using Unsafe.instance.monitorExit
- add RouterConfig.routerDispatcher
- re-enable “busy” resizer test after switching to BalancingDispatcher
- document resizer asynchronicity and how to configure dispatchers
This commit is contained in:
Roland 2012-02-10 14:13:40 +01:00
parent bbe221e812
commit 36247b10fe
9 changed files with 170 additions and 29 deletions

View file

@ -24,6 +24,9 @@ object ResizerSpec {
}
}
}
bal-disp {
type = BalancingDispatcher
}
"""
class TestActor extends Actor {
@ -123,15 +126,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
current.routees.size must be(2)
}
/*
* TODO RK This test seems invalid to me, because it relies on that no resize() event is lost;
* this currently fails because I made resize() asynchronous (by sending a message to the
* Router), but it could also fail for concurrent send operations, i.e. when one of thread
* fails the resizeInProgress.compareAndSet(false, true) check.
*
* Either the test must be fixed/removed or resize() must be changed to be blocking.
*/
"resize when busy" ignore {
"resize when busy" in {
val busy = new TestLatch(1)
@ -141,7 +136,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
pressureThreshold = 0,
messagesPerResize = 1)
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))).withDispatcher("bal-disp"))
val latch1 = new TestLatch(1)
router ! (latch1, busy)

View file

@ -15,6 +15,7 @@ import com.typesafe.config.ConfigFactory
import akka.pattern.ask
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
import akka.dispatch.Dispatchers
object RoutingSpec {
@ -51,6 +52,7 @@ object RoutingSpec {
case (sender, message) Nil
}
}
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
}
@ -539,6 +541,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
//#crRouter
case class VoteCountRouter() extends RouterConfig {
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
//#crRoute
def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {
val democratActor = routeeProvider.context.actorOf(Props(new DemocratActor()), "d")

View file

@ -796,7 +796,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case Left(e: AskTimeoutException) throw new AskTimeoutException(e.getMessage, e)
case Left(e: AskTimeoutException) throw new AskTimeoutException(e.getMessage, e) // to get meaningful stack trace
case Left(e) throw e
case Right(r) r
}

View file

@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import akka.jsr166y.ThreadLocalRandom
import akka.util.Unsafe
import akka.dispatch.Dispatchers
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -24,7 +25,7 @@ import akka.util.Unsafe
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
extends LocalActorRef(
_system,
_props.copy(creator = () _props.routerConfig.createActor()),
_props.copy(creator = () _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher),
_supervisor,
_path) {
@ -76,7 +77,10 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
r.resize(routeeProps, routeeProvider)
}
r
} finally Unsafe.instance.monitorExit(actorContext) // unblock Routers constructor
} finally {
assert(Thread.holdsLock(actorContext))
Unsafe.instance.monitorExit(actorContext) // unblock Routers constructor
}
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + routerConfig + " did not register routees!")
@ -169,6 +173,11 @@ trait RouterConfig {
def createActor(): Router = new Router {}
/**
* Dispatcher ID to use for running the head actor, i.e. the [[akka.routing.Router]].
*/
def routerDispatcher: String
/**
* Overridable merge strategy, by default completely prefers this (i.e. no merge).
*/
@ -343,6 +352,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case object NoRouter extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
def routerDispatcher: String = ""
override def withFallback(other: RouterConfig): RouterConfig = other
}
@ -352,13 +362,17 @@ case object NoRouter extends RouterConfig {
case object FromConfig extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
/**
* Java API: Router configuration which has no default, i.e. external configuration is required.
*
* This can be used when the dispatcher to be used for the head Router needs to be configured
* (defaults to default-dispatcher).
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class FromConfig() extends RouterConfig {
case class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
}
@ -389,7 +403,8 @@ object RoundRobinRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with RoundRobinLike {
/**
@ -415,6 +430,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait RoundRobinLike { this: RouterConfig
@ -469,7 +489,8 @@ object RandomRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with RandomLike {
/**
@ -495,6 +516,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait RandomLike { this: RouterConfig
@ -555,7 +581,8 @@ object SmallestMailboxRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with SmallestMailboxLike {
/**
@ -581,6 +608,11 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait SmallestMailboxLike { this: RouterConfig
@ -700,7 +732,8 @@ object BroadcastRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with BroadcastLike {
/**
@ -727,6 +760,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait BroadcastLike { this: RouterConfig
@ -773,7 +810,8 @@ object ScatterGatherFirstCompletedRouter {
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None)
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig with ScatterGatherFirstCompletedLike {
if (within <= Duration.Zero) throw new IllegalArgumentException(
@ -802,6 +840,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Java API
*/
def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w)
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
}
trait ScatterGatherFirstCompletedLike { this: RouterConfig

View file

@ -17,6 +17,7 @@ import akka.util.Duration;
import akka.util.Timeout;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.dispatch.Dispatchers;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import static akka.pattern.Patterns.ask;
@ -39,6 +40,19 @@ public class CustomRouterDocTestBase {
system.shutdown();
}
public static class MyActor extends UntypedActor {
@Override public void onReceive(Object o) {}
}
@Test
public void demonstrateDispatchers() {
//#dispatchers
final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withDispatcher("head")) // head router runs on "head" dispatcher
.withDispatcher("workers")); // MyActor workers run on "workers" dispatcher
//#dispatchers
}
//#crTest
@Test
public void countVotesAsIntendedNotAsInFlorida() {
@ -106,6 +120,10 @@ public class CustomRouterDocTestBase {
//#crRouter
public static class VoteCountRouter extends CustomRouterConfig {
@Override public String routerDispatcher() {
return Dispatchers.DefaultDispatcherId();
}
//#crRoute
@Override
public CustomRoute createCustomRoute(Props props, RouteeProvider routeeProvider) {

View file

@ -8,11 +8,6 @@ Routing (Java)
.. contents:: :local:
Akka-core includes some building blocks to build more complex message flow handlers, they are listed and explained below:
Router
------
A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'.
@ -249,6 +244,16 @@ This is an example of how to programatically create a resizable router:
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value
will be used instead of any programmatically sent parameters.*
.. note::
Resizing is triggered by sending messages to the actor pool, but it is not
completed synchronously; instead a message is sent to the “head”
:class:`Router` to perform the size change. Thus you cannot rely on resizing
to instantaneously create new workers when all others are busy, because the
message just sent will be queued to the mailbox of a busy actor. To remedy
this, configure the pool to use a balancing dispatcher, see `Configuring
Dispatchers`_ for more information.
Custom Router
^^^^^^^^^^^^^
@ -312,3 +317,23 @@ A router with dynamically resizable number of routees is implemented by providin
in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration
of how to write your own resize strategy.
Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the router will be taken from
:class:`Props` as described in :ref:`dispatchers-java`. For a dynamic pool it
makes sense to configure the :class:`BalancingDispatcher` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
The “head” router, of couse, cannot run on the same balancing dispatcher,
because it does not process the same messages, hence this special actor does
not use the dispatcher configured in :class:`Props`, but takes the
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
the actor systems default dispatcher. All standard routers allow setting this
property in their constructor or factory method, custom routers have to
implement the method in a suitable way.
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#dispatchers

View file

@ -0,0 +1,29 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.routing
import RouterDocSpec.MyActor
import akka.actor.{ Props, Actor }
import akka.testkit.AkkaSpec
import akka.routing.RoundRobinRouter
object RouterDocSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
class RouterDocSpec extends AkkaSpec {
import RouterDocSpec._
//#dispatchers
val router = system.actorOf(Props[MyActor]
.withRouter(RoundRobinRouter(5, routerDispatcher = "router")) // head will run on "router" dispatcher
.withDispatcher("workers")) // MyActor workers will run on "workers" dispatcher
//#dispatchers
}

View file

@ -8,11 +8,6 @@ Routing (Scala)
.. contents:: :local:
Akka-core includes some building blocks to build more complex message flow handlers, they are listed and explained below:
Router
------
A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'.
@ -250,6 +245,16 @@ This is an example of how to programatically create a resizable router:
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value
will be used instead of any programmatically sent parameters.*
.. note::
Resizing is triggered by sending messages to the actor pool, but it is not
completed synchronously; instead a message is sent to the “head”
:class:`Router` to perform the size change. Thus you cannot rely on resizing
to instantaneously create new workers when all others are busy, because the
message just sent will be queued to the mailbox of a busy actor. To remedy
this, configure the pool to use a balancing dispatcher, see `Configuring
Dispatchers`_ for more information.
Custom Router
^^^^^^^^^^^^^
@ -311,3 +316,23 @@ A router with dynamically resizable number of routees is implemented by providin
in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration
of how to write your own resize strategy.
Configuring Dispatchers
^^^^^^^^^^^^^^^^^^^^^^^
The dispatcher for created children of the router will be taken from
:class:`Props` as described in :ref:`dispatchers-scala`. For a dynamic pool it
makes sense to configure the :class:`BalancingDispatcher` if the precise
routing is not so important (i.e. no consistent hashing or round-robin is
required); this enables newly created routees to pick up work immediately by
stealing it from their siblings.
The “head” router, of couse, cannot run on the same balancing dispatcher,
because it does not process the same messages, hence this special actor does
not use the dispatcher configured in :class:`Props`, but takes the
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
the actor systems default dispatcher. All standard routers allow setting this
property in their constructor or factory method, custom routers have to
implement the method in a suitable way.
.. includecode:: code/akka/docs/routing/RouterDocSpec.scala#dispatchers

View file

@ -30,6 +30,8 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) exte
override def createActor(): Router = local.createActor()
override def routerDispatcher: String = local.routerDispatcher
override def resizer: Option[Resizer] = local.resizer
override def withFallback(other: RouterConfig): RouterConfig = other match {