/**
 * Copyright (C) 2009-2012 Typesafe Inc.
 */
package akka.routing

import akka.actor._
import akka.util.Duration
import akka.util.duration._
import akka.ConfigurationException
import akka.pattern.pipe
import com.typesafe.config.Config
import scala.collection.JavaConversions.iterableAsScalaIterable
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import akka.jsr166y.ThreadLocalRandom
import akka.util.Unsafe
import akka.dispatch.Dispatchers
import scala.annotation.tailrec
import scala.runtime.ScalaRunTime

/**
 * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
 * send a message to one (or more) of these actors.
 *
 * The underlying actor is created from `_props.routerConfig.createActor()` and runs on
 * `_props.routerConfig.routerDispatcher`; the routees are created from `routeeProps`.
 */
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
  extends LocalActorRef(
    _system,
    _props.copy(creator = () ⇒ _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher),
    _supervisor,
    _path) {

  // verify that a BalancingDispatcher is not used with a Router
  if (_system.dispatchers.isBalancingDispatcher(_props.dispatcher) && _props.routerConfig != NoRouter)
    throw new ConfigurationException(
      "Configuration for actor [" + _path.toString + "] is invalid - you can not use a 'BalancingDispatcher' together with any type of 'Router'")

  /*
   * CAUTION: RoutedActorRef is PROBLEMATIC
   * ======================================
   *
   * We are constructing/assembling the children outside of the scope of the
   * Router actor, inserting them in its childrenRef list, which is not at all
   * synchronized. This is done exactly once at start-up, all other accesses
   * are done from the Router actor. This means that the only thing which is
   * really hairy is making sure that the Router does not touch its childrenRefs
   * before we are done with them: lock the monitor of the actor cell (hence the
   * override of newActorCell) and use that to block the Router constructor for
   * as long as it takes to setup the RoutedActorRef itself.
   */

  /**
   * Creates the actor cell through the super implementation and enters its monitor
   * before handing it out. The matching `monitorExit` is issued on `actorContext`
   * in the `finally` clause of the `route` initializer below.
   */
  override def newActorCell(
    system: ActorSystemImpl,
    ref: InternalActorRef,
    props: Props,
    supervisor: InternalActorRef,
    receiveTimeout: Option[Duration]): ActorCell = {
    val cell = super.newActorCell(system, ref, props, supervisor, receiveTimeout)
    Unsafe.instance.monitorEnter(cell)
    cell
  }

  private[akka] val routerConfig = _props.routerConfig
  // Props used for the routees: the router's own Props with routing switched off
  private[akka] val routeeProps = _props.copy(routerConfig = NoRouter)
  // set by resize() when a Router.Resize message has been sent, cleared by the Router once handled
  private[akka] val resizeInProgress = new AtomicBoolean
  // incremented on every resize check; passed to Resizer.isTimeForResize
  private val resizeCounter = new AtomicLong

  @volatile
  private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute
  def routees = _routees

  @volatile
  private var _routeeProvider: RouteeProvider = _
  def routeeProvider = _routeeProvider

  // Builds the routee provider and the route, runs the initial resize check, and
  // always releases the monitor taken in newActorCell, even if construction fails.
  val route =
    try {
      _routeeProvider = routerConfig.createRouteeProvider(actorContext)
      val r = routerConfig.createRoute(routeeProps, routeeProvider)
      // initial resize, before message send
      routerConfig.resizer foreach { r ⇒
        if (r.isTimeForResize(resizeCounter.getAndIncrement()))
          r.resize(routeeProps, routeeProvider)
      }
      r
    } finally {
      assert(Thread.holdsLock(actorContext))
      Unsafe.instance.monitorExit(actorContext) // unblock Router’s constructor
    }

  // without a resizer the route must have registered at least one routee by now
  if (routerConfig.resizer.isEmpty && _routees.isEmpty)
    throw new ActorInitializationException("router " + routerConfig + " did not register routees!")

  /*
   * end of construction
   */

  /**
   * Determines the destinations for a message.
   * `AutoReceivedMessage` and `Terminated` are addressed to this router itself,
   * `CurrentRoutees` is answered directly to the sender with a `RouterRoutees` reply
   * (and yields no destinations); everything else is passed to `route`, yielding
   * no destinations when the route is not defined for the message.
   */
  def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
    case _: AutoReceivedMessage ⇒ Destination(this, this) :: Nil
    case Terminated(_)          ⇒ Destination(this, this) :: Nil
    case CurrentRoutees ⇒
      sender ! RouterRoutees(_routees)
      Nil
    case _ ⇒
      if (route.isDefinedAt(sender, message)) route(sender, message)
      else Nil
  }

  /**
   * Adds the routees to existing routees.
   * Adds death watch of the routees so that they are removed when terminated.
   * Not thread safe, but intended to be called from protected points, such as
   * `RouterConfig.createRoute` and `Resizer.resize`
   */
  private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]): Unit = {
    _routees = _routees ++ newRoutees
    // subscribe to Terminated messages for all route destinations, to be handled by Router actor
    newRoutees foreach underlying.watch
  }

  /**
   * Removes the routees from the existing routees.
   * Removes death watch of the routees. Doesn't stop the routees.
   * Not thread safe, but intended to be called from protected points, such as
   * `Resizer.resize`
   */
  private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]): Unit = {
    _routees = _routees diff abandonedRoutees
    abandonedRoutees foreach underlying.unwatch
  }

  /**
   * Runs a resize check, then dispatches the message according to `applyRoute`.
   * If the only destination is this router itself, the original message goes to
   * the router actor via `super.!`; otherwise every destination receives the
   * message with a `Broadcast` envelope stripped off. A `null` sender is replaced
   * by the system's dead letters reference.
   */
  override def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    resize()

    val s = if (sender eq null) underlying.system.deadLetters else sender

    // unwrap the Broadcast envelope for delivery to routees
    val msg = message match {
      case Broadcast(m) ⇒ m
      case m            ⇒ m
    }

    applyRoute(s, message) match {
      case Destination(_, x) :: Nil if x eq this ⇒ super.!(message)(s)
      case refs                                  ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender))
    }
  }

  /**
   * If a resizer is configured and it reports that it is time for a resize,
   * sends `Router.Resize` to the router actor, unless a resize is already in
   * progress (guarded by a compare-and-set on `resizeInProgress`).
   */
  def resize(): Unit = {
    for (r ← routerConfig.resizer) {
      if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true))
        super.!(Router.Resize)
    }
  }
}

