Handle remote routers transparently. See #1606

* RemoteRouterConfig wrapper with RemoteRouteeProvider instead if fixed remote routers.
* Had to refactor and introduce RouteeProvider for different implementations of how to create routees.
* Works with Resizer also.
* Added some tests.
This commit is contained in:
Patrik Nordwall 2012-01-17 08:45:07 +01:00
parent 0cf5c22eac
commit e7a0247c0d
6 changed files with 189 additions and 243 deletions

View file

@ -45,9 +45,9 @@ object RoutingSpec {
class MyRouter(config: Config) extends RouterConfig { class MyRouter(config: Config) extends RouterConfig {
val foo = config.getString("foo") val foo = config.getString("foo")
def createRoute(routeeProps: Props, actorContext: ActorContext): Route = { def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {
val routees = IndexedSeq(actorContext.actorOf(Props[Echo])) val routees = IndexedSeq(routeeProvider.context.actorOf(Props[Echo]))
registerRoutees(actorContext, routees) routeeProvider.registerRoutees(routees)
{ {
case (sender, message) Nil case (sender, message) Nil
@ -542,13 +542,13 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
case class VoteCountRouter() extends RouterConfig { case class VoteCountRouter() extends RouterConfig {
//#crRoute //#crRoute
def createRoute(routeeProps: Props, actorContext: ActorContext): Route = { def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {
val democratActor = actorContext.actorOf(Props(new DemocratActor()), "d") val democratActor = routeeProvider.context.actorOf(Props(new DemocratActor()), "d")
val republicanActor = actorContext.actorOf(Props(new RepublicanActor()), "r") val republicanActor = routeeProvider.context.actorOf(Props(new RepublicanActor()), "r")
val routees = Vector[ActorRef](democratActor, republicanActor) val routees = Vector[ActorRef](democratActor, republicanActor)
//#crRegisterRoutees //#crRegisterRoutees
registerRoutees(actorContext, routees) routeeProvider.registerRoutees(routees)
//#crRegisterRoutees //#crRegisterRoutees
//#crRoutingLogic //#crRoutingLogic

View file

@ -44,7 +44,8 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
abandonedRoutees foreach underlying.unwatch abandonedRoutees foreach underlying.unwatch
} }
val route = _props.routerConfig.createRoute(routeeProps, actorContext) private val routeeProvider = _props.routerConfig.createRouteeProvider(this, actorContext)
val route = _props.routerConfig.createRoute(routeeProps, routeeProvider)
// initial resize, before message send // initial resize, before message send
resize() resize()
@ -91,7 +92,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
for (r _props.routerConfig.resizer) { for (r _props.routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) {
try { try {
r.resize(routeeProps, actorContext, routees, _props.routerConfig) r.resize(routeeProps, routeeProvider)
} finally { } finally {
resizeProgress.set(false) resizeProgress.set(false)
} }
@ -120,7 +121,10 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
*/ */
trait RouterConfig { trait RouterConfig {
def createRoute(routeeProps: Props, actorContext: ActorContext): Route def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route
protected[akka] def createRouteeProvider(ref: RoutedActorRef, context: ActorContext) =
new RouteeProvider(ref, context, resizer)
def createActor(): Router = new Router {} def createActor(): Router = new Router {}
@ -134,32 +138,6 @@ trait RouterConfig {
protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] = routees.map(Destination(sender, _)) protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] = routees.map(Destination(sender, _))
def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) (1 to x).map(_ context.actorOf(props))(scala.collection.breakOut)
case (_, xs) xs.map(context.actorFor(_))(scala.collection.breakOut)
}
protected def createAndRegisterRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Unit = {
if (resizer.isEmpty) {
registerRoutees(context, createRoutees(props, context, nrOfInstances, routees))
}
}
/**
* Adds new routees to the router.
*/
def registerRoutees(context: ActorContext, routees: IndexedSeq[ActorRef]): Unit = {
context.self.asInstanceOf[RoutedActorRef].addRoutees(routees)
}
/**
* Removes routees from the router. This method doesn't stop the routees.
*/
def unregisterRoutees(context: ActorContext, routees: IndexedSeq[ActorRef]): Unit = {
context.self.asInstanceOf[RoutedActorRef].removeRoutees(routees)
}
/** /**
* Routers with dynamically resizable number of routees return the [[akka.routing.Resizer]] * Routers with dynamically resizable number of routees return the [[akka.routing.Resizer]]
* to use. * to use.
@ -168,26 +146,69 @@ trait RouterConfig {
} }
/**
* Factory and registry for routees of the router.
* Uses `context.actorOf` to create routees from nrOfInstances property
* and `context.actorFor` lookup routees from paths.
*/
class RouteeProvider(ref: RoutedActorRef, val context: ActorContext, val resizer: Option[Resizer]) {
/**
* Adds new routees to the router.
*/
def registerRoutees(routees: IndexedSeq[ActorRef]): Unit = {
context.self.asInstanceOf[RoutedActorRef].addRoutees(routees)
}
/**
* Adds new routees to the router.
* Java API.
*/
protected def registerRoutees(routees: java.util.List[ActorRef]): Unit = {
import scala.collection.JavaConverters._
registerRoutees(routees.asScala.toIndexedSeq)
}
/**
* Removes routees from the router. This method doesn't stop the routees.
*/
def unregisterRoutees(routees: IndexedSeq[ActorRef]): Unit = {
context.self.asInstanceOf[RoutedActorRef].removeRoutees(routees)
}
def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) (1 to x).map(_ context.actorOf(props))(scala.collection.breakOut)
case (_, xs) xs.map(context.actorFor(_))(scala.collection.breakOut)
}
def createAndRegisterRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): Unit = {
if (resizer.isEmpty) {
registerRoutees(createRoutees(props, nrOfInstances, routees))
}
}
/**
* All routees of the router
*/
def routees: IndexedSeq[ActorRef] = ref.routees
}
/** /**
* Java API for a custom router factory. * Java API for a custom router factory.
* @see akka.routing.RouterConfig * @see akka.routing.RouterConfig
*/ */
abstract class CustomRouterConfig extends RouterConfig { abstract class CustomRouterConfig extends RouterConfig {
override def createRoute(props: Props, context: ActorContext): Route = { override def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
// as a bonus, this prevents closing of props and context in the returned Route PartialFunction // as a bonus, this prevents closing of props and context in the returned Route PartialFunction
val customRoute = createCustomRoute(props, context) val customRoute = createCustomRoute(props, routeeProvider)
{ {
case (sender, message) customRoute.destinationsFor(sender, message) case (sender, message) customRoute.destinationsFor(sender, message)
} }
} }
def createCustomRoute(props: Props, context: ActorContext): CustomRoute def createCustomRoute(props: Props, routeeProvider: RouteeProvider): CustomRoute
protected def registerRoutees(context: ActorContext, routees: java.util.List[ActorRef]): Unit = {
import scala.collection.JavaConverters._
registerRoutees(context, routees.asScala.toIndexedSeq)
}
} }
@ -254,23 +275,23 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
* Oxymoron style. * Oxymoron style.
*/ */
case object NoRouter extends RouterConfig { case object NoRouter extends RouterConfig {
def createRoute(props: Props, actorContext: ActorContext): Route = null def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
} }
/** /**
* Router configuration which has no default, i.e. external configuration is required. * Router configuration which has no default, i.e. external configuration is required.
*/ */
case object FromConfig extends RouterConfig { case object FromConfig extends RouterConfig {
def createRoute(props: Props, actorContext: ActorContext): Route = def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + actorContext.self + " needs external configuration from file (e.g. application.conf)") throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
} }
/** /**
* Java API: Router configuration which has no default, i.e. external configuration is required. * Java API: Router configuration which has no default, i.e. external configuration is required.
*/ */
case class FromConfig() extends RouterConfig { case class FromConfig() extends RouterConfig {
def createRoute(props: Props, actorContext: ActorContext): Route = def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + actorContext.self + " needs external configuration from file (e.g. application.conf)") throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
} }
object RoundRobinRouter { object RoundRobinRouter {
@ -332,21 +353,20 @@ trait RoundRobinLike { this: RouterConfig ⇒
def routees: Iterable[String] def routees: Iterable[String]
def createRoute(props: Props, context: ActorContext): Route = { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
createAndRegisterRoutees(props, context, nrOfInstances, routees) routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
val ref = context.self.asInstanceOf[RoutedActorRef]
val next = new AtomicLong(0) val next = new AtomicLong(0)
def getNext(): ActorRef = { def getNext(): ActorRef = {
val _routees = ref.routees val _routees = routeeProvider.routees
_routees((next.getAndIncrement % _routees.size).asInstanceOf[Int]) _routees((next.getAndIncrement % _routees.size).asInstanceOf[Int])
} }
{ {
case (sender, message) case (sender, message)
message match { message match {
case Broadcast(msg) toAll(sender, ref.routees) case Broadcast(msg) toAll(sender, routeeProvider.routees)
case msg List(Destination(sender, getNext())) case msg List(Destination(sender, getNext()))
} }
} }
@ -418,18 +438,18 @@ trait RandomLike { this: RouterConfig ⇒
override def initialValue = SecureRandom.getInstance("SHA1PRNG") override def initialValue = SecureRandom.getInstance("SHA1PRNG")
} }
def createRoute(props: Props, context: ActorContext): Route = { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
val ref = context.self.asInstanceOf[RoutedActorRef] routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
createAndRegisterRoutees(props, context, nrOfInstances, routees)
def getNext(): ActorRef = { def getNext(): ActorRef = {
ref.routees(random.get.nextInt(ref.routees.size)) val _routees = routeeProvider.routees
_routees(random.get.nextInt(_routees.size))
} }
{ {
case (sender, message) case (sender, message)
message match { message match {
case Broadcast(msg) toAll(sender, ref.routees) case Broadcast(msg) toAll(sender, routeeProvider.routees)
case msg List(Destination(sender, getNext())) case msg List(Destination(sender, getNext()))
} }
} }
@ -559,13 +579,12 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
case _ 0 case _ 0
} }
def createRoute(props: Props, context: ActorContext): Route = { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
val ref = context.self.asInstanceOf[RoutedActorRef] routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
createAndRegisterRoutees(props, context, nrOfInstances, routees)
def getNext(): ActorRef = { def getNext(): ActorRef = {
// non-local actors mailbox size is unknown, so consider them lowest priority // non-local actors mailbox size is unknown, so consider them lowest priority
val activeLocal = ref.routees collect { case l: LocalActorRef if !isSuspended(l) l } val activeLocal = routeeProvider.routees collect { case l: LocalActorRef if !isSuspended(l) l }
// 1. anyone not processing message and with empty mailbox // 1. anyone not processing message and with empty mailbox
activeLocal.find(a !isProcessingMessage(a) && !hasMessages(a)) getOrElse { activeLocal.find(a !isProcessingMessage(a) && !hasMessages(a)) getOrElse {
// 2. anyone with empty mailbox // 2. anyone with empty mailbox
@ -573,7 +592,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
// 3. sort on mailbox size // 3. sort on mailbox size
activeLocal.sortBy(a numberOfMessages(a)).headOption getOrElse { activeLocal.sortBy(a numberOfMessages(a)).headOption getOrElse {
// 4. no locals, just pick one, random // 4. no locals, just pick one, random
ref.routees(random.get.nextInt(ref.routees.size)) val _routees = routeeProvider.routees
_routees(random.get.nextInt(_routees.size))
} }
} }
} }
@ -582,7 +602,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
{ {
case (sender, message) case (sender, message)
message match { message match {
case Broadcast(msg) toAll(sender, ref.routees) case Broadcast(msg) toAll(sender, routeeProvider.routees)
case msg List(Destination(sender, getNext())) case msg List(Destination(sender, getNext()))
} }
} }
@ -649,14 +669,13 @@ trait BroadcastLike { this: RouterConfig ⇒
def routees: Iterable[String] def routees: Iterable[String]
def createRoute(props: Props, context: ActorContext): Route = { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
val ref = context.self.asInstanceOf[RoutedActorRef] routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
createAndRegisterRoutees(props, context, nrOfInstances, routees)
{ {
case (sender, message) case (sender, message)
message match { message match {
case _ toAll(sender, ref.routees) case _ toAll(sender, routeeProvider.routees)
} }
} }
} }
@ -724,16 +743,15 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
def within: Duration def within: Duration
def createRoute(props: Props, context: ActorContext): Route = { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
val ref = context.self.asInstanceOf[RoutedActorRef] routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
createAndRegisterRoutees(props, context, nrOfInstances, routees)
{ {
case (sender, message) case (sender, message)
val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get val asker = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get
asker.result.pipeTo(sender) asker.result.pipeTo(sender)
message match { message match {
case _ toAll(asker, ref.routees) case _ toAll(asker, routeeProvider.routees)
} }
} }
} }
@ -755,11 +773,11 @@ trait Resizer {
/** /**
* Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize` * Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize`
* returns true and no other resize is in progress. * returns true and no other resize is in progress.
* Create and register more routees with `routerConfig.registerRoutees(actorContext, newRoutees) * Create and register more routees with `routeeProvider.registerRoutees(newRoutees)
* or remove routees with `routerConfig.unregisterRoutees(actorContext, abandonedRoutees)` and * or remove routees with `routeeProvider.unregisterRoutees(abandonedRoutees)` and
* sending [[akka.actor.PoisonPill]] to them. * sending [[akka.actor.PoisonPill]] to them.
*/ */
def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) def resize(props: Props, routeeProvider: RouteeProvider)
} }
case object DefaultResizer { case object DefaultResizer {
@ -849,16 +867,17 @@ case class DefaultResizer(
def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0) def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0)
def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) { def resize(props: Props, routeeProvider: RouteeProvider) {
val currentRoutees = routeeProvider.routees
val requestedCapacity = capacity(currentRoutees) val requestedCapacity = capacity(currentRoutees)
if (requestedCapacity > 0) { if (requestedCapacity > 0) {
val newRoutees = routerConfig.createRoutees(props, actorContext, requestedCapacity, Nil) val newRoutees = routeeProvider.createRoutees(props, requestedCapacity, Nil)
routerConfig.registerRoutees(actorContext, newRoutees) routeeProvider.registerRoutees(newRoutees)
} else if (requestedCapacity < 0) { } else if (requestedCapacity < 0) {
val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + requestedCapacity) val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + requestedCapacity)
routerConfig.unregisterRoutees(actorContext, abandon) routeeProvider.unregisterRoutees(abandon)
delayedStop(actorContext.system.scheduler, abandon) delayedStop(routeeProvider.context.system.scheduler, abandon)
} }
} }

View file

@ -107,13 +107,13 @@ public class CustomRouterDocTestBase {
//#crRoute //#crRoute
@Override @Override
public CustomRoute createCustomRoute(Props props, ActorContext context) { public CustomRoute createCustomRoute(Props props, RouteeProvider routeeProvider) {
final ActorRef democratActor = context.actorOf(new Props(DemocratActor.class), "d"); final ActorRef democratActor = routeeProvider.context().actorOf(new Props(DemocratActor.class), "d");
final ActorRef republicanActor = context.actorOf(new Props(RepublicanActor.class), "r"); final ActorRef republicanActor = routeeProvider.context().actorOf(new Props(RepublicanActor.class), "r");
List<ActorRef> routees = Arrays.asList(new ActorRef[] { democratActor, republicanActor }); List<ActorRef> routees = Arrays.asList(new ActorRef[] { democratActor, republicanActor });
//#crRegisterRoutees //#crRegisterRoutees
registerRoutees(context, routees); routeeProvider.registerRoutees(routees);
//#crRegisterRoutees //#crRegisterRoutees
//#crRoutingLogic //#crRoutingLogic

View file

@ -14,7 +14,6 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings
override protected def parseConfig(path: String, config: Config): Option[Deploy] = { override protected def parseConfig(path: String, config: Config): Option[Deploy] = {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.util.ReflectiveAccess._
super.parseConfig(path, config) match { super.parseConfig(path, config) match {
case d @ Some(deploy) case d @ Some(deploy)
@ -25,13 +24,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings
val nodes = deploy.config.getStringList("target.nodes").asScala val nodes = deploy.config.getStringList("target.nodes").asScala
if (nodes.isEmpty || deploy.routing == NoRouter) d if (nodes.isEmpty || deploy.routing == NoRouter) d
else { else {
val r = deploy.routing match { val r = new RemoteRouterConfig(deploy.routing, nodes)
case RoundRobinRouter(x, _, resizer) RemoteRoundRobinRouter(x, nodes, resizer)
case RandomRouter(x, _, resizer) RemoteRandomRouter(x, nodes, resizer)
case SmallestMailboxRouter(x, _, resizer) RemoteSmallestMailboxRouter(x, nodes, resizer)
case BroadcastRouter(x, _, resizer) RemoteBroadcastRouter(x, nodes, resizer)
case ScatterGatherFirstCompletedRouter(x, _, w, resizer) RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer)
}
Some(deploy.copy(routing = r)) Some(deploy.copy(routing = r))
} }
} }

View file

@ -3,163 +3,71 @@
*/ */
package akka.routing package akka.routing
import akka.actor._
import akka.remote._
import scala.collection.JavaConverters._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.ActorSystemImpl
import akka.actor.Deploy
import akka.actor.InternalActorRef
import akka.actor.Props
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.util.Duration import akka.remote.RemoteScope
import akka.remote.RemoteAddressExtractor
trait RemoteRouterConfig extends RouterConfig { /**
override def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match { * [[akka.routing.RouterConfig]] implementation for remote deployment on defined
case (_, Nil) throw new ConfigurationException("must specify list of remote nodes") * target nodes. Delegates other duties to the local [[akka.routing.RouterConfig]],
case (n, xs) * which makes it possible to mix this with the built-in routers such as
val nodes = routees map { * [[akka.routing.RoundRobinRouter]] or custom routers.
case RemoteAddressExtractor(a) a */
case x throw new ConfigurationException("unparseable remote node " + x) class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends RouterConfig {
}
val node = Stream.continually(nodes).flatten.iterator override protected[akka] def createRouteeProvider(ref: RoutedActorRef, context: ActorContext) =
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 new RemoteRouteeProvider(nodes, ref, context, resizer)
IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + i override def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {
val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(node.next)) local.createRoute(routeeProps, routeeProvider)
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy))
})
} }
override def createActor(): Router = local.createActor()
override def resizer: Option[Resizer] = local.resizer
} }
/** /**
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. * Factory and registry for routees of the router.
* <br> * Deploys new routees on the specified `nodes`, round-robin.
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means *
* that the round robin should both create new actors and use the 'routees' actor(s). * Routee paths may not be combined with remote target nodes.
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) class RemoteRouteeProvider(nodes: Iterable[String], _ref: RoutedActorRef, _context: ActorContext, _resizer: Option[Resizer])
extends RemoteRouterConfig with RoundRobinLike { extends RouteeProvider(_ref, _context, _resizer) {
/** // need this iterator as instance variable since Resizer may call createRoutees several times
* Constructor that sets the routees to be used. private val nodeAddressIter = {
* Java API val nodeAddresses = nodes map {
*/ case RemoteAddressExtractor(a) a
def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) case x throw new ConfigurationException("unparseable remote node " + x)
}
Stream.continually(nodeAddresses).flatten.iterator
}
/** override def createRoutees(props: Props, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] =
* Constructor that sets the resizer to be used. (nrOfInstances, routees, nodes) match {
* Java API case (_, _, Nil) throw new ConfigurationException("Must specify list of remote target.nodes for [%s]"
*/ format context.self.path.toString)
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
case (n, Nil, ys)
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + i
val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(nodeAddressIter.next))
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy))
})
case (_, xs, _) throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]"
format context.self.path.toString)
}
} }
/**
* A Router that randomly selects one of the target connections to send a message to.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with RandomLike {
/**
* Constructor that sets the routees to be used.
* Java API
*/
def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
}
/**
* A Router that tries to send to routee with fewest messages in mailbox.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with SmallestMailboxLike {
/**
* Constructor that sets the routees to be used.
* Java API
*/
def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
}
/**
* A Router that uses broadcasts a message to all its connections.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with BroadcastLike {
/**
* Constructor that sets the routees to be used.
* Java API
*/
def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
}
/**
* Simple router that broadcasts the message to all routees, and replies with the first response.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration,
override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with ScatterGatherFirstCompletedLike {
/**
* Constructor that sets the routees to be used.
* Java API
*/
def this(n: Int, t: java.lang.Iterable[String], w: Duration) = this(n, t.asScala, w)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer, w: Duration) = this(0, Nil, w, Some(resizer))
}

