diff --git a/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala index ed61d8798f..56193f668f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala @@ -15,12 +15,12 @@ class CustomRouteSpec extends AkkaSpec { import akka.dispatch.Dispatchers class MyRouter(target: ActorRef) extends RouterConfig { - override def createRoute(p: Props, prov: RouteeProvider): Route = { - prov.createAndRegisterRoutees(p, 1, Nil) + override def createRoute(props: Props, provider: RouteeProvider): Route = { + provider.createRoutees(props, 1) { case (sender, message: String) ⇒ Seq(Destination(sender, target)) - case (sender, message) ⇒ toAll(sender, prov.routees) + case (sender, message) ⇒ toAll(sender, provider.routees) } } override def supervisorStrategy = SupervisorStrategy.defaultStrategy diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 8e0337affb..b5362048a7 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -96,7 +96,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo * Not thread safe, but intended to be called from protected points, such as * `RouterConfig.createRoute` and `Resizer.resize` */ - private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]): Unit = { + private[akka] def addRoutees(newRoutees: Iterable[ActorRef]): Unit = { _routees = _routees ++ newRoutees // subscribe to Terminated messages for all route destinations, to be handled by Router actor newRoutees foreach watch @@ -212,7 +212,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { * Not thread safe, but intended to be called from protected points, such as * `RouterConfig.createRoute` and `Resizer.resize`. */ - def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.addRoutees(routees) + def registerRoutees(routees: Iterable[ActorRef]): Unit = routedCell.addRoutees(routees) /** * Adds the routees to the router. @@ -234,31 +234,38 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { */ def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = routedCell.removeRoutees(routees) - def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = - (nrOfInstances, routees) match { - case (x, Nil) if x <= 0 ⇒ - throw new IllegalArgumentException( - "Must specify nrOfInstances or routees for [%s]" format context.self.path.toString) - case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) - case (_, xs) ⇒ xs.map(context.actorFor(_))(scala.collection.breakOut) - } - - def createAndRegisterRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): Unit = - if (resizer.isEmpty) registerRoutees(createRoutees(props, nrOfInstances, routees)) + /** + * Looks up routes with specified paths and registers them. + */ + def registerRouteesFor(paths: Iterable[String]): Unit = { + val routees = paths.map(context.actorFor(_))(scala.collection.breakOut) + registerRoutees(routees) + } /** - * Adjust number of routees by creating new routees and register them if - * `nrOfInstances` is positive, otherwise if negative unregister - * routees and send [[akka.actor.PoisonPill]] after the specified delay. + * Creates new routees from specified `Props` and registers them. + */ + def createRoutees(props: Props, nrOfInstances: Int): Unit = { + if (nrOfInstances <= 0) throw new IllegalArgumentException( + "Must specify nrOfInstances or routees for [%s]" format context.self.path.toString) + else { + val routees = (1 to nrOfInstances).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) + registerRoutees(routees) + } + } + + /** + * Remove specified number of routees by unregister them + * and sending [[akka.actor.PoisonPill]] after the specified delay. * The reason for the delay is to give concurrent messages a chance to be * placed in mailbox before sending PoisonPill. */ - def adjustRoutees(props: Props, nrOfInstances: Int, stopDelay: Duration): Unit = { - if (nrOfInstances > 0) { - registerRoutees(createRoutees(props, nrOfInstances, Nil)) - } else if (nrOfInstances < 0) { + def removeRoutees(nrOfInstances: Int, stopDelay: Duration): Unit = { + if (nrOfInstances <= 0) { + throw new IllegalArgumentException("Expected positive nrOfInstances, got [%s]".format(nrOfInstances)) + } else if (nrOfInstances > 0) { val currentRoutees = routees - val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + nrOfInstances) + val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length - nrOfInstances) unregisterRoutees(abandon) delayedStop(context.system.scheduler, abandon, stopDelay) } @@ -268,9 +275,7 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { * Give concurrent messages a chance to be placed in mailbox before * sending PoisonPill. */ - protected def delayedStop( - scheduler: Scheduler, - abandon: IndexedSeq[ActorRef], stopDelay: Duration): Unit = { + protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef], stopDelay: Duration): Unit = { if (abandon.nonEmpty) { if (stopDelay <= Duration.Zero) { abandon foreach (_ ! PoisonPill) @@ -549,14 +554,17 @@ trait RoundRobinLike { this: RouterConfig ⇒ def routees: Iterable[String] def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { - routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } val next = new AtomicLong(0) def getNext(): ActorRef = { - val _routees = routeeProvider.routees - if (_routees.isEmpty) routeeProvider.context.system.deadLetters - else _routees((next.getAndIncrement % _routees.size).asInstanceOf[Int]) + val currentRoutees = routeeProvider.routees + if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters + else currentRoutees((next.getAndIncrement % currentRoutees.size).asInstanceOf[Int]) } { @@ -668,12 +676,15 @@ trait RandomLike { this: RouterConfig ⇒ def routees: Iterable[String] def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { - routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } def getNext(): ActorRef = { - val _routees = routeeProvider.routees - if (_routees.isEmpty) routeeProvider.context.system.deadLetters - else _routees(ThreadLocalRandom.current.nextInt(_routees.size)) + val currentRoutees = routeeProvider.routees + if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters + else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size)) } { @@ -848,7 +859,10 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ } def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { - routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } // Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found. // Lowest score wins, score 0 is autowin @@ -996,7 +1010,10 @@ trait BroadcastLike { this: RouterConfig ⇒ def routees: Iterable[String] def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { - routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } { case (sender, message) ⇒ toAll(sender, routeeProvider.routees) @@ -1114,7 +1131,10 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def within: Duration def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { - routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } { case (sender, message) ⇒ @@ -1255,7 +1275,8 @@ case class DefaultResizer( val currentRoutees = routeeProvider.routees val requestedCapacity = capacity(currentRoutees) - routeeProvider.adjustRoutees(props, requestedCapacity, stopDelay) + if (requestedCapacity > 0) routeeProvider.createRoutees(props, requestedCapacity) + else if (requestedCapacity < 0) routeeProvider.removeRoutees(-requestedCapacity, stopDelay) } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 5d58d0ffac..8bc38440ad 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -66,23 +66,25 @@ class ClusterRouteeProvider(_context: ActorContext, _resizer: Option[Resizer]) // need this counter as instance variable since Resizer may call createRoutees several times private val childNameCounter = new AtomicInteger - override def createRoutees(props: Props, nrOfInstances: Int, _routees: Iterable[String]): IndexedSeq[ActorRef] = { + override def registerRouteesFor(paths: Iterable[String]): Unit = + throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]" + format context.self.path.toString) + + override def createRoutees(props: Props, nrOfInstances: Int): Unit = { val nodes = upNodes - if (_routees.nonEmpty) { - throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]" - format context.self.path.toString) - } else if (nodes.isEmpty) { + if (nodes.isEmpty) { IndexedSeq.empty } else { val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 // FIXME We could count number of routees per node and select nodes with least routees first val nodesIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator - IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { + val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodesIter.next)) impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, systemService = false, Some(deploy), lookupDeploy = false, async = false) }) + registerRoutees(refs) } } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index a0b7ae4a49..01230605d2 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -58,25 +58,25 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) ext class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _resizer: Option[Resizer]) extends RouteeProvider(_context, _resizer) { + if (nodes.isEmpty) throw new ConfigurationException("Must specify list of remote target.nodes for [%s]" + format context.self.path.toString) + // need this iterator as instance variable since Resizer may call createRoutees several times private val nodeAddressIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator - override def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = - (nrOfInstances, routees, nodes) match { - case (_, _, Nil) ⇒ throw new ConfigurationException("Must specify list of remote target.nodes for [%s]" - format context.self.path.toString) + override def registerRouteesFor(paths: Iterable[String]): Unit = + throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" + format context.self.path.toString) - case (n, Nil, ys) ⇒ - val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { - val name = "c" + i - val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) - impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, - systemService = false, Some(deploy), lookupDeploy = false, async = false) - }) - - case (_, xs, _) ⇒ throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" - format context.self.path.toString) - } + override def createRoutees(props: Props, nrOfInstances: Int): Unit = { + val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 + val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { + val name = "c" + i + val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) + impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, + systemService = false, Some(deploy), lookupDeploy = false, async = false) + }) + registerRoutees(refs) + } }