From 6a12fb7876b38e0235a7a185abb8149de2ac4c26 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 Jan 2012 13:56:38 +0100 Subject: [PATCH 1/2] Implemented SmallestMailboxRouter. See #1619 --- .../test/scala/akka/routing/RoutingSpec.scala | 51 ++++ akka-actor/src/main/resources/reference.conf | 2 +- .../src/main/scala/akka/actor/Deployer.scala | 13 +- .../src/main/scala/akka/routing/Routing.scala | 225 ++++++++++++++---- .../code/akka/docs/jrouting/ParentActor.java | 9 + akka-docs/java/routing.rst | 20 +- .../akka/docs/routing/RouterTypeExample.scala | 9 + akka-docs/scala/routing.rst | 20 +- .../scala/akka/remote/RemoteDeployer.scala | 1 + .../scala/akka/routing/RemoteRouters.scala | 27 +++ 10 files changed, 322 insertions(+), 55 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 12fada0880..9811313688 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -12,6 +12,7 @@ import akka.dispatch.Await import akka.util.Duration import akka.config.ConfigurationException import com.typesafe.config.ConfigFactory +import java.util.concurrent.ConcurrentHashMap object RoutingSpec { @@ -256,6 +257,56 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } } + "smallest mailbox router" must { + "be started when constructed" in { + val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) + } + + "deliver messages to idle actor" in { + val usedActors = new ConcurrentHashMap[Int, String]() + val router = system.actorOf(Props(new Actor { + def receive = { + case busy: TestLatch ⇒ + usedActors.put(0, self.path.toString) + Await.ready(busy, TestLatch.DefaultTimeout) + case (msg: Int, receivedLatch: TestLatch) ⇒ + usedActors.put(msg, self.path.toString) + receivedLatch.countDown() + } + }).withRouter(SmallestMailboxRouter(3))) + + val busy = TestLatch(1) + router ! busy + + val received1 = TestLatch(1) + router.!((1, received1)) + Await.ready(received1, TestLatch.DefaultTimeout) + + val received2 = TestLatch(1) + router.!((2, received2)) + Await.ready(received2, TestLatch.DefaultTimeout) + + val received3 = TestLatch(1) + router.!((3, received3)) + Await.ready(received3, TestLatch.DefaultTimeout) + + busy.countDown() + + val busyPath = usedActors.get(0) + busyPath must not be (null) + + val path1 = usedActors.get(1) + val path2 = usedActors.get(2) + val path3 = usedActors.get(3) + + path1 must not be (busyPath) + path2 must not be (busyPath) + path3 must not be (busyPath) + + } + } + "broadcast router" must { "be started when constructed" in { val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1))) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 882fccea55..07e363fca9 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -64,7 +64,7 @@ akka { default { # routing (load-balance) scheme to use - # available: "from-code", "round-robin", "random", "scatter-gather", "broadcast" + # available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast" # or: fully qualified class name of the router class # default is "from-code"; # Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter). diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 35ec05432a..23c6da6661 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -71,12 +71,13 @@ class Deployer(val settings: ActorSystem.Settings) { } val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) } val recipe: Option[ActorRecipe] = diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 69589ae651..f021eb867a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -286,7 +286,7 @@ object RoundRobinRouter { * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the round robin should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -361,7 +361,7 @@ object RandomRouter { * A Router that randomly selects one of the target connections to send a message to. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -424,6 +424,143 @@ trait RandomLike { this: RouterConfig ⇒ } } +object SmallestMailboxRouter { + def apply(routees: Iterable[ActorRef]) = new SmallestMailboxRouter(routees = routees map (_.path.toString)) + + /** + * Java API to create router with the supplied 'routees' actors. + */ + def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = { + import scala.collection.JavaConverters._ + apply(routees.asScala) + } +} +/** + * A Router that tries to send to the routee with fewest messages in mailbox. + * The selection is done in this order: + * + * + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) + extends RouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the routees to be used. + * Java API + */ + def this(t: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(t)) + } + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(resizer = Some(resizer)) +} + +trait SmallestMailboxLike { this: RouterConfig ⇒ + + import java.security.SecureRandom + + def nrOfInstances: Int + + def routees: Iterable[String] + + private val random = new ThreadLocal[SecureRandom] { + override def initialValue = SecureRandom.getInstance("SHA1PRNG") + } + + /** + * Returns true if the actor is currently processing a message. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def isProcessingMessage(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ + val cell = x.underlying + cell.mailbox.isScheduled && cell.currentMessage != null + case _ ⇒ false + } + + /** + * Returns true if the actor currently has any pending messages + * in the mailbox, i.e. the mailbox is not empty. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def hasMessages(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages + case _ ⇒ false + } + + /** + * Returns the number of pending messages in the mailbox of the actor. + * It will always return 0 for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def numberOfMessages(a: ActorRef): Int = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages + case _ ⇒ 0 + } + + def createRoute(props: Props, context: ActorContext): Route = { + val ref = context.self.asInstanceOf[RoutedActorRef] + createAndRegisterRoutees(props, context, nrOfInstances, routees) + + def getNext(): ActorRef = { + // non-local actors mailbox size is unknown, so consider them lowest priority + val local: IndexedSeq[LocalActorRef] = for (a ← ref.routees if a.isInstanceOf[LocalActorRef]) yield a.asInstanceOf[LocalActorRef] + // anyone not processing message and with empty mailbox + val idle = local.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a)) + idle getOrElse { + // anyone with empty mailbox + val emptyMailbox = local.find(a ⇒ !hasMessages(a)) + emptyMailbox getOrElse { + // sort on mailbox size + local.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse { + // no locals, just pick one, random + ref.routees(random.get.nextInt(ref.routees.size)) + } + } + } + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, ref.routees) + case msg ⇒ List(Destination(sender, getNext())) + } + } + } +} + object BroadcastRouter { def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString)) @@ -439,7 +576,7 @@ object BroadcastRouter { * A Router that uses broadcasts a message to all its connections. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -507,7 +644,7 @@ object ScatterGatherFirstCompletedRouter { * Simple router that broadcasts the message to all routees, and replies with the first response. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -515,7 +652,7 @@ object ScatterGatherFirstCompletedRouter { * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, - override val resizer: Option[Resizer] = None) + override val resizer: Option[Resizer] = None) extends RouterConfig with ScatterGatherFirstCompletedLike { /** @@ -593,57 +730,57 @@ case class DefaultResizer( */ lowerBound: Int = 1, /** - * The most number of routees the router should ever have. - * Must be greater than or equal to `lowerBound`. - */ + * The most number of routees the router should ever have. + * Must be greater than or equal to `lowerBound`. + */ upperBound: Int = 10, /** - * Threshold to evaluate if routee is considered to be busy (under pressure). - * Implementation depends on this value (default is 1). - * - */ + * Threshold to evaluate if routee is considered to be busy (under pressure). + * Implementation depends on this value (default is 1). + * + */ pressureThreshold: Int = 1, /** - * Percentage to increase capacity whenever all routees are busy. - * For example, 0.2 would increase 20% (rounded up), i.e. if current - * capacity is 6 it will request an increase of 2 more routees. - */ + * Percentage to increase capacity whenever all routees are busy. + * For example, 0.2 would increase 20% (rounded up), i.e. if current + * capacity is 6 it will request an increase of 2 more routees. + */ rampupRate: Double = 0.2, /** - * Minimum fraction of busy routees before backing off. - * For example, if this is 0.3, then we'll remove some routees only when - * less than 30% of routees are busy, i.e. if current capacity is 10 and - * 3 are busy then the capacity is unchanged, but if 2 or less are busy - * the capacity is decreased. - * - * Use 0.0 or negative to avoid removal of routees. - */ + * Minimum fraction of busy routees before backing off. + * For example, if this is 0.3, then we'll remove some routees only when + * less than 30% of routees are busy, i.e. if current capacity is 10 and + * 3 are busy then the capacity is unchanged, but if 2 or less are busy + * the capacity is decreased. + * + * Use 0.0 or negative to avoid removal of routees. + */ backoffThreshold: Double = 0.3, /** - * Fraction of routees to be removed when the resizer reaches the - * backoffThreshold. - * For example, 0.1 would decrease 10% (rounded up), i.e. if current - * capacity is 9 it will request an decrease of 1 routee. - */ + * Fraction of routees to be removed when the resizer reaches the + * backoffThreshold. + * For example, 0.1 would decrease 10% (rounded up), i.e. if current + * capacity is 9 it will request an decrease of 1 routee. + */ backoffRate: Double = 0.1, /** - * When the resizer reduce the capacity the abandoned routee actors are stopped - * with PoisonPill after this delay. The reason for the delay is to give concurrent - * messages a chance to be placed in mailbox before sending PoisonPill. - * Use 0 seconds to skip delay. - */ + * When the resizer reduce the capacity the abandoned routee actors are stopped + * with PoisonPill after this delay. The reason for the delay is to give concurrent + * messages a chance to be placed in mailbox before sending PoisonPill. + * Use 0 seconds to skip delay. + */ stopDelay: Duration = 1.second, /** - * Number of messages between resize operation. - * Use 1 to resize before each message. - */ + * Number of messages between resize operation. + * Use 1 to resize before each message. + */ messagesPerResize: Int = 10) extends Resizer { /** diff --git a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java index c8d8b019bb..2125ae35a8 100644 --- a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java @@ -7,6 +7,7 @@ import akka.routing.ScatterGatherFirstCompletedRouter; import akka.routing.BroadcastRouter; import akka.routing.RandomRouter; import akka.routing.RoundRobinRouter; +import akka.routing.SmallestMailboxRouter; import akka.actor.UntypedActor; import akka.actor.ActorRef; import akka.actor.Props; @@ -34,6 +35,14 @@ public class ParentActor extends UntypedActor { randomRouter.tell(i, getSelf()); } //#randomRouter + } else if (msg.equals("smr")) { + //#smallestMailboxRouter + ActorRef smallestMailboxRouter = getContext().actorOf( + new Props(PrintlnActor.class).withRouter(new SmallestMailboxRouter(5)), "router"); + for (int i = 1; i <= 10; i++) { + smallestMailboxRouter.tell(i, getSelf()); + } + //#smallestMailboxRouter } else if (msg.equals("br")) { //#broadcastRouter ActorRef broadcastRouter = getContext().actorOf(new Props(PrintlnActor.class).withRouter(new BroadcastRouter(5)), diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index e80514a8fe..8cc5b94260 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -122,6 +123,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/jrouting/ParentActor.java#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index d688da6544..3a9f566ed8 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -8,6 +8,7 @@ import annotation.tailrec import akka.actor.{ Props, Actor } import akka.util.duration._ import akka.dispatch.Await +import akka.routing.SmallestMailboxRouter case class FibonacciNumber(nbr: Int) @@ -59,6 +60,14 @@ class ParentActor extends Actor { i ⇒ randomRouter ! i } //#randomRouter + case "smr" ⇒ + //#smallestMailboxRouter + val smallestMailboxRouter = + context.actorOf(Props[PrintlnActor].withRouter(SmallestMailboxRouter(5)), "router") + 1 to 10 foreach { + i ⇒ smallestMailboxRouter ! i + } + //#smallestMailboxRouter case "br" ⇒ //#broadcastRouter val broadcastRouter = diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index ad06b67b8b..8c0e0f7366 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -123,6 +124,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/routing/RouterTypeExample.scala#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index d8f466c9d2..fe6844b8dc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -28,6 +28,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings val r = deploy.routing match { case RoundRobinRouter(x, _, resizer) ⇒ RemoteRoundRobinRouter(x, nodes, resizer) case RandomRouter(x, _, resizer) ⇒ RemoteRandomRouter(x, nodes, resizer) + case SmallestMailboxRouter(x, _, resizer) ⇒ RemoteSmallestMailboxRouter(x, nodes, resizer) case BroadcastRouter(x, _, resizer) ⇒ RemoteBroadcastRouter(x, nodes, resizer) case ScatterGatherFirstCompletedRouter(x, _, w, resizer) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer) } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala index 52b2d05618..83a64d09a7 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala @@ -82,6 +82,33 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], ove def this(resizer: Resizer) = this(0, Nil, Some(resizer)) } +/** + * A Router that tries to send to routee with fewest messages in mailbox. + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the random router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) + extends RemoteRouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets the routees to be used. + * Java API + */ + def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(0, Nil, Some(resizer)) +} + /** * A Router that uses broadcasts a message to all its connections. *
From 2399f02531ce1d8fda33f5f65bbf951f2671976a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 12 Jan 2012 09:53:53 +0100 Subject: [PATCH 2/2] Improvements based on feedback. See #1619 --- .../test/scala/akka/routing/ResizerSpec.scala | 6 +- .../test/scala/akka/routing/RoutingSpec.scala | 15 ++- .../src/main/scala/akka/routing/Routing.scala | 98 +++++++++++++------ akka-docs/java/routing.rst | 7 +- akka-docs/scala/routing.rst | 7 +- 5 files changed, 86 insertions(+), 47 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index d87d688231..6ccad2a95f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -135,15 +135,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer)))) val latch1 = new TestLatch(1) - router.!((latch1, busy)) + router ! (latch1, busy) Await.ready(latch1, 2 seconds) val latch2 = new TestLatch(1) - router.!((latch2, busy)) + router ! (latch2, busy) Await.ready(latch2, 2 seconds) val latch3 = new TestLatch(1) - router.!((latch3, busy)) + router ! (latch3, busy) Await.ready(latch3, 2 seconds) Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9811313688..077e69e5d9 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -267,28 +267,33 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with val usedActors = new ConcurrentHashMap[Int, String]() val router = system.actorOf(Props(new Actor { def receive = { - case busy: TestLatch ⇒ + case (busy: TestLatch, receivedLatch: TestLatch) ⇒ usedActors.put(0, self.path.toString) + self ! "another in busy mailbox" + receivedLatch.countDown() Await.ready(busy, TestLatch.DefaultTimeout) case (msg: Int, receivedLatch: TestLatch) ⇒ usedActors.put(msg, self.path.toString) receivedLatch.countDown() + case s: String ⇒ } }).withRouter(SmallestMailboxRouter(3))) val busy = TestLatch(1) - router ! busy + val received0 = TestLatch(1) + router ! (busy, received0) + Await.ready(received0, TestLatch.DefaultTimeout) val received1 = TestLatch(1) - router.!((1, received1)) + router ! (1, received1) Await.ready(received1, TestLatch.DefaultTimeout) val received2 = TestLatch(1) - router.!((2, received2)) + router ! (2, received2) Await.ready(received2, TestLatch.DefaultTimeout) val received3 = TestLatch(1) - router.!((3, received3)) + router ! (3, received3) Await.ready(received3, TestLatch.DefaultTimeout) busy.countDown() diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index f021eb867a..f3065788ec 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -290,8 +290,11 @@ object RoundRobinRouter { * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RoundRobinLike { @@ -307,9 +310,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -365,8 +370,11 @@ object RandomRouter { * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RandomLike { @@ -382,9 +390,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -436,7 +446,7 @@ object SmallestMailboxRouter { } } /** - * A Router that tries to send to the routee with fewest messages in mailbox. + * A Router that tries to send to the non-suspended routee with fewest messages in mailbox. * The selection is done in this order: *