/**
 * This trait represents a router factory: it produces the actual router actor
 * and creates the routing table (a function which determines the recipients
 * for each message which is to be dispatched). The resulting RoutedActorRef
 * optimizes the sending of the message so that it does NOT go through the
 * router’s mailbox unless the route returns an empty recipient set.
*
 * '''Caution:''' This means
 * that the route function is evaluated concurrently without protection by
 * the RoutedActorRef: either provide a reentrant (i.e. pure) implementation or
 * do the locking yourself!
 *
 * '''Caution:''' Please note that the [[akka.routing.Router]] which needs to
 * be returned by `createActor()` should not send a message to itself in its
 * constructor or `preStart()` or publish its self reference from there: if
 * someone tries sending a message to that reference before the constructor of
 * RoutedActorRef has returned, there will be a `NullPointerException`!
 */
trait RouterConfig {

  /**
   * Creates the routing function for the given routee Props, using the
   * routee provider to create and/or register the routees.
   */
  def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route

  /**
   * Creates the routee provider bound to the router's context and this
   * configuration's resizer.
   */
  def createRouteeProvider(context: ActorContext) = new RouteeProvider(context, resizer)

  /**
   * Creates the “head” Router actor; by default a plain Router which uses the
   * supervisor strategy of this configuration.
   */
  def createActor(): Router = new Router {
    override def supervisorStrategy: SupervisorStrategy = RouterConfig.this.supervisorStrategy
  }

  /**
   * SupervisorStrategy for the created Router actor.
   */
  def supervisorStrategy: SupervisorStrategy

  /**
   * Dispatcher ID to use for running the “head” actor, i.e. the [[akka.routing.Router]].
   */
  def routerDispatcher: String

  /**
   * Overridable merge strategy, by default completely prefers “this” (i.e. no merge).
   */
  def withFallback(other: RouterConfig): RouterConfig = this

  /**
   * Builds one destination per routee, all carrying the given sender.
   */
  protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] =
    routees.map(Destination(sender, _))

  /**
   * Routers with dynamically resizable number of routees return the [[akka.routing.Resizer]]
   * to use.
   */
  def resizer: Option[Resizer] = None

}

/**
 * Factory and registry for routees of the router.
 * Uses `context.actorOf` to create routees from nrOfInstances property
 * and `context.actorFor` lookup routees from paths.
 */
class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) {

  /**
   * Adds the routees to the router.
   * Adds death watch of the routees so that they are removed when terminated.
   * Not thread safe, but intended to be called from protected points, such as
   * `RouterConfig.createRoute` and `Resizer.resize`.
   */
  def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = {
    routedRef.addRoutees(routees)
  }

  /**
   * Adds the routees to the router.
   * Adds death watch of the routees so that they are removed when terminated.
   * Not thread safe, but intended to be called from protected points, such as
   * `RouterConfig.createRoute` and `Resizer.resize`.
   * Java API.
   */
  def registerRoutees(routees: java.util.List[ActorRef]): Unit = {
    import scala.collection.JavaConverters._
    registerRoutees(routees.asScala.toIndexedSeq)
  }

  /**
   * Removes routees from the router. This method doesn't stop the routees.
   * Removes death watch of the routees.
   * Not thread safe, but intended to be called from protected points, such as
   * `Resizer.resize`.
   */
  def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = {
    routedRef.removeRoutees(routees)
  }

  /**
   * Creates or looks up the routees.
   *
   * When `routees` is empty, `nrOfInstances` new children are created with
   * `context.actorOf(props)`; otherwise `nrOfInstances` is ignored and every
   * path in `routees` is looked up with `context.actorFor`.
   *
   * Emptiness is checked with `isEmpty` rather than by comparing against `Nil`,
   * so that empty collections which are not a `Seq` (e.g. an empty `Set` or a
   * wrapped `java.lang.Iterable`) are also treated as "no routees given".
   *
   * @throws IllegalArgumentException if `routees` is empty and `nrOfInstances` is not positive
   */
  def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] =
    if (routees.isEmpty) {
      if (nrOfInstances <= 0)
        throw new IllegalArgumentException(
          "Must specify nrOfInstances or routees for [%s]" format context.self.path.toString)
      (1 to nrOfInstances).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut)
    } else routees.map(context.actorFor(_))(scala.collection.breakOut)

  /**
   * Creates and registers the routees, but only when no resizer is configured;
   * with a resizer this is a no-op here.
   */
  def createAndRegisterRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): Unit = {
    if (resizer.isEmpty) {
      registerRoutees(createRoutees(props, nrOfInstances, routees))
    }
  }

  /**
   * All routees of the router
   */
  def routees: IndexedSeq[ActorRef] = routedRef.routees

  // the provider is only usable inside a RoutedActorRef; anything else fails with a ClassCastException
  private def routedRef = context.self.asInstanceOf[RoutedActorRef]
}

/**
 * Java API for a custom router factory.
* @see akka.routing.RouterConfig
 */
abstract class CustomRouterConfig extends RouterConfig {

  /**
   * Adapts the Java-facing [[akka.routing.CustomRoute]] to a `Route`.
   */
  override def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    // resolving the custom route up front means the returned partial function
    // closes over the delegate only, not over props and the routee provider
    val delegate = createCustomRoute(props, routeeProvider)
    val adapted: Route = {
      case (sender, message) ⇒ delegate.destinationsFor(sender, message)
    }
    adapted
  }

  /**
   * Supplies the routing logic; to be implemented by the custom configuration.
   */
  def createCustomRoute(props: Props, routeeProvider: RouteeProvider): CustomRoute

}

