Better methods in RouteeProvider

* createRoutees only for new nrOfInstances
* createRoutees also registers
* New registerRouteesFor for path lookup
* removeRoutees to gracefully unregister and stop routee
This commit is contained in:
Patrik Nordwall 2012-08-28 20:54:16 +02:00
parent f4cc8f8649
commit bf20ae5157
4 changed files with 84 additions and 61 deletions

View file

@ -15,12 +15,12 @@ class CustomRouteSpec extends AkkaSpec {
import akka.dispatch.Dispatchers
class MyRouter(target: ActorRef) extends RouterConfig {
override def createRoute(p: Props, prov: RouteeProvider): Route = {
prov.createAndRegisterRoutees(p, 1, Nil)
override def createRoute(props: Props, provider: RouteeProvider): Route = {
provider.createRoutees(props, 1)
{
case (sender, message: String) Seq(Destination(sender, target))
case (sender, message) toAll(sender, prov.routees)
case (sender, message) toAll(sender, provider.routees)
}
}
override def supervisorStrategy = SupervisorStrategy.defaultStrategy

View file

@ -96,7 +96,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
* Not thread safe, but intended to be called from protected points, such as
* `RouterConfig.createRoute` and `Resizer.resize`
*/
private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]): Unit = {
private[akka] def addRoutees(newRoutees: Iterable[ActorRef]): Unit = {
_routees = _routees ++ newRoutees
// subscribe to Terminated messages for all route destinations, to be handled by Router actor
newRoutees foreach watch
@ -212,7 +212,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) {
* Not thread safe, but intended to be called from protected points, such as
* `RouterConfig.createRoute` and `Resizer.resize`.
*/
def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.addRoutees(routees)
def registerRoutees(routees: Iterable[ActorRef]): Unit = routedCell.addRoutees(routees)
/**
* Adds the routees to the router.
@ -234,31 +234,38 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) {
*/
def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.removeRoutees(routees)
def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] =
(nrOfInstances, routees) match {
case (x, Nil) if x <= 0
throw new IllegalArgumentException(
"Must specify nrOfInstances or routees for [%s]" format context.self.path.toString)
case (x, Nil) (1 to x).map(_ context.actorOf(props))(scala.collection.breakOut)
case (_, xs) xs.map(context.actorFor(_))(scala.collection.breakOut)
}
def createAndRegisterRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): Unit =
if (resizer.isEmpty) registerRoutees(createRoutees(props, nrOfInstances, routees))
/**
* Looks up routes with specified paths and registers them.
*/
def registerRouteesFor(paths: Iterable[String]): Unit = {
val routees = paths.map(context.actorFor(_))(scala.collection.breakOut)
registerRoutees(routees)
}
/**
* Adjust number of routees by creating new routees and register them if
* `nrOfInstances` is positive, otherwise if negative unregister
* routees and send [[akka.actor.PoisonPill]] after the specified delay.
* Creates new routees from specified `Props` and registers them.
*/
def createRoutees(props: Props, nrOfInstances: Int): Unit = {
if (nrOfInstances <= 0) throw new IllegalArgumentException(
"Must specify nrOfInstances or routees for [%s]" format context.self.path.toString)
else {
val routees = (1 to nrOfInstances).map(_ context.actorOf(props))(scala.collection.breakOut)
registerRoutees(routees)
}
}
/**
* Remove specified number of routees by unregister them
* and sending [[akka.actor.PoisonPill]] after the specified delay.
* The reason for the delay is to give concurrent messages a chance to be
* placed in mailbox before sending PoisonPill.
*/
def adjustRoutees(props: Props, nrOfInstances: Int, stopDelay: Duration): Unit = {
if (nrOfInstances > 0) {
registerRoutees(createRoutees(props, nrOfInstances, Nil))
} else if (nrOfInstances < 0) {
def removeRoutees(nrOfInstances: Int, stopDelay: Duration): Unit = {
if (nrOfInstances <= 0) {
throw new IllegalArgumentException("Expected positive nrOfInstances, got [%s]".format(nrOfInstances))
} else if (nrOfInstances > 0) {
val currentRoutees = routees
val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + nrOfInstances)
val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length - nrOfInstances)
unregisterRoutees(abandon)
delayedStop(context.system.scheduler, abandon, stopDelay)
}
@ -268,9 +275,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) {
* Give concurrent messages a chance to be placed in mailbox before
* sending PoisonPill.
*/
protected def delayedStop(
scheduler: Scheduler,
abandon: IndexedSeq[ActorRef], stopDelay: Duration): Unit = {
protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef], stopDelay: Duration): Unit = {
if (abandon.nonEmpty) {
if (stopDelay <= Duration.Zero) {
abandon foreach (_ ! PoisonPill)
@ -549,14 +554,17 @@ trait RoundRobinLike { this: RouterConfig ⇒
def routees: Iterable[String]
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances)
else routeeProvider.registerRouteesFor(routees)
}
val next = new AtomicLong(0)
def getNext(): ActorRef = {
val _routees = routeeProvider.routees
if (_routees.isEmpty) routeeProvider.context.system.deadLetters
else _routees((next.getAndIncrement % _routees.size).asInstanceOf[Int])
val currentRoutees = routeeProvider.routees
if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
else currentRoutees((next.getAndIncrement % currentRoutees.size).asInstanceOf[Int])
}
{
@ -668,12 +676,15 @@ trait RandomLike { this: RouterConfig ⇒
def routees: Iterable[String]
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances)
else routeeProvider.registerRouteesFor(routees)
}
def getNext(): ActorRef = {
val _routees = routeeProvider.routees
if (_routees.isEmpty) routeeProvider.context.system.deadLetters
else _routees(ThreadLocalRandom.current.nextInt(_routees.size))
val currentRoutees = routeeProvider.routees
if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size))
}
{
@ -848,7 +859,10 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
}
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances)
else routeeProvider.registerRouteesFor(routees)
}
// Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found.
// Lowest score wins, score 0 is autowin
@ -996,7 +1010,10 @@ trait BroadcastLike { this: RouterConfig ⇒
def routees: Iterable[String]
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances)
else routeeProvider.registerRouteesFor(routees)
}
{
case (sender, message) toAll(sender, routeeProvider.routees)
@ -1114,7 +1131,10 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
def within: Duration
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances)
else routeeProvider.registerRouteesFor(routees)
}
{
case (sender, message)
@ -1255,7 +1275,8 @@ case class DefaultResizer(
val currentRoutees = routeeProvider.routees
val requestedCapacity = capacity(currentRoutees)
routeeProvider.adjustRoutees(props, requestedCapacity, stopDelay)
if (requestedCapacity > 0) routeeProvider.createRoutees(props, requestedCapacity)
else if (requestedCapacity < 0) routeeProvider.removeRoutees(-requestedCapacity, stopDelay)
}
/**

View file

@ -66,23 +66,25 @@ class ClusterRouteeProvider(_context: ActorContext, _resizer: Option[Resizer])
// need this counter as instance variable since Resizer may call createRoutees several times
private val childNameCounter = new AtomicInteger
override def createRoutees(props: Props, nrOfInstances: Int, _routees: Iterable[String]): IndexedSeq[ActorRef] = {
override def registerRouteesFor(paths: Iterable[String]): Unit =
throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]"
format context.self.path.toString)
override def createRoutees(props: Props, nrOfInstances: Int): Unit = {
val nodes = upNodes
if (_routees.nonEmpty) {
throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]"
format context.self.path.toString)
} else if (nodes.isEmpty) {
if (nodes.isEmpty) {
IndexedSeq.empty
} else {
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
// FIXME We could count number of routees per node and select nodes with least routees first
val nodesIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator
IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val refs = IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + childNameCounter.incrementAndGet
val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodesIter.next))
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name,
systemService = false, Some(deploy), lookupDeploy = false, async = false)
})
registerRoutees(refs)
}
}

View file

@ -58,25 +58,25 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) ext
class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _resizer: Option[Resizer])
extends RouteeProvider(_context, _resizer) {
if (nodes.isEmpty) throw new ConfigurationException("Must specify list of remote target.nodes for [%s]"
format context.self.path.toString)
// need this iterator as instance variable since Resizer may call createRoutees several times
private val nodeAddressIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator
override def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] =
(nrOfInstances, routees, nodes) match {
case (_, _, Nil) throw new ConfigurationException("Must specify list of remote target.nodes for [%s]"
format context.self.path.toString)
override def registerRouteesFor(paths: Iterable[String]): Unit =
throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]"
format context.self.path.toString)
case (n, Nil, ys)
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + i
val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next))
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name,
systemService = false, Some(deploy), lookupDeploy = false, async = false)
})
case (_, xs, _) throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]"
format context.self.path.toString)
}
override def createRoutees(props: Props, nrOfInstances: Int): Unit = {
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
val refs = IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + i
val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next))
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name,
systemService = false, Some(deploy), lookupDeploy = false, async = false)
})
registerRoutees(refs)
}
}