View file

@ -16,6 +16,7 @@ object RemoteRouterSpec {
} }
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteRouterSpec extends AkkaSpec(""" class RemoteRouterSpec extends AkkaSpec("""
akka { akka {
actor.provider = "akka.remote.RemoteActorRefProvider" actor.provider = "akka.remote.RemoteActorRefProvider"
@ -26,10 +27,18 @@ akka {
} }
actor.deployment { actor.deployment {
/blub { /blub {
router = "round-robin" router = round-robin
nr-of-instances = 2 nr-of-instances = 2
target.nodes = ["akka://remote_sys@localhost:12346"] target.nodes = ["akka://remote_sys@localhost:12346"]
} }
/elastic-blub {
router = round-robin
resizer {
lower-bound = 2
upper-bound = 3
}
target.nodes = ["akka://remote_sys@localhost:12346"]
}
} }
} }
""") with ImplicitSender { """) with ImplicitSender {
@ -53,6 +62,23 @@ akka {
expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c2" expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c2"
} }
"deploy its children on remote host driven by programatic definition" in {
val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2),
Seq("akka://remote_sys@localhost:12346"))), "blub2")
router ! ""
expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c1"
router ! ""
expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c2"
}
"deploy dynamic resizable number of children on remote host driven by configuration" in {
val router = system.actorOf(Props[Echo].withRouter(FromConfig), "elastic-blub")
router ! ""
expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c1"
router ! ""
expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c2"
}
} }
} }