/**
 * Java-friendly routing function: yields the destinations for one message.
 */
trait CustomRoute {
  def destinationsFor(sender: ActorRef, message: Any): java.lang.Iterable[Destination]
}

/**
 * Base trait for `Router` actors. Override `routerReceive` to handle custom
 * messages which the corresponding [[akka.routing.RouterConfig]] lets
 * through by returning an empty route.
 */
trait Router extends Actor {

  // synchronizing on the context makes the childrenRefs set up by the RoutedActorRef visible here
  val ref = context.synchronized {
    self match {
      case routed: RoutedActorRef ⇒ routed
      case _ ⇒
        throw new ActorInitializationException("Router actor can only be used in RoutedActorRef")
    }
  }

  /**
   * Handles the router-internal messages (`Router.Resize` and `Terminated`)
   * and delegates everything else to `routerReceive`.
   */
  final def receive = ({

    case Router.Resize ⇒
      val inProgress = ref.resizeInProgress
      if (inProgress.get) {
        try {
          for (resizer ← ref.routerConfig.resizer)
            resizer.resize(ref.routeeProps, ref.routeeProvider)
        } finally {
          inProgress.set(false)
        }
      }

    case Terminated(child) ⇒
      ref.removeRoutees(IndexedSeq(child))
      // a router without any routees left stops itself
      if (ref.routees.isEmpty) context.stop(self)

  }: Receive) orElse routerReceive

  /**
   * Behavior for messages which are not router-internal; empty by default.
   */
  def routerReceive: Receive = Actor.emptyBehavior

  override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
    // do not scrap children
  }
}

/**
 * INTERNAL API
 */
private object Router {

  // sent by the RoutedActorRef to its router actor to trigger a resize
  case object Resize

  // escalates every failure
  val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ ⇒ SupervisorStrategy.Escalate
  }
}

/**
 * Used to broadcast a message to all connections in a router; only the
 * contained message will be forwarded, i.e. the `Broadcast(...)`
 * envelope will be stripped off.
 *
 * Router implementations may choose to handle this message differently.
*/
case class Broadcast(message: Any)

/**
 * Sending this message to a router will make it send back its currently used routees.
 * A RouterRoutees message is sent asynchronously to the "requester" containing information
 * about what routees the router is routing over.
 */
abstract class CurrentRoutees
case object CurrentRoutees extends CurrentRoutees {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Message used to carry information about what routees the router is currently using.
 *
 * @param routees the routees of the router at the time the reply was created
 */
case class RouterRoutees(routees: Iterable[ActorRef])

/**
 * For every message sent to a router, its route determines a set of destinations,
 * where for each recipient a different sender may be specified; typically the
 * sender should match the sender of the original request, but e.g. the scatter-
 * gather router needs to receive the replies with an AskActorRef instead.
 *
 * @param sender the sender to use when delivering to `recipient`
 * @param recipient the actor which shall receive the message
 */
case class Destination(sender: ActorRef, recipient: ActorRef)

/**
 * Routing configuration that indicates no routing; this is also the default
 * value which hence overrides the merge strategy in order to accept values
 * from lower-precedence sources. The decision whether or not to create a
 * router is taken in the LocalActorRefProvider based on Props.
 */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
abstract class NoRouter extends RouterConfig
case object NoRouter extends NoRouter {
  // NoRouter has no route: callers receive null here
  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null // FIXME, null, really??
  def routerDispatcher: String = ""
  // no supervisor strategy either: null
  def supervisorStrategy = null // FIXME null, really??
  // always yields to the other configuration when merging
  override def withFallback(other: RouterConfig): RouterConfig = other

  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Router configuration which has no default, i.e. external configuration is required.
*/
case object FromConfig extends FromConfig {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this

  /**
   * Creates a FromConfig whose head router runs on the given dispatcher.
   */
  @inline final def apply(routerDispatcher: String = Dispatchers.DefaultDispatcherId) =
    new FromConfig(routerDispatcher)

  /**
   * Extracts the dispatcher id of the given FromConfig.
   */
  @inline final def unapply(fc: FromConfig): Option[String] =
    Some(fc.routerDispatcher)
}

/**
 * Java API: Router configuration which has no default, i.e. external configuration is required.
 *
 * This can be used when the dispatcher to be used for the head Router needs to be configured
 * (defaults to default-dispatcher).
 */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
  extends RouterConfig
  with Serializable {

  def this() = this(Dispatchers.DefaultDispatcherId)

  /**
   * Always fails: this configuration is only a placeholder for a router
   * defined in the configuration file.
   */
  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    val routerRef = routeeProvider.context.self
    throw new ConfigurationException(
      "router " + routerRef + " needs external configuration from file (e.g. application.conf)")
  }

  def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
}

object RoundRobinRouter {
  /**
   * Creates a router over the given actors, referring to them by their paths.
   */
  def apply(routees: Iterable[ActorRef]) = {
    val routeePaths = routees.map(_.path.toString)
    new RoundRobinRouter(routees = routeePaths)
  }

  /**
   * Java API to create router with the supplied 'routees' actors.
   */
  def create(routees: java.lang.Iterable[ActorRef]): RoundRobinRouter = {
    import scala.collection.JavaConverters._
    val scalaRoutees = routees.asScala
    apply(scalaRoutees)
  }
}

/**
 * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
 *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * *

<h1>Supervision Setup</h1>

*
 * The router creates a “head” actor which supervises and/or monitors the
 * routees. Instances are created as children of this actor, hence the
 * children are not supervised by the parent of the router. Common choices are
 * to always escalate (meaning that fault handling is always applied to all
 * children simultaneously; this is the default) or use the parent’s strategy,
 * which will result in routed children being treated individually, but it is
 * possible as well to use Routers to give different supervisor strategies to
 * different groups of children.
 *
 * {{{
 * class MyActor extends Actor {
 *   override val supervisorStrategy = ...
 *
 *   val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
 *
 *   val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
 *     RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
 *
 *   val specialChild = context.actorOf(Props[SomeActor].withRouter(
 *     RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
 *       ...
 *     })))
 * }
 * }}}
 *
 * @param routees string representation of the actor paths of the routees that will be looked up
 *   using `actorFor` in [[akka.actor.ActorRefProvider]]
 */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
                            val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
                            val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
  extends RouterConfig with RoundRobinLike {

  /**
   * Constructor that sets nrOfInstances to be created.
   * Java API
   */
  def this(nr: Int) = this(nrOfInstances = nr)

  /**
   * Constructor that sets the routees to be used.
   * Java API
   * @param routeePaths string representation of the actor paths of the routees that will be looked up
   *   using `actorFor` in [[akka.actor.ActorRefProvider]]
   */
  def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))

  /**
   * Constructor that sets the resizer to be used.
   * Java API
   */
  def this(resizer: Resizer) = this(resizer = Some(resizer))

  /**
   * Java API for setting routerDispatcher
   */
  def withDispatcher(dispatcherId: String): RoundRobinRouter = copy(routerDispatcher = dispatcherId)

  /**
   * Java API for setting the supervisor strategy to be used for the “head”
   * Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): RoundRobinRouter = copy(supervisorStrategy = strategy)
}

/**
 * Round-robin routing logic, to be mixed into a [[akka.routing.RouterConfig]].
 */
