make supervisorStrategy of Router configurable, see #1835

- also do not scrap router’s children upon restart
- and add docs and tests
This commit is contained in:
Roland 2012-02-18 22:15:39 +01:00
parent c2376d01c0
commit 0f48b9f3eb
6 changed files with 300 additions and 6 deletions

View file

@ -53,6 +53,7 @@ object RoutingSpec {
}
}
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
}
}
@ -126,6 +127,44 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
system.stop(router)
}
"set supplied supervisorStrategy" in {
//#supervision
val escalator = OneForOneStrategy() {
//#custom-strategy
case e testActor ! e; SupervisorStrategy.Escalate
//#custom-strategy
}
val router = system.actorOf(Props.empty.withRouter(
RoundRobinRouter(1, supervisorStrategy = escalator)))
//#supervision
router ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 2) intercept {
expectMsgType[RouterRoutees].routees.head ! Kill
}
expectMsgType[ActorKilledException]
}
"default to all-for-one-always-escalate strategy" in {
val restarter = OneForOneStrategy() {
case e testActor ! e; SupervisorStrategy.Restart
}
val supervisor = system.actorOf(Props(new Supervisor(restarter)))
supervisor ! Props(new Actor {
def receive = {
case x: String throw new Exception(x)
}
override def postRestart(reason: Throwable): Unit = testActor ! "restarted"
}).withRouter(RoundRobinRouter(3))
val router = expectMsgType[ActorRef]
EventFilter[Exception]("die", occurrences = 2) intercept {
router ! "die"
}
expectMsgType[Exception].getMessage must be("die")
expectMsg("restarted")
expectMsg("restarted")
expectMsg("restarted")
}
}
"no router" must {
@ -542,6 +581,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
case class VoteCountRouter() extends RouterConfig {
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
//#crRoute
def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {

View file

@ -171,7 +171,14 @@ trait RouterConfig {
def createRouteeProvider(context: ActorContext) = new RouteeProvider(context, resizer)
def createActor(): Router = new Router {}
def createActor(): Router = new Router {
override def supervisorStrategy: SupervisorStrategy = RouterConfig.this.supervisorStrategy
}
/**
* SupervisorStrategy for the created Router actor.
*/
def supervisorStrategy: SupervisorStrategy
/**
* Dispatcher ID to use for running the head actor, i.e. the [[akka.routing.Router]].
@ -308,10 +315,19 @@ trait Router extends Actor {
def routerReceive: Receive = {
case _
}
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
// do not scrap children
}
}
private object Router {
case object Resize
val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ SupervisorStrategy.Escalate
}
}
/**
@ -353,6 +369,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
case object NoRouter extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
def routerDispatcher: String = ""
def supervisorStrategy = null
override def withFallback(other: RouterConfig): RouterConfig = other
}
@ -363,6 +380,7 @@ case object FromConfig extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
}
/**
@ -378,6 +396,8 @@ case class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatch
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
}
object RoundRobinRouter {
@ -402,12 +422,40 @@ object RoundRobinRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with RoundRobinLike {
/**
@ -438,6 +486,12 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait RoundRobinLike { this: RouterConfig
@ -488,12 +542,40 @@ object RandomRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with RandomLike {
/**
@ -524,6 +606,12 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait RandomLike { this: RouterConfig
@ -580,12 +668,40 @@ object SmallestMailboxRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with SmallestMailboxLike {
/**
@ -616,6 +732,12 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait SmallestMailboxLike { this: RouterConfig
@ -731,12 +853,40 @@ object BroadcastRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with BroadcastLike {
/**
@ -767,6 +917,12 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait BroadcastLike { this: RouterConfig
@ -808,13 +964,41 @@ object ScatterGatherFirstCompletedRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with ScatterGatherFirstCompletedLike {
if (within <= Duration.Zero) throw new IllegalArgumentException(
@ -848,6 +1032,12 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait ScatterGatherFirstCompletedLike { this: RouterConfig

View file

@ -52,6 +52,16 @@ public class CustomRouterDocTestBase {
.withDispatcher("workers")); // MyActor workers run on "workers" dispatcher
//#dispatchers
}
@Test
public void demonstrateSupervisor() {
//#supervision
final SupervisorStrategy strategy = new OneForOneStrategy(5, Duration.parse("1 minute"),
new Class<? extends Throwable>[] { Exception });
final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy)));
//#supervision
}
//#crTest
@Test
@ -123,6 +133,10 @@ public class CustomRouterDocTestBase {
@Override public String routerDispatcher() {
return Dispatchers.DefaultDispatcherId();
}
@Override public SupervisorStrategy supervisorStrategy() {
return SupervisorStrategy.defaultStrategy();
}
//#crRoute
@Override

View file

@ -92,6 +92,29 @@ to the actor hierarchy, changing the actor paths of all children of the router.
The routees especially do need to know that they are routed to in order to
choose the sender reference for any messages they dispatch as shown above.
Routers vs. Supervision
^^^^^^^^^^^^^^^^^^^^^^^
As explained in the previous section, routers create new actor instances as
children of the “head” router, who therefor also is their supervisor. The
supervisor strategy of this actor can be configured by means of the
:meth:`RouterConfig.supervisorStrategy` property, which is supported for all
built-in router types. It defaults to “always escalate”, which leads to the
application of the routers parents supervision directive to all children of
the router uniformly (i.e. not only the one which failed). It should be
mentioned that the router overrides the default behavior of terminating all
children upon restart, which means that a restart—while re-creating them—does
not have an effect on the number of actors in the pool.
Setting the strategy is easily done:
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java
:include: supervision
Another potentially useful approach is to give the router the same strategy as
its parent, which effectively treats all actors in the pool as if they were
direct children of their grand-parent instead.
Router usage
^^^^^^^^^^^^

View file

@ -92,6 +92,30 @@ to the actor hierarchy, changing the actor paths of all children of the router.
The routees especially do need to know that they are routed to in order to
choose the sender reference for any messages they dispatch as shown above.
Routers vs. Supervision
^^^^^^^^^^^^^^^^^^^^^^^
As explained in the previous section, routers create new actor instances as
children of the “head” router, who therefor also is their supervisor. The
supervisor strategy of this actor can be configured by means of the
:meth:`RouterConfig.supervisorStrategy` property, which is supported for all
built-in router types. It defaults to “always escalate”, which leads to the
application of the routers parents supervision directive to all children of
the router uniformly (i.e. not only the one which failed). It should be
mentioned that the router overrides the default behavior of terminating all
children upon restart, which means that a restart—while re-creating them—does
not have an effect on the number of actors in the pool.
Setting the strategy is easily done:
.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
:include: supervision
:exclude: custom-strategy
Another potentially useful approach is to give the router the same strategy as
its parent, which effectively treats all actors in the pool as if they were
direct children of their grand-parent instead.
Router usage
^^^^^^^^^^^^

View file

@ -13,6 +13,7 @@ import akka.actor.Props
import akka.config.ConfigurationException
import akka.remote.RemoteScope
import akka.actor.AddressExtractor
import akka.actor.SupervisorStrategy
/**
* [[akka.routing.RouterConfig]] implementation for remote deployment on defined
@ -30,6 +31,8 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) exte
override def createActor(): Router = local.createActor()
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
override def routerDispatcher: String = local.routerDispatcher
override def resizer: Option[Resizer] = local.resizer