diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index a9ec39ff6e..910985151d 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -45,9 +45,9 @@ object RoutingSpec { class MyRouter(config: Config) extends RouterConfig { val foo = config.getString("foo") - def createRoute(routeeProps: Props, actorContext: ActorContext): Route = { - val routees = IndexedSeq(actorContext.actorOf(Props[Echo])) - registerRoutees(actorContext, routees) + def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { + val routees = IndexedSeq(routeeProvider.context.actorOf(Props[Echo])) + routeeProvider.registerRoutees(routees) { case (sender, message) ⇒ Nil @@ -542,13 +542,13 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with case class VoteCountRouter() extends RouterConfig { //#crRoute - def createRoute(routeeProps: Props, actorContext: ActorContext): Route = { - val democratActor = actorContext.actorOf(Props(new DemocratActor()), "d") - val republicanActor = actorContext.actorOf(Props(new RepublicanActor()), "r") + def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { + val democratActor = routeeProvider.context.actorOf(Props(new DemocratActor()), "d") + val republicanActor = routeeProvider.context.actorOf(Props(new RepublicanActor()), "r") val routees = Vector[ActorRef](democratActor, republicanActor) //#crRegisterRoutees - registerRoutees(actorContext, routees) + routeeProvider.registerRoutees(routees) //#crRegisterRoutees //#crRoutingLogic diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index abc1c80713..2bbb819b2d 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -33,18 +33,31 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute def routees = _routees - def addRoutees(newRoutees: IndexedSeq[ActorRef]) { + /** + * Adds the routees to existing routees. + * Adds death watch of the routees so that they are removed when terminated. + * Not thread safe, but intended to be called from protected points, such as + * `RouterConfig.createRoute` and `Resizer.resize` + */ + private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]) { _routees = _routees ++ newRoutees // subscribe to Terminated messages for all route destinations, to be handled by Router actor newRoutees foreach underlying.watch } - def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]) { + /** + * Adds the routees to existing routees. + * Removes death watch of the routees. Doesn't stop the routees. + * Not thread safe, but intended to be called from protected points, such as + * `Resizer.resize` + */ + private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]) { _routees = _routees diff abandonedRoutees abandonedRoutees foreach underlying.unwatch } - val route = _props.routerConfig.createRoute(routeeProps, actorContext) + private val routeeProvider = _props.routerConfig.createRouteeProvider(actorContext) + val route = _props.routerConfig.createRoute(routeeProps, routeeProvider) // initial resize, before message send resize() @@ -91,7 +104,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup for (r ← _props.routerConfig.resizer) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) { try { - r.resize(routeeProps, actorContext, routees, _props.routerConfig) + r.resize(routeeProps, routeeProvider) } finally { resizeProgress.set(false) } @@ -120,7 +133,9 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup */ trait RouterConfig { - def createRoute(routeeProps: Props, actorContext: ActorContext): Route + def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route + + def createRouteeProvider(context: ActorContext) = new RouteeProvider(context, resizer) def createActor(): Router = new Router {} @@ -134,32 +149,6 @@ trait RouterConfig { protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] = routees.map(Destination(sender, _)) - def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match { - case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) - case (_, xs) ⇒ xs.map(context.actorFor(_))(scala.collection.breakOut) - } - - protected def createAndRegisterRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Unit = { - if (resizer.isEmpty) { - registerRoutees(context, createRoutees(props, context, nrOfInstances, routees)) - } - } - - /** - * Adds new routees to the router. - */ - def registerRoutees(context: ActorContext, routees: IndexedSeq[ActorRef]): Unit = { - context.self.asInstanceOf[RoutedActorRef].addRoutees(routees) - } - - /** - * Removes routees from the router. This method doesn't stop the routees. - */ - def unregisterRoutees(context: ActorContext, routees: IndexedSeq[ActorRef]): Unit = { - context.self.asInstanceOf[RoutedActorRef].removeRoutees(routees) - } - /** * Routers with dynamically resizable number of routees return the [[akka.routing.Resizer]] * to use. @@ -168,26 +157,84 @@ trait RouterConfig { } +/** + * Factory and registry for routees of the router. + * Uses `context.actorOf` to create routees from nrOfInstances property + * and `context.actorFor` lookup routees from paths. + */ +class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { + + /** + * Adds the routees to the router. + * Adds death watch of the routees so that they are removed when terminated. + * Not thread safe, but intended to be called from protected points, such as + * `RouterConfig.createRoute` and `Resizer.resize`. + */ + def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = { + routedRef.addRoutees(routees) + } + + /** + * Adds the routees to the router. + * Adds death watch of the routees so that they are removed when terminated. + * Not thread safe, but intended to be called from protected points, such as + * `RouterConfig.createRoute` and `Resizer.resize`. + * Java API. + */ + def registerRoutees(routees: java.util.List[ActorRef]): Unit = { + import scala.collection.JavaConverters._ + registerRoutees(routees.asScala.toIndexedSeq) + } + + /** + * Removes routees from the router. This method doesn't stop the routees. + * Removes death watch of the routees. + * Not thread safe, but intended to be called from protected points, such as + * `Resizer.resize`. + */ + def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = { + routedRef.removeRoutees(routees) + } + + def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = + (nrOfInstances, routees) match { + case (x, Nil) if x <= 0 ⇒ + throw new IllegalArgumentException( + "Must specify nrOfInstances or routees for [%s]" format context.self.path.toString) + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) + case (_, xs) ⇒ xs.map(context.actorFor(_))(scala.collection.breakOut) + } + + def createAndRegisterRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): Unit = { + if (resizer.isEmpty) { + registerRoutees(createRoutees(props, nrOfInstances, routees)) + } + } + + /** + * All routees of the router + */ + def routees: IndexedSeq[ActorRef] = routedRef.routees + + private def routedRef = context.self.asInstanceOf[RoutedActorRef] + +} + /** * Java API for a custom router factory. * @see akka.routing.RouterConfig */ abstract class CustomRouterConfig extends RouterConfig { - override def createRoute(props: Props, context: ActorContext): Route = { + override def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { // as a bonus, this prevents closing of props and context in the returned Route PartialFunction - val customRoute = createCustomRoute(props, context) + val customRoute = createCustomRoute(props, routeeProvider) { case (sender, message) ⇒ customRoute.destinationsFor(sender, message) } } - def createCustomRoute(props: Props, context: ActorContext): CustomRoute - - protected def registerRoutees(context: ActorContext, routees: java.util.List[ActorRef]): Unit = { - import scala.collection.JavaConverters._ - registerRoutees(context, routees.asScala.toIndexedSeq) - } + def createCustomRoute(props: Props, routeeProvider: RouteeProvider): CustomRoute } @@ -254,23 +301,23 @@ case class Destination(sender: ActorRef, recipient: ActorRef) * Oxymoron style. */ case object NoRouter extends RouterConfig { - def createRoute(props: Props, actorContext: ActorContext): Route = null + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null } /** * Router configuration which has no default, i.e. external configuration is required. */ case object FromConfig extends RouterConfig { - def createRoute(props: Props, actorContext: ActorContext): Route = - throw new ConfigurationException("router " + actorContext.self + " needs external configuration from file (e.g. application.conf)") + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = + throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") } /** * Java API: Router configuration which has no default, i.e. external configuration is required. */ case class FromConfig() extends RouterConfig { - def createRoute(props: Props, actorContext: ActorContext): Route = - throw new ConfigurationException("router " + actorContext.self + " needs external configuration from file (e.g. application.conf)") + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = + throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") } object RoundRobinRouter { @@ -332,21 +379,20 @@ trait RoundRobinLike { this: RouterConfig ⇒ def routees: Iterable[String] - def createRoute(props: Props, context: ActorContext): Route = { - createAndRegisterRoutees(props, context, nrOfInstances, routees) + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) - val ref = context.self.asInstanceOf[RoutedActorRef] val next = new AtomicLong(0) def getNext(): ActorRef = { - val _routees = ref.routees + val _routees = routeeProvider.routees _routees((next.getAndIncrement % _routees.size).asInstanceOf[Int]) } { case (sender, message) ⇒ message match { - case Broadcast(msg) ⇒ toAll(sender, ref.routees) + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) case msg ⇒ List(Destination(sender, getNext())) } } @@ -418,18 +464,18 @@ trait RandomLike { this: RouterConfig ⇒ override def initialValue = SecureRandom.getInstance("SHA1PRNG") } - def createRoute(props: Props, context: ActorContext): Route = { - val ref = context.self.asInstanceOf[RoutedActorRef] - createAndRegisterRoutees(props, context, nrOfInstances, routees) + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) def getNext(): ActorRef = { - ref.routees(random.get.nextInt(ref.routees.size)) + val _routees = routeeProvider.routees + _routees(random.get.nextInt(_routees.size)) } { case (sender, message) ⇒ message match { - case Broadcast(msg) ⇒ toAll(sender, ref.routees) + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) case msg ⇒ List(Destination(sender, getNext())) } } @@ -559,13 +605,12 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ case _ ⇒ 0 } - def createRoute(props: Props, context: ActorContext): Route = { - val ref = context.self.asInstanceOf[RoutedActorRef] - createAndRegisterRoutees(props, context, nrOfInstances, routees) + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) def getNext(): ActorRef = { // non-local actors mailbox size is unknown, so consider them lowest priority - val activeLocal = ref.routees collect { case l: LocalActorRef if !isSuspended(l) ⇒ l } + val activeLocal = routeeProvider.routees collect { case l: LocalActorRef if !isSuspended(l) ⇒ l } // 1. anyone not processing message and with empty mailbox activeLocal.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a)) getOrElse { // 2. anyone with empty mailbox @@ -573,7 +618,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ // 3. sort on mailbox size activeLocal.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse { // 4. no locals, just pick one, random - ref.routees(random.get.nextInt(ref.routees.size)) + val _routees = routeeProvider.routees + _routees(random.get.nextInt(_routees.size)) } } } @@ -582,7 +628,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ { case (sender, message) ⇒ message match { - case Broadcast(msg) ⇒ toAll(sender, ref.routees) + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) case msg ⇒ List(Destination(sender, getNext())) } } @@ -649,14 +695,13 @@ trait BroadcastLike { this: RouterConfig ⇒ def routees: Iterable[String] - def createRoute(props: Props, context: ActorContext): Route = { - val ref = context.self.asInstanceOf[RoutedActorRef] - createAndRegisterRoutees(props, context, nrOfInstances, routees) + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) { case (sender, message) ⇒ message match { - case _ ⇒ toAll(sender, ref.routees) + case _ ⇒ toAll(sender, routeeProvider.routees) } } } @@ -724,16 +769,16 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def within: Duration - def createRoute(props: Props, context: ActorContext): Route = { - val ref = context.self.asInstanceOf[RoutedActorRef] - createAndRegisterRoutees(props, context, nrOfInstances, routees) + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) { case (sender, message) ⇒ - val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get + // FIXME avoid this cast + val asker = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get asker.result.pipeTo(sender) message match { - case _ ⇒ toAll(asker, ref.routees) + case _ ⇒ toAll(asker, routeeProvider.routees) } } } @@ -755,11 +800,11 @@ trait Resizer { /** * Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize` * returns true and no other resize is in progress. - * Create and register more routees with `routerConfig.registerRoutees(actorContext, newRoutees) - * or remove routees with `routerConfig.unregisterRoutees(actorContext, abandonedRoutees)` and + * Create and register more routees with `routeeProvider.registerRoutees(newRoutees) + * or remove routees with `routeeProvider.unregisterRoutees(abandonedRoutees)` and * sending [[akka.actor.PoisonPill]] to them. */ - def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) + def resize(props: Props, routeeProvider: RouteeProvider) } case object DefaultResizer { @@ -849,16 +894,17 @@ case class DefaultResizer( def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0) - def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) { + def resize(props: Props, routeeProvider: RouteeProvider) { + val currentRoutees = routeeProvider.routees val requestedCapacity = capacity(currentRoutees) if (requestedCapacity > 0) { - val newRoutees = routerConfig.createRoutees(props, actorContext, requestedCapacity, Nil) - routerConfig.registerRoutees(actorContext, newRoutees) + val newRoutees = routeeProvider.createRoutees(props, requestedCapacity, Nil) + routeeProvider.registerRoutees(newRoutees) } else if (requestedCapacity < 0) { val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + requestedCapacity) - routerConfig.unregisterRoutees(actorContext, abandon) - delayedStop(actorContext.system.scheduler, abandon) + routeeProvider.unregisterRoutees(abandon) + delayedStop(routeeProvider.context.system.scheduler, abandon) } } diff --git a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java index c89401e5cc..3668bc1030 100644 --- a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java @@ -107,13 +107,13 @@ public class CustomRouterDocTestBase { //#crRoute @Override - public CustomRoute createCustomRoute(Props props, ActorContext context) { - final ActorRef democratActor = context.actorOf(new Props(DemocratActor.class), "d"); - final ActorRef republicanActor = context.actorOf(new Props(RepublicanActor.class), "r"); + public CustomRoute createCustomRoute(Props props, RouteeProvider routeeProvider) { + final ActorRef democratActor = routeeProvider.context().actorOf(new Props(DemocratActor.class), "d"); + final ActorRef republicanActor = routeeProvider.context().actorOf(new Props(RepublicanActor.class), "r"); List routees = Arrays.asList(new ActorRef[] { democratActor, republicanActor }); //#crRegisterRoutees - registerRoutees(context, routees); + routeeProvider.registerRoutees(routees); //#crRegisterRoutees //#crRoutingLogic diff --git a/akka-docs/java/remoting.rst b/akka-docs/java/remoting.rst index 7ec163726a..05101497e1 100644 --- a/akka-docs/java/remoting.rst +++ b/akka-docs/java/remoting.rst @@ -69,7 +69,7 @@ The "app" in this case refers to the name of the ``ActorSystem``:: actor { deployment { /serviceA/retrieval { - remote = “akka://app@10.0.0.1:2552” + remote = "akka://app@10.0.0.1:2552" } } } @@ -106,10 +106,10 @@ This is also done via configuration:: actor { deployment { /serviceA/aggregation { - router = “round-robin” + router = "round-robin" nr-of-instances = 10 - routees { - nodes = [“akka://app@10.0.0.2:2552”, “akka://app@10.0.0.3:2552”] + target { + nodes = ["akka://app@10.0.0.2:2552", "akka://app@10.0.0.3:2552"] } } } diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index 5a8005fbf2..2d460aa060 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -124,10 +124,10 @@ This is also done via configuration:: actor { deployment { /serviceA/aggregation { - router = “round-robin” + router = "round-robin" nr-of-instances = 10 - routees { - nodes = [“akka://app@10.0.0.2:2552”, “akka://app@10.0.0.3:2552”] + target { + nodes = ["akka://app@10.0.0.2:2552", "akka://app@10.0.0.3:2552"] } } } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 814a85f591..8391dea8c4 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -17,7 +17,7 @@ akka { # at that node e.g. "akka://sys@host:port" remote = "" - routees { + target { # A list of hostnames and ports for instantiating the children of a # non-direct router diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index fe6844b8dc..de3e0825ff 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -14,7 +14,6 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings override protected def parseConfig(path: String, config: Config): Option[Deploy] = { import scala.collection.JavaConverters._ - import akka.util.ReflectiveAccess._ super.parseConfig(path, config) match { case d @ Some(deploy) ⇒ @@ -24,16 +23,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings if (!str.isEmpty) throw new ConfigurationException("unparseable remote node name " + str) val nodes = deploy.config.getStringList("target.nodes").asScala if (nodes.isEmpty || deploy.routing == NoRouter) d - else { - val r = deploy.routing match { - case RoundRobinRouter(x, _, resizer) ⇒ RemoteRoundRobinRouter(x, nodes, resizer) - case RandomRouter(x, _, resizer) ⇒ RemoteRandomRouter(x, nodes, resizer) - case SmallestMailboxRouter(x, _, resizer) ⇒ RemoteSmallestMailboxRouter(x, nodes, resizer) - case BroadcastRouter(x, _, resizer) ⇒ RemoteBroadcastRouter(x, nodes, resizer) - case ScatterGatherFirstCompletedRouter(x, _, w, resizer) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer) - } - Some(deploy.copy(routing = r)) - } + else Some(deploy.copy(routing = new RemoteRouterConfig(deploy.routing, nodes))) } case None ⇒ None } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala new file mode 100644 index 0000000000..dffb874be6 --- /dev/null +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.routing + +import com.typesafe.config.ConfigFactory +import akka.actor.ActorContext +import akka.actor.ActorRef +import akka.actor.ActorSystemImpl +import akka.actor.Deploy +import akka.actor.InternalActorRef +import akka.actor.Props +import akka.config.ConfigurationException +import akka.remote.RemoteScope +import akka.remote.RemoteAddressExtractor + +/** + * [[akka.routing.RouterConfig]] implementation for remote deployment on defined + * target nodes. Delegates other duties to the local [[akka.routing.RouterConfig]], + * which makes it possible to mix this with the built-in routers such as + * [[akka.routing.RoundRobinRouter]] or custom routers. + */ +class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends RouterConfig { + + override def createRouteeProvider(context: ActorContext) = new RemoteRouteeProvider(nodes, context, resizer) + + override def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { + local.createRoute(routeeProps, routeeProvider) + } + + override def createActor(): Router = local.createActor() + + override def resizer: Option[Resizer] = local.resizer + +} + +/** + * Factory and registry for routees of the router. + * Deploys new routees on the specified `nodes`, round-robin. + * + * Routee paths may not be combined with remote target nodes. + */ +class RemoteRouteeProvider(nodes: Iterable[String], _context: ActorContext, _resizer: Option[Resizer]) + extends RouteeProvider(_context, _resizer) { + + // need this iterator as instance variable since Resizer may call createRoutees several times + private val nodeAddressIter = { + val nodeAddresses = nodes map { + case RemoteAddressExtractor(a) ⇒ a + case x ⇒ throw new ConfigurationException("unparseable remote node " + x) + } + Stream.continually(nodeAddresses).flatten.iterator + } + + override def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = + (nrOfInstances, routees, nodes) match { + case (_, _, Nil) ⇒ throw new ConfigurationException("Must specify list of remote target.nodes for [%s]" + format context.self.path.toString) + + case (n, Nil, ys) ⇒ + val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 + IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { + val name = "c" + i + val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) + impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy)) + }) + + case (_, xs, _) ⇒ throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" + format context.self.path.toString) + } +} + diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala deleted file mode 100644 index c8f61f471e..0000000000 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ -package akka.routing - -import akka.actor._ -import akka.remote._ -import scala.collection.JavaConverters._ -import com.typesafe.config.ConfigFactory -import akka.config.ConfigurationException -import akka.util.Duration - -trait RemoteRouterConfig extends RouterConfig { - override def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match { - case (_, Nil) ⇒ throw new ConfigurationException("must specify list of remote nodes") - case (n, xs) ⇒ - val nodes = routees map { - case RemoteAddressExtractor(a) ⇒ a - case x ⇒ throw new ConfigurationException("unparseable remote node " + x) - } - val node = Stream.continually(nodes).flatten.iterator - val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { - val name = "c" + i - val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(node.next)) - impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy)) - }) - } -} - -/** - * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. - *
- * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the round robin should both create new actors and use the 'routees' actor(s). - * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. - *
- * The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. - */ -case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) - extends RemoteRouterConfig with RoundRobinLike { - - /** - * Constructor that sets the routees to be used. - * Java API - */ - def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) - - /** - * Constructor that sets the resizer to be used. - * Java API - */ - def this(resizer: Resizer) = this(0, Nil, Some(resizer)) -} - -/** - * A Router that randomly selects one of the target connections to send a message to. - *
- * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). - * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. - *
- * The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. - */ -case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) - extends RemoteRouterConfig with RandomLike { - - /** - * Constructor that sets the routees to be used. - * Java API - */ - def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) - - /** - * Constructor that sets the resizer to be used. - * Java API - */ - def this(resizer: Resizer) = this(0, Nil, Some(resizer)) -} - -/** - * A Router that tries to send to routee with fewest messages in mailbox. - *
- * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). - * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. - *
- * The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. - */ -case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) - extends RemoteRouterConfig with SmallestMailboxLike { - - /** - * Constructor that sets the routees to be used. - * Java API - */ - def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) - - /** - * Constructor that sets the resizer to be used. - * Java API - */ - def this(resizer: Resizer) = this(0, Nil, Some(resizer)) -} - -/** - * A Router that uses broadcasts a message to all its connections. - *
- * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). - * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. - *
- * The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. - */ -case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) - extends RemoteRouterConfig with BroadcastLike { - - /** - * Constructor that sets the routees to be used. - * Java API - */ - def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) - - /** - * Constructor that sets the resizer to be used. - * Java API - */ - def this(resizer: Resizer) = this(0, Nil, Some(resizer)) -} - -/** - * Simple router that broadcasts the message to all routees, and replies with the first response. - *
- * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). - * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. - *
- * The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. - */ -case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration, - override val resizer: Option[Resizer] = None) - extends RemoteRouterConfig with ScatterGatherFirstCompletedLike { - - /** - * Constructor that sets the routees to be used. - * Java API - */ - def this(n: Int, t: java.lang.Iterable[String], w: Duration) = this(n, t.asScala, w) - - /** - * Constructor that sets the resizer to be used. - * Java API - */ - def this(resizer: Resizer, w: Duration) = this(0, Nil, w, Some(resizer)) -} diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 5c06f36804..6418f93966 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -16,6 +16,7 @@ object RemoteRouterSpec { } } +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteRouterSpec extends AkkaSpec(""" akka { actor.provider = "akka.remote.RemoteActorRefProvider" @@ -25,10 +26,18 @@ akka { } actor.deployment { /blub { - router = "round-robin" + router = round-robin nr-of-instances = 2 target.nodes = ["akka://remote_sys@localhost:12346"] } + /elastic-blub { + router = round-robin + resizer { + lower-bound = 2 + upper-bound = 3 + } + target.nodes = ["akka://remote_sys@localhost:12346"] + } } } """) with ImplicitSender { @@ -52,6 +61,23 @@ akka { expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c2" } + "deploy its children on remote host driven by programatic definition" in { + val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2), + Seq("akka://remote_sys@localhost:12346"))), "blub2") + router ! "" + expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c1" + router ! "" + expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c2" + } + + "deploy dynamic resizable number of children on remote host driven by configuration" in { + val router = system.actorOf(Props[Echo].withRouter(FromConfig), "elastic-blub") + router ! "" + expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c1" + router ! "" + expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c2" + } + } }