trait RoundRobinLike { this: RouterConfig ⇒

  def nrOfInstances: Int

  def routees: Iterable[String]

  /**
   * Registers the routees (unless a resizer is in charge of that) and returns a
   * route which sends `Broadcast` messages to all routees and every other
   * message to the next routee in turn.
   */
  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)

    // shared by all senders; concurrent sends make the rotation best effort only
    val next = new AtomicLong(0)

    def getNext(): ActorRef = {
      // read the routees once so that size and lookup refer to the same snapshot
      val _routees = routeeProvider.routees
      // no routees (e.g. the resizer has not created any): go to dead letters
      // instead of failing the sender with a division by zero
      if (_routees.isEmpty) routeeProvider.context.system.deadLetters
      else _routees((next.getAndIncrement % _routees.size).asInstanceOf[Int])
    }

    {
      case (sender, message) ⇒
        message match {
          case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
          case msg            ⇒ List(Destination(sender, getNext()))
        }
    }
  }
}

object RandomRouter {
  def apply(routees: Iterable[ActorRef]) = new RandomRouter(routees = routees map (_.path.toString))

  /**
   * Java API to create router with the supplied 'routees' actors.
   */
  def create(routees: java.lang.Iterable[ActorRef]): RandomRouter = {
    import scala.collection.JavaConverters._
    apply(routees.asScala)
  }
}

/**
 * A Router that randomly selects one of the target connections to send a message to.
 *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * *

<h1>Supervision Setup</h1>

*
 * The router creates a “head” actor which supervises and/or monitors the
 * routees. Instances are created as children of this actor, hence the
 * children are not supervised by the parent of the router. Common choices are
 * to always escalate (meaning that fault handling is always applied to all
 * children simultaneously; this is the default) or use the parent’s strategy,
 * which will result in routed children being treated individually, but it is
 * possible as well to use Routers to give different supervisor strategies to
 * different groups of children.
 *
 * {{{
 * class MyActor extends Actor {
 *   override val supervisorStrategy = ...
 *
 *   val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RandomRouter(5)))
 *
 *   val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
 *     RandomRouter(5, supervisorStrategy = this.supervisorStrategy)))
 *
 *   val specialChild = context.actorOf(Props[SomeActor].withRouter(
 *     RandomRouter(5, supervisorStrategy = OneForOneStrategy() {
 *       ...
 *     })))
 * }
 * }}}
 *
 * @param routees string representation of the actor paths of the routees that will be looked up
 *   using `actorFor` in [[akka.actor.ActorRefProvider]]
 */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
                        val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
                        val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
  extends RouterConfig with RandomLike {

  /**
   * Constructor that sets nrOfInstances to be created.
   * Java API
   */
  def this(nr: Int) = this(nrOfInstances = nr)

  /**
   * Constructor that sets the routees to be used.
   * Java API
   * @param routeePaths string representation of the actor paths of the routees that will be looked up
   *   using `actorFor` in [[akka.actor.ActorRefProvider]]
   */
  def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))

  /**
   * Constructor that sets the resizer to be used.
   * Java API
   */
  def this(resizer: Resizer) = this(resizer = Some(resizer))

  /**
   * Java API for setting routerDispatcher
   */
  def withDispatcher(dispatcherId: String): RandomRouter = copy(routerDispatcher = dispatcherId)

  /**
   * Java API for setting the supervisor strategy to be used for the “head”
   * Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): RandomRouter = copy(supervisorStrategy = strategy)
}

/**
 * Random routing logic, to be mixed into a [[akka.routing.RouterConfig]].
 */
trait RandomLike { this: RouterConfig ⇒
  def nrOfInstances: Int

  def routees: Iterable[String]

  /**
   * Registers the routees (unless a resizer is in charge of that) and returns a
   * route which sends `Broadcast` messages to all routees and every other
   * message to one randomly chosen routee.
   */
  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)

    def getNext(): ActorRef = {
      // read the routees once so that size and lookup refer to the same snapshot
      val _routees = routeeProvider.routees
      // nextInt rejects a bound of 0, so an empty routee set must be handled
      // up front: route to dead letters instead of failing the sender
      if (_routees.isEmpty) routeeProvider.context.system.deadLetters
      else _routees(ThreadLocalRandom.current.nextInt(_routees.size))
    }

    {
      case (sender, message) ⇒
        message match {
          case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
          case msg            ⇒ List(Destination(sender, getNext()))
        }
    }
  }
}

