diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index d87d688231..6ccad2a95f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -135,15 +135,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer)))) val latch1 = new TestLatch(1) - router.!((latch1, busy)) + router ! (latch1, busy) Await.ready(latch1, 2 seconds) val latch2 = new TestLatch(1) - router.!((latch2, busy)) + router ! (latch2, busy) Await.ready(latch2, 2 seconds) val latch3 = new TestLatch(1) - router.!((latch3, busy)) + router ! (latch3, busy) Await.ready(latch3, 2 seconds) Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 12fada0880..077e69e5d9 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -12,6 +12,7 @@ import akka.dispatch.Await import akka.util.Duration import akka.config.ConfigurationException import com.typesafe.config.ConfigFactory +import java.util.concurrent.ConcurrentHashMap object RoutingSpec { @@ -256,6 +257,61 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } } + "smallest mailbox router" must { + "be started when constructed" in { + val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) + } + + "deliver messages to idle actor" in { + val usedActors = new ConcurrentHashMap[Int, String]() + val router = system.actorOf(Props(new Actor { + def receive = { + case (busy: TestLatch, receivedLatch: TestLatch) ⇒ + usedActors.put(0, self.path.toString) + self ! "another in busy mailbox" + receivedLatch.countDown() + Await.ready(busy, TestLatch.DefaultTimeout) + case (msg: Int, receivedLatch: TestLatch) ⇒ + usedActors.put(msg, self.path.toString) + receivedLatch.countDown() + case s: String ⇒ + } + }).withRouter(SmallestMailboxRouter(3))) + + val busy = TestLatch(1) + val received0 = TestLatch(1) + router ! (busy, received0) + Await.ready(received0, TestLatch.DefaultTimeout) + + val received1 = TestLatch(1) + router ! (1, received1) + Await.ready(received1, TestLatch.DefaultTimeout) + + val received2 = TestLatch(1) + router ! (2, received2) + Await.ready(received2, TestLatch.DefaultTimeout) + + val received3 = TestLatch(1) + router ! (3, received3) + Await.ready(received3, TestLatch.DefaultTimeout) + + busy.countDown() + + val busyPath = usedActors.get(0) + busyPath must not be (null) + + val path1 = usedActors.get(1) + val path2 = usedActors.get(2) + val path3 = usedActors.get(3) + + path1 must not be (busyPath) + path2 must not be (busyPath) + path3 must not be (busyPath) + + } + } + "broadcast router" must { "be started when constructed" in { val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1))) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 882fccea55..07e363fca9 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -64,7 +64,7 @@ akka { default { # routing (load-balance) scheme to use - # available: "from-code", "round-robin", "random", "scatter-gather", "broadcast" + # available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast" # or: fully qualified class name of the router class # default is "from-code"; # Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter). diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 35ec05432a..23c6da6661 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -71,12 +71,13 @@ class Deployer(val settings: ActorSystem.Settings) { } val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) } val recipe: Option[ActorRecipe] = diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 5e7c9ae701..f3065788ec 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -286,12 +286,15 @@ object RoundRobinRouter { * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the round robin should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RoundRobinLike { @@ -307,9 +310,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -361,12 +366,15 @@ object RandomRouter { * A Router that randomly selects one of the target connections to send a message to. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RandomLike { @@ -382,9 +390,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -424,6 +434,159 @@ trait RandomLike { this: RouterConfig ⇒ } } +object SmallestMailboxRouter { + def apply(routees: Iterable[ActorRef]) = new SmallestMailboxRouter(routees = routees map (_.path.toString)) + + /** + * Java API to create router with the supplied 'routees' actors. + */ + def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = { + import scala.collection.JavaConverters._ + apply(routees.asScala) + } +} +/** + * A Router that tries to send to the non-suspended routee with fewest messages in mailbox. + * The selection is done in this order: + * + * + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ +case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) + extends RouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the routees to be used. + * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) + } + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(resizer = Some(resizer)) +} + +trait SmallestMailboxLike { this: RouterConfig ⇒ + + import java.security.SecureRandom + + def nrOfInstances: Int + + def routees: Iterable[String] + + private val random = new ThreadLocal[SecureRandom] { + override def initialValue = SecureRandom.getInstance("SHA1PRNG") + } + + /** + * Returns true if the actor is currently processing a message. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def isProcessingMessage(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ + val cell = x.underlying + cell.mailbox.isScheduled && cell.currentMessage != null + case _ ⇒ false + } + + /** + * Returns true if the actor currently has any pending messages + * in the mailbox, i.e. the mailbox is not empty. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def hasMessages(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages + case _ ⇒ false + } + + /** + * Returns true if the actor is currently suspended. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def isSuspended(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ + val cell = x.underlying + cell.mailbox.isSuspended + case _ ⇒ false + } + + /** + * Returns the number of pending messages in the mailbox of the actor. + * It will always return 0 for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def numberOfMessages(a: ActorRef): Int = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages + case _ ⇒ 0 + } + + def createRoute(props: Props, context: ActorContext): Route = { + val ref = context.self.asInstanceOf[RoutedActorRef] + createAndRegisterRoutees(props, context, nrOfInstances, routees) + + def getNext(): ActorRef = { + // non-local actors mailbox size is unknown, so consider them lowest priority + val activeLocal = ref.routees collect { case l: LocalActorRef if !isSuspended(l) ⇒ l } + // 1. anyone not processing message and with empty mailbox + activeLocal.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a)) getOrElse { + // 2. anyone with empty mailbox + activeLocal.find(a ⇒ !hasMessages(a)) getOrElse { + // 3. sort on mailbox size + activeLocal.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse { + // 4. no locals, just pick one, random + ref.routees(random.get.nextInt(ref.routees.size)) + } + } + } + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, ref.routees) + case msg ⇒ List(Destination(sender, getNext())) + } + } + } +} + object BroadcastRouter { def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString)) @@ -439,12 +602,15 @@ object BroadcastRouter { * A Router that uses broadcasts a message to all its connections. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with BroadcastLike { @@ -460,9 +626,11 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -507,12 +675,15 @@ object ScatterGatherFirstCompletedRouter { * Simple router that broadcasts the message to all routees, and replies with the first response. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, override val resizer: Option[Resizer] = None) @@ -529,9 +700,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String], w: Duration) = { - this(routees = iterableAsScalaIterable(t), within = w) + def this(routeePaths: java.lang.Iterable[String], w: Duration) = { + this(routees = iterableAsScalaIterable(routeePaths), within = w) } /** diff --git a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java index c8d8b019bb..2125ae35a8 100644 --- a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java @@ -7,6 +7,7 @@ import akka.routing.ScatterGatherFirstCompletedRouter; import akka.routing.BroadcastRouter; import akka.routing.RandomRouter; import akka.routing.RoundRobinRouter; +import akka.routing.SmallestMailboxRouter; import akka.actor.UntypedActor; import akka.actor.ActorRef; import akka.actor.Props; @@ -34,6 +35,14 @@ public class ParentActor extends UntypedActor { randomRouter.tell(i, getSelf()); } //#randomRouter + } else if (msg.equals("smr")) { + //#smallestMailboxRouter + ActorRef smallestMailboxRouter = getContext().actorOf( + new Props(PrintlnActor.class).withRouter(new SmallestMailboxRouter(5)), "router"); + for (int i = 1; i <= 10; i++) { + smallestMailboxRouter.tell(i, getSelf()); + } + //#smallestMailboxRouter } else if (msg.equals("br")) { //#broadcastRouter ActorRef broadcastRouter = getContext().actorOf(new Props(PrintlnActor.class).withRouter(new BroadcastRouter(5)), diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index e80514a8fe..cdcc869b2a 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -44,9 +45,8 @@ You can also give the router already created routees as in: When you create a router programatically you define the number of routees *or* you pass already created routees to it. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. -*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in -the configuration file then this value will be used instead of any programmatically sent parameters, but you must -also define the ``router`` property in the configuration.* +*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used +instead of any programmatically sent parameters.* Once you have the router actor it is just to send messages to it as you would to any actor: @@ -122,6 +122,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the non-suspended routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/jrouting/ParentActor.java#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index d688da6544..3a9f566ed8 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -8,6 +8,7 @@ import annotation.tailrec import akka.actor.{ Props, Actor } import akka.util.duration._ import akka.dispatch.Await +import akka.routing.SmallestMailboxRouter case class FibonacciNumber(nbr: Int) @@ -59,6 +60,14 @@ class ParentActor extends Actor { i ⇒ randomRouter ! i } //#randomRouter + case "smr" ⇒ + //#smallestMailboxRouter + val smallestMailboxRouter = + context.actorOf(Props[PrintlnActor].withRouter(SmallestMailboxRouter(5)), "router") + 1 to 10 foreach { + i ⇒ smallestMailboxRouter ! i + } + //#smallestMailboxRouter case "br" ⇒ //#broadcastRouter val broadcastRouter = diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index ad06b67b8b..4e75be8798 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -44,9 +45,8 @@ You can also give the router already created routees as in: When you create a router programatically you define the number of routees *or* you pass already created routees to it. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. -*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in -the configuration file then this value will be used instead of any programmatically sent parameters, but you must -also define the ``router`` property in the configuration.* +*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used +instead of any programmatically sent parameters.* Once you have the router actor it is just to send messages to it as you would to any actor: @@ -123,6 +123,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the non-suspended routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/routing/RouterTypeExample.scala#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index d8f466c9d2..fe6844b8dc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -28,6 +28,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings val r = deploy.routing match { case RoundRobinRouter(x, _, resizer) ⇒ RemoteRoundRobinRouter(x, nodes, resizer) case RandomRouter(x, _, resizer) ⇒ RemoteRandomRouter(x, nodes, resizer) + case SmallestMailboxRouter(x, _, resizer) ⇒ RemoteSmallestMailboxRouter(x, nodes, resizer) case BroadcastRouter(x, _, resizer) ⇒ RemoteBroadcastRouter(x, nodes, resizer) case ScatterGatherFirstCompletedRouter(x, _, w, resizer) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer) } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala index 52b2d05618..83a64d09a7 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala @@ -82,6 +82,33 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], ove def this(resizer: Resizer) = this(0, Nil, Some(resizer)) } +/** + * A Router that tries to send to routee with fewest messages in mailbox. + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the random router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) + extends RemoteRouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets the routees to be used. + * Java API + */ + def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(0, Nil, Some(resizer)) +} + /** * A Router that uses broadcasts a message to all its connections. *