diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index ad3702d556..645d68a47b 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -53,6 +53,7 @@ object RoutingSpec { } } def routerDispatcher: String = Dispatchers.DefaultDispatcherId + def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy } } @@ -126,6 +127,44 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with system.stop(router) } + "set supplied supervisorStrategy" in { + //#supervision + val escalator = OneForOneStrategy() { + //#custom-strategy + case e ⇒ testActor ! e; SupervisorStrategy.Escalate + //#custom-strategy + } + val router = system.actorOf(Props.empty.withRouter( + RoundRobinRouter(1, supervisorStrategy = escalator))) + //#supervision + router ! CurrentRoutees + EventFilter[ActorKilledException](occurrences = 2) intercept { + expectMsgType[RouterRoutees].routees.head ! Kill + } + expectMsgType[ActorKilledException] + } + + "default to all-for-one-always-escalate strategy" in { + val restarter = OneForOneStrategy() { + case e ⇒ testActor ! e; SupervisorStrategy.Restart + } + val supervisor = system.actorOf(Props(new Supervisor(restarter))) + supervisor ! Props(new Actor { + def receive = { + case x: String ⇒ throw new Exception(x) + } + override def postRestart(reason: Throwable): Unit = testActor ! "restarted" + }).withRouter(RoundRobinRouter(3)) + val router = expectMsgType[ActorRef] + EventFilter[Exception]("die", occurrences = 2) intercept { + router ! "die" + } + expectMsgType[Exception].getMessage must be("die") + expectMsg("restarted") + expectMsg("restarted") + expectMsg("restarted") + } + } "no router" must { @@ -542,6 +581,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with case class VoteCountRouter() extends RouterConfig { def routerDispatcher: String = Dispatchers.DefaultDispatcherId + def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy //#crRoute def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 44faa67d47..e988206e9f 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -171,7 +171,14 @@ trait RouterConfig { def createRouteeProvider(context: ActorContext) = new RouteeProvider(context, resizer) - def createActor(): Router = new Router {} + def createActor(): Router = new Router { + override def supervisorStrategy: SupervisorStrategy = RouterConfig.this.supervisorStrategy + } + + /** + * SupervisorStrategy for the created Router actor. + */ + def supervisorStrategy: SupervisorStrategy /** * Dispatcher ID to use for running the “head” actor, i.e. the [[akka.routing.Router]]. @@ -308,10 +315,19 @@ trait Router extends Actor { def routerReceive: Receive = { case _ ⇒ } + + override def preRestart(cause: Throwable, msg: Option[Any]): Unit = { + // do not scrap children + } } private object Router { + case object Resize + + val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case _ ⇒ SupervisorStrategy.Escalate + } } /** @@ -353,6 +369,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef) case object NoRouter extends RouterConfig { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null def routerDispatcher: String = "" + def supervisorStrategy = null override def withFallback(other: RouterConfig): RouterConfig = other } @@ -363,6 +380,7 @@ case object FromConfig extends RouterConfig { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") def routerDispatcher: String = Dispatchers.DefaultDispatcherId + def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy } /** @@ -378,6 +396,8 @@ case class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatch def createRoute(props: Props, routeeProvider: RouteeProvider): Route = throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") + + def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy } object RoundRobinRouter { @@ -402,12 +422,40 @@ object RoundRobinRouter { * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * {{{ + * class MyActor extends Actor { + * override val supervisorStrategy = ... + * + * val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5))) + * + * val poolIndividuals = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy))) + * + * val specialChild = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() { + * ... + * }))) + * } + * }}} + * * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy) extends RouterConfig with RoundRobinLike { /** @@ -438,6 +486,12 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = * Java API for setting routerDispatcher */ def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) } trait RoundRobinLike { this: RouterConfig ⇒ @@ -488,12 +542,40 @@ object RandomRouter { * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * {{{ + * class MyActor extends Actor { + * override val supervisorStrategy = ... + * + * val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5))) + * + * val poolIndividuals = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy))) + * + * val specialChild = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() { + * ... + * }))) + * } + * }}} + * * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy) extends RouterConfig with RandomLike { /** @@ -524,6 +606,12 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, * Java API for setting routerDispatcher */ def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) } trait RandomLike { this: RouterConfig ⇒ @@ -580,12 +668,40 @@ object SmallestMailboxRouter { * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * {{{ + * class MyActor extends Actor { + * override val supervisorStrategy = ... + * + * val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5))) + * + * val poolIndividuals = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy))) + * + * val specialChild = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() { + * ... + * }))) + * } + * }}} + * * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy) extends RouterConfig with SmallestMailboxLike { /** @@ -616,6 +732,12 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin * Java API for setting routerDispatcher */ def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) } trait SmallestMailboxLike { this: RouterConfig ⇒ @@ -731,12 +853,40 @@ object BroadcastRouter { * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * {{{ + * class MyActor extends Actor { + * override val supervisorStrategy = ... + * + * val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5))) + * + * val poolIndividuals = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy))) + * + * val specialChild = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() { + * ... + * }))) + * } + * }}} + * * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy) extends RouterConfig with BroadcastLike { /** @@ -767,6 +917,12 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N * Java API for setting routerDispatcher */ def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) } trait BroadcastLike { this: RouterConfig ⇒ @@ -808,13 +964,41 @@ object ScatterGatherFirstCompletedRouter { * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * {{{ + * class MyActor extends Actor { + * override val supervisorStrategy = ... + * + * val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5))) + * + * val poolIndividuals = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy))) + * + * val specialChild = context.actorOf(Props[SomeActor].withRouter( + * RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() { + * ... + * }))) + * } + * }}} + * * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId) + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy) extends RouterConfig with ScatterGatherFirstCompletedLike { if (within <= Duration.Zero) throw new IllegalArgumentException( @@ -848,6 +1032,12 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It * Java API for setting routerDispatcher */ def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) } trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ diff --git a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java index a20a351f06..922195abd3 100644 --- a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java @@ -52,6 +52,16 @@ public class CustomRouterDocTestBase { .withDispatcher("workers")); // MyActor “workers” run on "workers" dispatcher //#dispatchers } + + @Test + public void demonstrateSupervisor() { + //#supervision + final SupervisorStrategy strategy = new OneForOneStrategy(5, Duration.parse("1 minute"), + new Class[] { Exception }); + final ActorRef router = system.actorOf(new Props(MyActor.class) + .withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy))); + //#supervision + } //#crTest @Test @@ -123,6 +133,10 @@ public class CustomRouterDocTestBase { @Override public String routerDispatcher() { return Dispatchers.DefaultDispatcherId(); } + + @Override public SupervisorStrategy supervisorStrategy() { + return SupervisorStrategy.defaultStrategy(); + } //#crRoute @Override diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index ebb219ae5d..aa1daa19db 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -92,6 +92,29 @@ to the actor hierarchy, changing the actor paths of all children of the router. The routees especially do need to know that they are routed to in order to choose the sender reference for any messages they dispatch as shown above. +Routers vs. Supervision +^^^^^^^^^^^^^^^^^^^^^^^ + +As explained in the previous section, routers create new actor instances as +children of the “head” router, who therefor also is their supervisor. The +supervisor strategy of this actor can be configured by means of the +:meth:`RouterConfig.supervisorStrategy` property, which is supported for all +built-in router types. It defaults to “always escalate”, which leads to the +application of the router’s parent’s supervision directive to all children of +the router uniformly (i.e. not only the one which failed). It should be +mentioned that the router overrides the default behavior of terminating all +children upon restart, which means that a restart—while re-creating them—does +not have an effect on the number of actors in the pool. + +Setting the strategy is easily done: + +.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java + :include: supervision + +Another potentially useful approach is to give the router the same strategy as +its parent, which effectively treats all actors in the pool as if they were +direct children of their grand-parent instead. + Router usage ^^^^^^^^^^^^ diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index dfae20fd6b..7071747374 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -92,6 +92,30 @@ to the actor hierarchy, changing the actor paths of all children of the router. The routees especially do need to know that they are routed to in order to choose the sender reference for any messages they dispatch as shown above. +Routers vs. Supervision +^^^^^^^^^^^^^^^^^^^^^^^ + +As explained in the previous section, routers create new actor instances as +children of the “head” router, who therefor also is their supervisor. The +supervisor strategy of this actor can be configured by means of the +:meth:`RouterConfig.supervisorStrategy` property, which is supported for all +built-in router types. It defaults to “always escalate”, which leads to the +application of the router’s parent’s supervision directive to all children of +the router uniformly (i.e. not only the one which failed). It should be +mentioned that the router overrides the default behavior of terminating all +children upon restart, which means that a restart—while re-creating them—does +not have an effect on the number of actors in the pool. + +Setting the strategy is easily done: + +.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala + :include: supervision + :exclude: custom-strategy + +Another potentially useful approach is to give the router the same strategy as +its parent, which effectively treats all actors in the pool as if they were +direct children of their grand-parent instead. + Router usage ^^^^^^^^^^^^ diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 3b1791db8e..afa94433aa 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -13,6 +13,7 @@ import akka.actor.Props import akka.config.ConfigurationException import akka.remote.RemoteScope import akka.actor.AddressExtractor +import akka.actor.SupervisorStrategy /** * [[akka.routing.RouterConfig]] implementation for remote deployment on defined @@ -30,6 +31,8 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) exte override def createActor(): Router = local.createActor() + override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy + override def routerDispatcher: String = local.routerDispatcher override def resizer: Option[Resizer] = local.resizer