object SmallestMailboxRouter {
  def apply(routees: Iterable[ActorRef]) = new SmallestMailboxRouter(routees = routees map (_.path.toString))

  /**
   * Java API to create router with the supplied 'routees' actors.
   */
  def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = {
    import scala.collection.JavaConverters._
    apply(routees.asScala)
  }
}

/**
 * A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
 * The selection is done in this order:
 *
 *  - the first routee which is not suspended, is not processing a message and has an
 *    empty mailbox (a routee which is not a `LocalActorRef` always appears this way,
 *    since its state cannot be inspected)
 *  - otherwise the non-suspended routee with the fewest pending messages, where a
 *    message currently being processed counts as one more
 *  - a suspended routee, only if nothing better was found
 *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * *

<h1>Supervision Setup</h1>

*
 * The router creates a “head” actor which supervises and/or monitors the
 * routees. Instances are created as children of this actor, hence the
 * children are not supervised by the parent of the router. Common choices are
 * to always escalate (meaning that fault handling is always applied to all
 * children simultaneously; this is the default) or use the parent’s strategy,
 * which will result in routed children being treated individually, but it is
 * possible as well to use Routers to give different supervisor strategies to
 * different groups of children.
 *
 * {{{
 * class MyActor extends Actor {
 *   override val supervisorStrategy = ...
 *
 *   val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(SmallestMailboxRouter(5)))
 *
 *   val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
 *     SmallestMailboxRouter(5, supervisorStrategy = this.supervisorStrategy)))
 *
 *   val specialChild = context.actorOf(Props[SomeActor].withRouter(
 *     SmallestMailboxRouter(5, supervisorStrategy = OneForOneStrategy() {
 *       ...
 *     })))
 * }
 * }}}
 *
 * @param routees string representation of the actor paths of the routees that will be looked up
 *   using `actorFor` in [[akka.actor.ActorRefProvider]]
 */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
                                 val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
                                 val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
  extends RouterConfig with SmallestMailboxLike {

  /**
   * Constructor that sets nrOfInstances to be created.
   * Java API
   */
  def this(nr: Int) = this(nrOfInstances = nr)

  /**
   * Constructor that sets the routees to be used.
   * Java API
   * @param routeePaths string representation of the actor paths of the routees that will be looked up
   *   using `actorFor` in [[akka.actor.ActorRefProvider]]
   */
  def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))

  /**
   * Constructor that sets the resizer to be used.
   * Java API
   */
  def this(resizer: Resizer) = this(resizer = Some(resizer))

  /**
   * Java API for setting routerDispatcher
   */
  def withDispatcher(dispatcherId: String): SmallestMailboxRouter = copy(routerDispatcher = dispatcherId)

  /**
   * Java API for setting the supervisor strategy to be used for the “head”
   * Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): SmallestMailboxRouter = copy(supervisorStrategy = strategy)
}

/**
 * Smallest-mailbox routing logic, to be mixed into a [[akka.routing.RouterConfig]].
 */
trait SmallestMailboxLike { this: RouterConfig ⇒
  def nrOfInstances: Int

  def routees: Iterable[String]

  /**
   * Returns true if the actor is currently processing a message.
   * It will always return false for remote actors.
   * Method is exposed to subclasses to be able to implement custom
   * routers based on mailbox and actor internal state.
   */
  protected def isProcessingMessage(a: ActorRef): Boolean = a match {
    case x: LocalActorRef ⇒
      val cell = x.underlying
      cell.mailbox.isScheduled && cell.currentMessage != null
    case _ ⇒ false
  }

  /**
   * Returns true if the actor currently has any pending messages
   * in the mailbox, i.e. the mailbox is not empty.
   * It will always return false for remote actors.
   * Method is exposed to subclasses to be able to implement custom
   * routers based on mailbox and actor internal state.
   */
  protected def hasMessages(a: ActorRef): Boolean = a match {
    case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages
    case _                ⇒ false
  }

  /**
   * Returns true if the actor is currently suspended.
   * It will always return false for remote actors.
   * Method is exposed to subclasses to be able to implement custom
   * routers based on mailbox and actor internal state.
   */
  protected def isSuspended(a: ActorRef): Boolean = a match {
    case x: LocalActorRef ⇒ x.underlying.mailbox.isSuspended
    case _                ⇒ false
  }

  /**
   * Returns the number of pending messages in the mailbox of the actor.
   * It will always return 0 for remote actors.
   * Method is exposed to subclasses to be able to implement custom
   * routers based on mailbox and actor internal state.
   */
  protected def numberOfMessages(a: ActorRef): Int = a match {
    case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages
    case _                ⇒ 0
  }

  /**
   * Registers the routees (unless a resizer is in charge of that) and returns a
   * route which sends `Broadcast` messages to all routees and every other
   * message to the routee with the lowest score as computed by `getNext`.
   */
  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)

    // Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found.
    // Lowest score wins, score 0 is autowin
    // If no actor with score 0 is found, it will return that, or if it is terminated, a random of the entire set.
    //   Why? Well, in case we had 0 viable actors and all we got was the default, which is the DeadLetters, anything else is better.
    // Order of interest, in ascending priority:
    // 1. The DeadLetterActorRef
    // 2. A Suspended ActorRef
    // 3. An ActorRef with unknown mailbox size but with one message being processed
    // 4. An ActorRef with unknown mailbox size that isn't processing anything
    // 5. An ActorRef with a known mailbox size
    // 6. An ActorRef without any messages
    @tailrec
    def getNext(targets: IndexedSeq[ActorRef] = routeeProvider.routees,
                proposedTarget: ActorRef = routeeProvider.context.system.deadLetters,
                currentScore: Long = Long.MaxValue,
                at: Int = 0,
                deep: Boolean = false): ActorRef =
      // Without any routees there is nothing to score; answering with dead letters
      // also keeps the random fallback below from calling nextInt with a bound of 0.
      if (targets.isEmpty) routeeProvider.context.system.deadLetters
      else if (at >= targets.size) {
        if (deep) {
          if (proposedTarget.isTerminated) targets(ThreadLocalRandom.current.nextInt(targets.size)) else proposedTarget
        } else getNext(targets, proposedTarget, currentScore, 0, deep = true)
      } else {
        val target = targets(at)
        val newScore: Long =
          if (isSuspended(target)) Long.MaxValue - 1 else { //Just about better than the DeadLetters
            (if (isProcessingMessage(target)) 1L else 0L) +
              (if (!hasMessages(target)) 0L else { //Race between hasMessages and numberOfMessages here, unfortunate the numberOfMessages returns 0 if unknown
                val noOfMsgs: Long = if (deep) numberOfMessages(target) else 0
                if (noOfMsgs > 0) noOfMsgs else Long.MaxValue - 3 //Just better than a suspended actorref
              })
          }

        if (newScore == 0) target
        else if (newScore < 0 || newScore >= currentScore) getNext(targets, proposedTarget, currentScore, at + 1, deep)
        else getNext(targets, target, newScore, at + 1, deep)
      }

    {
      case (sender, message) ⇒
        message match {
          case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
          case msg            ⇒ List(Destination(sender, getNext()))
        }
    }
  }
}

