Merge pull request #334 from jboner/wip-system-and-routers-∂π

Wip system and routers ∂π
This commit is contained in:
Roland Kuhn 2012-02-20 01:59:32 -08:00
commit ea45c8bdbb
14 changed files with 410 additions and 22 deletions

View file

@ -56,6 +56,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
"decorate a Receive" in { "decorate a Receive" in {
new TestKit(appLogging) { new TestKit(appLogging) {
system.eventStream.subscribe(testActor, classOf[Logging.Debug]) system.eventStream.subscribe(testActor, classOf[Logging.Debug])
system.eventStream.subscribe(testActor, classOf[UnhandledMessage])
val a = system.actorOf(Props(new Actor { val a = system.actorOf(Props(new Actor {
def receive = new LoggingReceive(Some("funky"), { def receive = new LoggingReceive(Some("funky"), {
case null case null
@ -63,6 +64,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
})) }))
a ! "hallo" a ! "hallo"
expectMsg(1 second, Logging.Debug("funky", classOf[DummyClassForStringSources], "received unhandled message hallo")) expectMsg(1 second, Logging.Debug("funky", classOf[DummyClassForStringSources], "received unhandled message hallo"))
expectMsgType[UnhandledMessage](1 second)
} }
} }

View file

@ -53,6 +53,7 @@ object RoutingSpec {
} }
} }
def routerDispatcher: String = Dispatchers.DefaultDispatcherId def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
} }
} }
@ -126,6 +127,44 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
system.stop(router) system.stop(router)
} }
"set supplied supervisorStrategy" in {
//#supervision
val escalator = OneForOneStrategy() {
//#custom-strategy
case e testActor ! e; SupervisorStrategy.Escalate
//#custom-strategy
}
val router = system.actorOf(Props.empty.withRouter(
RoundRobinRouter(1, supervisorStrategy = escalator)))
//#supervision
router ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 2) intercept {
expectMsgType[RouterRoutees].routees.head ! Kill
}
expectMsgType[ActorKilledException]
}
"default to all-for-one-always-escalate strategy" in {
val restarter = OneForOneStrategy() {
case e testActor ! e; SupervisorStrategy.Restart
}
val supervisor = system.actorOf(Props(new Supervisor(restarter)))
supervisor ! Props(new Actor {
def receive = {
case x: String throw new Exception(x)
}
override def postRestart(reason: Throwable): Unit = testActor ! "restarted"
}).withRouter(RoundRobinRouter(3))
val router = expectMsgType[ActorRef]
EventFilter[Exception]("die", occurrences = 2) intercept {
router ! "die"
}
expectMsgType[Exception].getMessage must be("die")
expectMsg("restarted")
expectMsg("restarted")
expectMsg("restarted")
}
} }
"no router" must { "no router" must {
@ -542,6 +581,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
case class VoteCountRouter() extends RouterConfig { case class VoteCountRouter() extends RouterConfig {
def routerDispatcher: String = Dispatchers.DefaultDispatcherId def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
//#crRoute //#crRoute
def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {

View file

@ -93,7 +93,15 @@ trait ActorContext extends ActorRefFactory {
def sender: ActorRef def sender: ActorRef
/** /**
* Returns all supervised children. * Returns all supervised children; this method returns a view onto the
* internal collection of children. Targeted lookups should be using
* `actorFor` instead for performance reasons:
*
* {{{
* val badLookup = context.children find (_.path.name == "kid")
* // should better be expressed as:
* val goodLookup = context.actorFor("kid")
* }}}
*/ */
def children: Iterable[ActorRef] def children: Iterable[ActorRef]

View file

@ -380,14 +380,10 @@ class LocalActorRefProvider(
} }
} }
/* /**
* Guardians can be asked by ActorSystem to create children, i.e. top-level * Overridable supervision strategy to be used by the /user guardian.
* actors. Therefore these need to answer to these requests, forwarding any
* exceptions which might have occurred.
*/ */
private class Guardian extends Actor { protected def guardianSupervisionStrategy = {
override val supervisorStrategy = {
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
OneForOneStrategy() { OneForOneStrategy() {
case _: ActorKilledException Stop case _: ActorKilledException Stop
@ -396,6 +392,15 @@ class LocalActorRefProvider(
} }
} }
/*
* Guardians can be asked by ActorSystem to create children, i.e. top-level
* actors. Therefore these need to answer to these requests, forwarding any
* exceptions which might have occurred.
*/
private class Guardian extends Actor {
override val supervisorStrategy = guardianSupervisionStrategy
def receive = { def receive = {
case Terminated(_) context.stop(self) case Terminated(_) context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e })
@ -408,12 +413,27 @@ class LocalActorRefProvider(
override def preRestart(cause: Throwable, msg: Option[Any]) {} override def preRestart(cause: Throwable, msg: Option[Any]) {}
} }
/**
* Overridable supervision strategy to be used by the /system guardian.
*/
protected def systemGuardianSupervisionStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException Stop
case _: ActorInitializationException Stop
case _: Exception Restart
}
}
/* /*
* Guardians can be asked by ActorSystem to create children, i.e. top-level * Guardians can be asked by ActorSystem to create children, i.e. top-level
* actors. Therefore these need to answer to these requests, forwarding any * actors. Therefore these need to answer to these requests, forwarding any
* exceptions which might have occurred. * exceptions which might have occurred.
*/ */
private class SystemGuardian extends Actor { private class SystemGuardian extends Actor {
override val supervisorStrategy = systemGuardianSupervisionStrategy
def receive = { def receive = {
case Terminated(_) case Terminated(_)
eventStream.stopDefaultLoggers() eventStream.stopDefaultLoggers()

View file

@ -500,7 +500,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
def awaitTermination(timeout: Duration) { Await.ready(terminationCallbacks, timeout) } def awaitTermination(timeout: Duration) { Await.ready(terminationCallbacks, timeout) }
def awaitTermination() = awaitTermination(Duration.Inf) def awaitTermination() = awaitTermination(Duration.Inf)
def shutdown(): Unit = stop(guardian) def shutdown(): Unit = guardian.stop()
/** /**
* Create the scheduler service. This one needs one special behavior: if * Create the scheduler service. This one needs one special behavior: if

View file

@ -135,7 +135,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
* Decider builder which just checks whether one of * Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider = def makeDecider(trapExit: Array[Class[_]]): Decider =
{ case x if (trapExit exists (_ isInstance x)) Restart else Escalate } { case x if (trapExit exists (_ isInstance x)) Restart else Escalate }
/** /**
@ -248,7 +248,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/* /*
@ -294,7 +294,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/* /*

View file

@ -171,7 +171,14 @@ trait RouterConfig {
def createRouteeProvider(context: ActorContext) = new RouteeProvider(context, resizer) def createRouteeProvider(context: ActorContext) = new RouteeProvider(context, resizer)
def createActor(): Router = new Router {} def createActor(): Router = new Router {
override def supervisorStrategy: SupervisorStrategy = RouterConfig.this.supervisorStrategy
}
/**
* SupervisorStrategy for the created Router actor.
*/
def supervisorStrategy: SupervisorStrategy
/** /**
* Dispatcher ID to use for running the head actor, i.e. the [[akka.routing.Router]]. * Dispatcher ID to use for running the head actor, i.e. the [[akka.routing.Router]].
@ -308,10 +315,19 @@ trait Router extends Actor {
def routerReceive: Receive = { def routerReceive: Receive = {
case _ case _
} }
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
// do not scrap children
}
} }
private object Router { private object Router {
case object Resize case object Resize
val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ SupervisorStrategy.Escalate
}
} }
/** /**
@ -353,6 +369,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
case object NoRouter extends RouterConfig { case object NoRouter extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
def routerDispatcher: String = "" def routerDispatcher: String = ""
def supervisorStrategy = null
override def withFallback(other: RouterConfig): RouterConfig = other override def withFallback(other: RouterConfig): RouterConfig = other
} }
@ -363,6 +380,7 @@ case object FromConfig extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def routerDispatcher: String = Dispatchers.DefaultDispatcherId def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
} }
/** /**
@ -378,6 +396,8 @@ case class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatch
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
} }
object RoundRobinRouter { object RoundRobinRouter {
@ -402,12 +422,40 @@ object RoundRobinRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
* *
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up * @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]] * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed //TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId) val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with RoundRobinLike { extends RouterConfig with RoundRobinLike {
/** /**
@ -438,6 +486,12 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
* Java API for setting routerDispatcher * Java API for setting routerDispatcher
*/ */
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
} }
trait RoundRobinLike { this: RouterConfig trait RoundRobinLike { this: RouterConfig
@ -488,12 +542,40 @@ object RandomRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
* *
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up * @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]] * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed //TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId) val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with RandomLike { extends RouterConfig with RandomLike {
/** /**
@ -524,6 +606,12 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
* Java API for setting routerDispatcher * Java API for setting routerDispatcher
*/ */
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
} }
trait RandomLike { this: RouterConfig trait RandomLike { this: RouterConfig
@ -580,12 +668,40 @@ object SmallestMailboxRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
* *
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up * @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]] * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed //TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId) val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with SmallestMailboxLike { extends RouterConfig with SmallestMailboxLike {
/** /**
@ -616,6 +732,12 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
* Java API for setting routerDispatcher * Java API for setting routerDispatcher
*/ */
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
} }
trait SmallestMailboxLike { this: RouterConfig trait SmallestMailboxLike { this: RouterConfig
@ -731,12 +853,40 @@ object BroadcastRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
* *
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up * @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]] * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed //TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId) val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with BroadcastLike { extends RouterConfig with BroadcastLike {
/** /**
@ -767,6 +917,12 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
* Java API for setting routerDispatcher * Java API for setting routerDispatcher
*/ */
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
} }
trait BroadcastLike { this: RouterConfig trait BroadcastLike { this: RouterConfig
@ -808,13 +964,41 @@ object ScatterGatherFirstCompletedRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
* *
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up * @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]] * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed //TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId) val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with ScatterGatherFirstCompletedLike { extends RouterConfig with ScatterGatherFirstCompletedLike {
if (within <= Duration.Zero) throw new IllegalArgumentException( if (within <= Duration.Zero) throw new IllegalArgumentException(
@ -848,6 +1032,12 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Java API for setting routerDispatcher * Java API for setting routerDispatcher
*/ */
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
} }
trait ScatterGatherFirstCompletedLike { this: RouterConfig trait ScatterGatherFirstCompletedLike { this: RouterConfig

View file

@ -53,6 +53,16 @@ public class CustomRouterDocTestBase {
//#dispatchers //#dispatchers
} }
@Test
public void demonstrateSupervisor() {
//#supervision
final SupervisorStrategy strategy = new OneForOneStrategy(5, Duration.parse("1 minute"),
new Class<?>[] { Exception.class });
final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy)));
//#supervision
}
//#crTest //#crTest
@Test @Test
public void countVotesAsIntendedNotAsInFlorida() throws Exception { public void countVotesAsIntendedNotAsInFlorida() throws Exception {
@ -124,6 +134,10 @@ public class CustomRouterDocTestBase {
return Dispatchers.DefaultDispatcherId(); return Dispatchers.DefaultDispatcherId();
} }
@Override public SupervisorStrategy supervisorStrategy() {
return SupervisorStrategy.defaultStrategy();
}
//#crRoute //#crRoute
@Override @Override
public CustomRoute createCustomRoute(Props props, RouteeProvider routeeProvider) { public CustomRoute createCustomRoute(Props props, RouteeProvider routeeProvider) {

View file

@ -92,6 +92,29 @@ to the actor hierarchy, changing the actor paths of all children of the router.
The routees especially do need to know that they are routed to in order to The routees especially do need to know that they are routed to in order to
choose the sender reference for any messages they dispatch as shown above. choose the sender reference for any messages they dispatch as shown above.
Routers vs. Supervision
^^^^^^^^^^^^^^^^^^^^^^^
As explained in the previous section, routers create new actor instances as
children of the “head” router, who therefor also is their supervisor. The
supervisor strategy of this actor can be configured by means of the
:meth:`RouterConfig.supervisorStrategy` property, which is supported for all
built-in router types. It defaults to “always escalate”, which leads to the
application of the routers parents supervision directive to all children of
the router uniformly (i.e. not only the one which failed). It should be
mentioned that the router overrides the default behavior of terminating all
children upon restart, which means that a restart—while re-creating them—does
not have an effect on the number of actors in the pool.
Setting the strategy is easily done:
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java
:include: supervision
Another potentially useful approach is to give the router the same strategy as
its parent, which effectively treats all actors in the pool as if they were
direct children of their grand-parent instead.
Router usage Router usage
^^^^^^^^^^^^ ^^^^^^^^^^^^

View file

@ -92,6 +92,30 @@ to the actor hierarchy, changing the actor paths of all children of the router.
The routees especially do need to know that they are routed to in order to The routees especially do need to know that they are routed to in order to
choose the sender reference for any messages they dispatch as shown above. choose the sender reference for any messages they dispatch as shown above.
Routers vs. Supervision
^^^^^^^^^^^^^^^^^^^^^^^
As explained in the previous section, routers create new actor instances as
children of the “head” router, who therefor also is their supervisor. The
supervisor strategy of this actor can be configured by means of the
:meth:`RouterConfig.supervisorStrategy` property, which is supported for all
built-in router types. It defaults to “always escalate”, which leads to the
application of the routers parents supervision directive to all children of
the router uniformly (i.e. not only the one which failed). It should be
mentioned that the router overrides the default behavior of terminating all
children upon restart, which means that a restart—while re-creating them—does
not have an effect on the number of actors in the pool.
Setting the strategy is easily done:
.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
:include: supervision
:exclude: custom-strategy
Another potentially useful approach is to give the router the same strategy as
its parent, which effectively treats all actors in the pool as if they were
direct children of their grand-parent instead.
Router usage Router usage
^^^^^^^^^^^^ ^^^^^^^^^^^^

View file

@ -421,6 +421,14 @@ using a small example:
.. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala
:include: imports-test-probe,my-double-echo,test-probe :include: imports-test-probe,my-double-echo,test-probe
Here a the system under test is simulated by :class:`MyDoubleEcho`, which is
supposed to mirror its input to two outputs. Attaching two test probes enables
verification of the (simplistic) behavior. Another example would be two actors
A and B which collaborate by A sending messages to B. In order to verify this
message flow, a :class:`TestProbe` could be inserted as target of A, using the
forwarding capabilities or auto-pilot described below to include a real B in
the test setup.
Probes may also be equipped with custom assertions to make your test code even Probes may also be equipped with custom assertions to make your test code even
more concise and clear: more concise and clear:
@ -455,6 +463,21 @@ network functioning:
The ``dest`` actor will receive the same message invocation as if no test probe The ``dest`` actor will receive the same message invocation as if no test probe
had intervened. had intervened.
Auto-Pilot
^^^^^^^^^^
Receiving messages in a queue for later inspection is nice, but in order to
keep a test running and verify traces later you can also install an
:class:`AutoPilot` in the participating test probes (actually in any
:class:`TestKit`) which is invoked before enqueueing to the inspection queue.
This code can be used to forward messages, e.g. in a chain ``A --> Probe -->
B``, as long as a certain protocol is obeyed.
.. includecode:: ../../akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala#autopilot
The :meth:`run` method must return the auto-pilot for the next message, wrapped
in an :class:`Option`; setting it to :obj:`None` terminates the auto-pilot.
Caution about Timing Assertions Caution about Timing Assertions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View file

@ -13,6 +13,7 @@ import akka.actor.Props
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.remote.RemoteScope import akka.remote.RemoteScope
import akka.actor.AddressExtractor import akka.actor.AddressExtractor
import akka.actor.SupervisorStrategy
/** /**
* [[akka.routing.RouterConfig]] implementation for remote deployment on defined * [[akka.routing.RouterConfig]] implementation for remote deployment on defined
@ -30,6 +31,8 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) exte
override def createActor(): Router = local.createActor() override def createActor(): Router = local.createActor()
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
override def routerDispatcher: String = local.routerDispatcher override def routerDispatcher: String = local.routerDispatcher
override def resizer: Option[Resizer] = local.resizer override def resizer: Option[Resizer] = local.resizer

View file

@ -16,9 +16,14 @@ import akka.util.Timeout
object TestActor { object TestActor {
type Ignore = Option[PartialFunction[AnyRef, Boolean]] type Ignore = Option[PartialFunction[AnyRef, Boolean]]
trait AutoPilot {
def run(sender: ActorRef, msg: Any): Option[AutoPilot]
}
case class SetIgnore(i: Ignore) case class SetIgnore(i: Ignore)
case class Watch(ref: ActorRef) case class Watch(ref: ActorRef)
case class UnWatch(ref: ActorRef) case class UnWatch(ref: ActorRef)
case class SetAutoPilot(ap: AutoPilot)
trait Message { trait Message {
def msg: AnyRef def msg: AnyRef
@ -36,11 +41,15 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
var ignore: Ignore = None var ignore: Ignore = None
var autopilot: Option[AutoPilot] = None
def receive = { def receive = {
case SetIgnore(ign) ignore = ign case SetIgnore(ign) ignore = ign
case x @ Watch(ref) context.watch(ref); queue.offerLast(RealMessage(x, self)) case x @ Watch(ref) context.watch(ref); queue.offerLast(RealMessage(x, self))
case x @ UnWatch(ref) context.unwatch(ref); queue.offerLast(RealMessage(x, self)) case x @ UnWatch(ref) context.unwatch(ref); queue.offerLast(RealMessage(x, self))
case SetAutoPilot(pilot) autopilot = Some(pilot)
case x: AnyRef case x: AnyRef
autopilot = autopilot.flatMap(_.run(sender, x))
val observe = ignore map (ignoreFunc if (ignoreFunc isDefinedAt x) !ignoreFunc(x) else true) getOrElse true val observe = ignore map (ignoreFunc if (ignoreFunc isDefinedAt x) !ignoreFunc(x) else true) getOrElse true
if (observe) queue.offerLast(RealMessage(x, sender)) if (observe) queue.offerLast(RealMessage(x, sender))
} }
@ -148,6 +157,13 @@ class TestKit(_system: ActorSystem) {
expectMsg(msg) expectMsg(msg)
} }
/**
* Install an AutoPilot to drive the testActor: the AutoPilot will be run
* for each received message and can be used to send or forward messages,
* etc. Each invocation must return the AutoPilot for the next round.
*/
def setAutoPilot(pilot: TestActor.AutoPilot): Unit = testActor ! TestActor.SetAutoPilot(pilot)
/** /**
* Obtain current time (`System.nanoTime`) as Duration. * Obtain current time (`System.nanoTime`) as Duration.
*/ */

View file

@ -40,6 +40,31 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout {
probe1.expectMsg(0 millis, "world") probe1.expectMsg(0 millis, "world")
} }
"have an AutoPilot" in {
//#autopilot
val probe = TestProbe()
probe.setAutoPilot(new TestActor.AutoPilot {
def run(sender: ActorRef, msg: Any): Option[TestActor.AutoPilot] =
msg match {
case "stop" None
case x testActor.tell(x, sender); Some(this)
}
})
//#autopilot
probe.ref ! "hallo"
probe.ref ! "welt"
probe.ref ! "stop"
expectMsg("hallo")
expectMsg("welt")
probe.expectMsg("hallo")
probe.expectMsg("welt")
probe.expectMsg("stop")
probe.ref ! "hallo"
probe.expectMsg("hallo")
testActor ! "end"
expectMsg("end") // verify that "hallo" did not get through
}
} }
} }