object BroadcastRouter {
  def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString))

  /**
   * Java API to create router with the supplied 'routees' actors.
   */
  def create(routees: java.lang.Iterable[ActorRef]): BroadcastRouter = {
    import scala.collection.JavaConverters._
    apply(routees.asScala)
  }
}

/**
 * A Router that broadcasts a message to all its connections.
 *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * *

<h1>Supervision Setup</h1>

*
 * The router creates a “head” actor which supervises and/or monitors the
 * routees. Instances are created as children of this actor, hence the
 * children are not supervised by the parent of the router. Common choices are
 * to always escalate (meaning that fault handling is always applied to all
 * children simultaneously; this is the default) or use the parent’s strategy,
 * which will result in routed children being treated individually, but it is
 * possible as well to use Routers to give different supervisor strategies to
 * different groups of children.
 *
 * {{{
 * class MyActor extends Actor {
 *   override val supervisorStrategy = ...
 *
 *   val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(BroadcastRouter(5)))
 *
 *   val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
 *     BroadcastRouter(5, supervisorStrategy = this.supervisorStrategy)))
 *
 *   val specialChild = context.actorOf(Props[SomeActor].withRouter(
 *     BroadcastRouter(5, supervisorStrategy = OneForOneStrategy() {
 *       ...
 *     })))
 * }
 * }}}
 *
 * @param routees string representation of the actor paths of the routees that will be looked up
 *   using `actorFor` in [[akka.actor.ActorRefProvider]]
 */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class BroadcastRouter(
  nrOfInstances: Int = 0,
  routees: Iterable[String] = Nil,
  override val resizer: Option[Resizer] = None,
  val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
  val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
  extends RouterConfig with BroadcastLike {

  /**
   * Java API: router which creates `nr` routees.
   */
  def this(nr: Int) = this(nrOfInstances = nr)

  /**
   * Java API: router over already existing routees.
   * @param routeePaths string representation of the actor paths of the routees that will be looked up
   *   using `actorFor` in [[akka.actor.ActorRefProvider]]
   */
  def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))

  /**
   * Java API: router whose routees are managed by the given resizer.
   */
  def this(resizer: Resizer) = this(resizer = Some(resizer))

  /**
   * Java API: copy of this configuration with another dispatcher for the “head” Router actor.
   */
  def withDispatcher(dispatcherId: String): BroadcastRouter =
    copy(routerDispatcher = dispatcherId)

  /**
   * Java API: copy of this configuration with another supervisor strategy
   * for the “head” Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): BroadcastRouter =
    copy(supervisorStrategy = strategy)
}

/**
 * Broadcast routing logic, to be mixed into a [[akka.routing.RouterConfig]].
 */
trait BroadcastLike { this: RouterConfig ⇒

  def nrOfInstances: Int

  def routees: Iterable[String]

  /**
   * Registers the routees (unless a resizer is in charge of that) and returns a
   * route which addresses every message to all current routees.
   */
  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)

    val everyRoutee: Route = {
      case (sender, _) ⇒ toAll(sender, routeeProvider.routees)
    }
    everyRoutee
  }
}

object ScatterGatherFirstCompletedRouter {
  /**
   * Creates a router over the given actors, referring to them by their paths.
   */
  def apply(routees: Iterable[ActorRef], within: Duration) = {
    val routeePaths = routees.map(_.path.toString)
    new ScatterGatherFirstCompletedRouter(routees = routeePaths, within = within)
  }

  /**
   * Java API to create router with the supplied 'routees' actors.
   */
  def create(routees: java.lang.Iterable[ActorRef], within: Duration): ScatterGatherFirstCompletedRouter = {
    import scala.collection.JavaConverters._
    val scalaRoutees = routees.asScala
    apply(scalaRoutees, within)
  }
}

/**
 * Simple router that broadcasts the message to all routees, and replies with the first response.
 *
* You have to define the 'within: Duration' parameter (e.g. within = 10 seconds). *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. * *

 * <h1>Supervision Setup</h1>

*
 * The router creates a “head” actor which supervises and/or monitors the
 * routees. Instances are created as children of this actor, hence the
 * children are not supervised by the parent of the router. Common choices are
 * to always escalate (meaning that fault handling is always applied to all
 * children simultaneously; this is the default) or use the parent’s strategy,
 * which will result in routed children being treated individually, but it is
 * possible as well to use Routers to give different supervisor strategies to
 * different groups of children.
 *
 * {{{
 * class MyActor extends Actor {
 *   override val supervisorStrategy = ...
 *
 *   val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
 *
 *   val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
 *     RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
 *
 *   val specialChild = context.actorOf(Props[SomeActor].withRouter(
 *     RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
 *       ...
 *     })))
 * }
 * }}}
 *
 * @param routees string representation of the actor paths of the routees that will be looked up
 *   using `actorFor` in [[akka.actor.ActorRefProvider]]
 */
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
                                             override val resizer: Option[Resizer] = None,
                                             val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
                                             val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
  extends RouterConfig with ScatterGatherFirstCompletedLike {

  if (within <= Duration.Zero) throw new IllegalArgumentException(
    "[within: Duration] can not be zero or negative, was [" + within + "]")

  /**
   * Constructor that sets nrOfInstances to be created.
   * Java API
   */
  def this(nr: Int, w: Duration) = this(nrOfInstances = nr, within = w)

  /**
   * Constructor that sets the routees to be used.
   * Java API
   * @param routeePaths string representation of the actor paths of the routees that will be looked up
   *   using `actorFor` in [[akka.actor.ActorRefProvider]]
   */
  def this(routeePaths: java.lang.Iterable[String], w: Duration) =
    this(routees = iterableAsScalaIterable(routeePaths), within = w)

  /**
   * Constructor that sets the resizer to be used.
   * Java API
   */
  def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w)

  /**
   * Java API for setting routerDispatcher
   */
  def withDispatcher(dispatcherId: String): ScatterGatherFirstCompletedRouter =
    copy(routerDispatcher = dispatcherId)

  /**
   * Java API for setting the supervisor strategy to be used for the “head”
   * Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): ScatterGatherFirstCompletedRouter =
    copy(supervisorStrategy = strategy)
}

/**
 * Route construction for scatter-gather routing: each message is sent to all
 * registered routees with a temporary `PromiseActorRef` as the sender, and the
 * result of that promise is piped back to the original sender.
 */
trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒

  def nrOfInstances: Int

  def routees: Iterable[String]

  def within: Duration

  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)

    {
      case (sender, message) ⇒
        val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider
        // one asker per message; `within` is handed to the PromiseActorRef as its timeout
        val asker = akka.pattern.PromiseActorRef(provider, within)
        asker.result.pipeTo(sender)
        toAll(asker, routeeProvider.routees)
    }
  }
}

/**
 * Routers with a dynamically resizable number of routees are implemented by providing a Resizer
 * implementation in [[akka.routing.RouterConfig]].
 */
trait Resizer {
  /**
   * Is it time for resizing. Typically implemented with modulo of nth message, but
   * could be based on elapsed time or something else. The messageCounter starts with 0
   * for the initial resize and continues with 1 for the first message. Make sure to perform
   * initial resize before first message (messageCounter == 0), because there is no guarantee
   * that resize will be done when concurrent messages are in play.
   *
   * CAUTION: this method is invoked from the thread which tries to send a
   * message to the pool, i.e. the ActorRef.!() method, hence it may be called
   * concurrently.
   */
  def isTimeForResize(messageCounter: Long): Boolean

  /**
   * Decide if the capacity of the router needs to be changed. Will be invoked when `isTimeForResize`
   * returns true and no other resize is in progress.
   * Create and register more routees with `routeeProvider.registerRoutees(newRoutees)`
   * or remove routees with `routeeProvider.unregisterRoutees(abandonedRoutees)` and
   * sending [[akka.actor.PoisonPill]] to them.
   *
   * This method is invoked only in the context of the Router actor in order to safely
   * create/stop children.
   */
  def resize(props: Props, routeeProvider: RouteeProvider): Unit
}

case object DefaultResizer {
  /**
   * Creates a [[akka.routing.DefaultResizer]] from configuration. All of the keys
   * `lower-bound`, `upper-bound`, `pressure-threshold`, `rampup-rate`,
   * `backoff-threshold`, `backoff-rate`, `stop-delay` and `messages-per-resize`
   * are read from `resizerConfig`.
   */
  def apply(resizerConfig: Config): DefaultResizer =
    DefaultResizer(
      lowerBound = resizerConfig.getInt("lower-bound"),
      upperBound = resizerConfig.getInt("upper-bound"),
      pressureThreshold = resizerConfig.getInt("pressure-threshold"),
      rampupRate = resizerConfig.getDouble("rampup-rate"),
      backoffThreshold = resizerConfig.getDouble("backoff-threshold"),
      backoffRate = resizerConfig.getDouble("backoff-rate"),
      stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS),
      messagesPerResize = resizerConfig.getInt("messages-per-resize"))
}

/**
 * [[akka.routing.Resizer]] implementation that keeps the number of routees
 * between `lowerBound` and `upperBound`. Every `messagesPerResize` messages it
 * counts the routees considered busy (see `pressure`), proposes an increase
 * (`rampup`) or decrease (`backoff`) and clamps the result to the bounds
 * (`capacity`).
 *
 * The constructor throws `IllegalArgumentException` when the bounds, rates or
 * `messagesPerResize` are outside the ranges checked below.
 */
case class DefaultResizer(
  /**
   * The fewest number of routees the router should ever have.
   */
  lowerBound: Int = 1,
  /**
   * The most number of routees the router should ever have.
   * Must be greater than or equal to `lowerBound`.
   */
  upperBound: Int = 10,
  /**
   * Threshold to evaluate if routee is considered to be busy (under pressure).
   * Implementation depends on this value (default is 1).
   */
  pressureThreshold: Int = 1,
  /**
   * Percentage to increase capacity whenever all routees are busy.
   * For example, 0.2 would increase 20% (rounded up), i.e. if current
   * capacity is 6 it will request an increase of 2 more routees.
   */
  rampupRate: Double = 0.2,
  /**
   * Minimum fraction of busy routees before backing off.
   * For example, if this is 0.3, then we'll remove some routees only when
   * less than 30% of routees are busy, i.e. if current capacity is 10 and
   * 3 are busy then the capacity is unchanged, but if 2 or less are busy
   * the capacity is decreased.
   *
   * Use 0.0 or negative to avoid removal of routees.
   */
  backoffThreshold: Double = 0.3,
  /**
   * Fraction of routees to be removed when the resizer reaches the
   * backoffThreshold.
   * For example, 0.1 would decrease 10% (rounded up), i.e. if current
   * capacity is 9 it will request a decrease of 1 routee.
   */
  backoffRate: Double = 0.1,
  /**
   * When the resizer reduces the capacity the abandoned routee actors are stopped
   * with PoisonPill after this delay. The reason for the delay is to give concurrent
   * messages a chance to be placed in mailbox before sending PoisonPill.
   * Use 0 seconds to skip delay.
   */
  stopDelay: Duration = 1.second,
  /**
   * Number of messages between resize operation.
   * Use 1 to resize before each message.
   */
  messagesPerResize: Int = 10) extends Resizer {

  /**
   * Java API constructor for default values except bounds.
   */
  def this(lower: Int, upper: Int) = this(lowerBound = lower, upperBound = upper)

  if (lowerBound < 0) throw new IllegalArgumentException("lowerBound must be >= 0, was: [%s]".format(lowerBound))
  if (upperBound < 0) throw new IllegalArgumentException("upperBound must be >= 0, was: [%s]".format(upperBound))
  if (upperBound < lowerBound) throw new IllegalArgumentException("upperBound must be >= lowerBound, was: [%s] < [%s]".format(upperBound, lowerBound))
  if (rampupRate < 0.0) throw new IllegalArgumentException("rampupRate must be >= 0.0, was [%s]".format(rampupRate))
  if (backoffThreshold > 1.0) throw new IllegalArgumentException("backoffThreshold must be <= 1.0, was [%s]".format(backoffThreshold))
  if (backoffRate < 0.0) throw new IllegalArgumentException("backoffRate must be >= 0.0, was [%s]".format(backoffRate))
  if (messagesPerResize <= 0) throw new IllegalArgumentException("messagesPerResize must be > 0, was [%s]".format(messagesPerResize))

  def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0)

  /**
   * Applies the capacity change computed by `capacity`: a positive value creates
   * and registers that many new routees, a negative value unregisters routees
   * from the end of the current routee sequence and stops them via `delayedStop`.
   */
  def resize(props: Props, routeeProvider: RouteeProvider): Unit = {
    val currentRoutees = routeeProvider.routees
    val requestedCapacity = capacity(currentRoutees)

    if (requestedCapacity > 0) {
      val newRoutees = routeeProvider.createRoutees(props, requestedCapacity, Nil)
      routeeProvider.registerRoutees(newRoutees)
    } else if (requestedCapacity < 0) {
      // requestedCapacity is negative here, so this drops everything except the last
      // -requestedCapacity routees, which are the ones to abandon
      val abandon = currentRoutees.drop(currentRoutees.length + requestedCapacity)
      routeeProvider.unregisterRoutees(abandon)
      delayedStop(routeeProvider.context.system.scheduler, abandon)
    }
  }

  /**
   * Give concurrent messages a chance to be placed in mailbox before
   * sending PoisonPill.
   */
  protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]): Unit = {
    if (abandon.nonEmpty) {
      if (stopDelay <= Duration.Zero) {
        abandon foreach (_ ! PoisonPill)
      } else {
        scheduler.scheduleOnce(stopDelay) {
          abandon foreach (_ ! PoisonPill)
        }
      }
    }
  }

  /**
   * Returns the overall desired change in resizer capacity. Positive value will
   * add routees to the resizer. Negative value will remove routees from the
   * resizer.
   * @param routees the current routees of the router
   * @return the number of routees by which the resizer should be adjusted (positive, negative or zero)
   */
  def capacity(routees: IndexedSeq[ActorRef]): Int = {
    val currentSize = routees.size
    val press = pressure(routees)
    val delta = filter(press, currentSize)
    val proposed = currentSize + delta

    // clamp so that currentSize + result always stays within [lowerBound, upperBound]
    if (proposed < lowerBound) delta + (lowerBound - proposed)
    else if (proposed > upperBound) delta - (proposed - upperBound)
    else delta
  }

  /**
   * Number of routees considered busy, or above 'pressure level'.
   *
   * Implementation depends on the value of `pressureThreshold`
   * (default is 1):
   *  - 1: the routee's mailbox is scheduled and has messages
   *  - less than 1: the routee's mailbox is scheduled and it has a current message
   *  - greater than 1: the routee's mailbox holds at least `pressureThreshold` messages
   *
   * Routees that are not `LocalActorRef` are never counted as busy.
   *
   * @param routees the current routees of the router
   * @return number of busy routees, between 0 and routees.size
   */
  def pressure(routees: IndexedSeq[ActorRef]): Int = {
    routees count {
      case a: LocalActorRef ⇒
        val cell = a.underlying
        pressureThreshold match {
          case 1          ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages
          case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null
          case threshold  ⇒ cell.mailbox.numberOfMessages >= threshold
        }
      case _ ⇒
        false
    }
  }

  /**
   * This method can be used to smooth the capacity delta by considering
   * the current pressure and current capacity.
   *
   * @param pressure current number of busy routees
   * @param capacity current number of routees
   * @return proposed change in the capacity
   */
  def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity) + backoff(pressure, capacity)

  /**
   * Computes a proposed positive (or zero) capacity delta using
   * the configured `rampupRate`.
   * @param pressure the current number of busy routees
   * @param capacity the current number of total routees
   * @return proposed increase in capacity
   */
  def rampup(pressure: Int, capacity: Int): Int =
    if (pressure < capacity) 0 else math.ceil(rampupRate * capacity).toInt

  /**
   * Computes a proposed negative (or zero) capacity delta using
   * the configured `backoffThreshold` and `backoffRate`
   * @param pressure the current number of busy routees
   * @param capacity the current number of total routees
   * @return proposed decrease in capacity (as a negative number)
   */
  def backoff(pressure: Int, capacity: Int): Int =
    if (backoffThreshold > 0.0 && backoffRate > 0.0 && capacity > 0 && pressure.toDouble / capacity < backoffThreshold)
      math.floor(-1.0 * backoffRate * capacity).toInt
    else 0

}