Merge pull request #209 from jboner/wip-1619-SmallestMailboxRouter-patriknw

Implemented SmallestMailboxRouter. See #1619
This commit is contained in:
patriknw 2012-01-12 05:26:39 -08:00
commit 1f3926fa0e
11 changed files with 346 additions and 40 deletions

View file

@ -135,15 +135,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
val latch1 = new TestLatch(1)
router.!((latch1, busy))
router ! (latch1, busy)
Await.ready(latch1, 2 seconds)
val latch2 = new TestLatch(1)
router.!((latch2, busy))
router ! (latch2, busy)
Await.ready(latch2, 2 seconds)
val latch3 = new TestLatch(1)
router.!((latch3, busy))
router ! (latch3, busy)
Await.ready(latch3, 2 seconds)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)

View file

@ -12,6 +12,7 @@ import akka.dispatch.Await
import akka.util.Duration
import akka.config.ConfigurationException
import com.typesafe.config.ConfigFactory
import java.util.concurrent.ConcurrentHashMap
object RoutingSpec {
@ -256,6 +257,61 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
}
"smallest mailbox router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1)))
routedActor.isTerminated must be(false)
}
"deliver messages to idle actor" in {
val usedActors = new ConcurrentHashMap[Int, String]()
val router = system.actorOf(Props(new Actor {
def receive = {
case (busy: TestLatch, receivedLatch: TestLatch)
usedActors.put(0, self.path.toString)
self ! "another in busy mailbox"
receivedLatch.countDown()
Await.ready(busy, TestLatch.DefaultTimeout)
case (msg: Int, receivedLatch: TestLatch)
usedActors.put(msg, self.path.toString)
receivedLatch.countDown()
case s: String
}
}).withRouter(SmallestMailboxRouter(3)))
val busy = TestLatch(1)
val received0 = TestLatch(1)
router ! (busy, received0)
Await.ready(received0, TestLatch.DefaultTimeout)
val received1 = TestLatch(1)
router ! (1, received1)
Await.ready(received1, TestLatch.DefaultTimeout)
val received2 = TestLatch(1)
router ! (2, received2)
Await.ready(received2, TestLatch.DefaultTimeout)
val received3 = TestLatch(1)
router ! (3, received3)
Await.ready(received3, TestLatch.DefaultTimeout)
busy.countDown()
val busyPath = usedActors.get(0)
busyPath must not be (null)
val path1 = usedActors.get(1)
val path2 = usedActors.get(2)
val path3 = usedActors.get(3)
path1 must not be (busyPath)
path2 must not be (busyPath)
path3 must not be (busyPath)
}
}
"broadcast router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1)))

View file

@ -64,7 +64,7 @@ akka {
default {
# routing (load-balance) scheme to use
# available: "from-code", "round-robin", "random", "scatter-gather", "broadcast"
# available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast"
# or: fully qualified class name of the router class
# default is "from-code";
# Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter).

View file

@ -74,6 +74,7 @@ class Deployer(val settings: ActorSystem.Settings) {
case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, resizer)
case "smallest-mailbox" SmallestMailboxRouter(nrOfInstances, routees, resizer)
case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case x throw new ConfigurationException("unknown router type " + x + " for path " + key)

View file

@ -286,12 +286,15 @@ object RoundRobinRouter {
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the round robin should both create new actors and use the 'routees' actor(s).
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RoundRobinLike {
@ -307,9 +310,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(t: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(t))
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
/**
@ -361,12 +366,15 @@ object RandomRouter {
* A Router that randomly selects one of the target connections to send a message to.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RandomLike {
@ -382,9 +390,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(t: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(t))
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
/**
@ -424,6 +434,159 @@ trait RandomLike { this: RouterConfig ⇒
}
}
object SmallestMailboxRouter {
def apply(routees: Iterable[ActorRef]) = new SmallestMailboxRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = {
import scala.collection.JavaConverters._
apply(routees.asScala)
}
}
/**
* A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
* The selection is done in this order:
* <ul>
* <li>pick any idle routee (not processing message) with empty mailbox</li>
* <li>pick any routee with empty mailbox</li>
* <li>pick routee with fewest pending messages in mailbox</li>
* <li>pick any remote routee, remote actors are consider lowest priority,
* since their mailbox size is unknown</li>
* </ul>
*
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with SmallestMailboxLike {
/**
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int) = {
this(nrOfInstances = nr)
}
/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
}
trait SmallestMailboxLike { this: RouterConfig
import java.security.SecureRandom
def nrOfInstances: Int
def routees: Iterable[String]
private val random = new ThreadLocal[SecureRandom] {
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
}
/**
* Returns true if the actor is currently processing a message.
* It will always return false for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def isProcessingMessage(a: ActorRef): Boolean = a match {
case x: LocalActorRef
val cell = x.underlying
cell.mailbox.isScheduled && cell.currentMessage != null
case _ false
}
/**
* Returns true if the actor currently has any pending messages
* in the mailbox, i.e. the mailbox is not empty.
* It will always return false for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def hasMessages(a: ActorRef): Boolean = a match {
case x: LocalActorRef x.underlying.mailbox.hasMessages
case _ false
}
/**
* Returns true if the actor is currently suspended.
* It will always return false for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def isSuspended(a: ActorRef): Boolean = a match {
case x: LocalActorRef
val cell = x.underlying
cell.mailbox.isSuspended
case _ false
}
/**
* Returns the number of pending messages in the mailbox of the actor.
* It will always return 0 for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def numberOfMessages(a: ActorRef): Int = a match {
case x: LocalActorRef x.underlying.mailbox.numberOfMessages
case _ 0
}
def createRoute(props: Props, context: ActorContext): Route = {
val ref = context.self.asInstanceOf[RoutedActorRef]
createAndRegisterRoutees(props, context, nrOfInstances, routees)
def getNext(): ActorRef = {
// non-local actors mailbox size is unknown, so consider them lowest priority
val activeLocal = ref.routees collect { case l: LocalActorRef if !isSuspended(l) l }
// 1. anyone not processing message and with empty mailbox
activeLocal.find(a !isProcessingMessage(a) && !hasMessages(a)) getOrElse {
// 2. anyone with empty mailbox
activeLocal.find(a !hasMessages(a)) getOrElse {
// 3. sort on mailbox size
activeLocal.sortBy(a numberOfMessages(a)).headOption getOrElse {
// 4. no locals, just pick one, random
ref.routees(random.get.nextInt(ref.routees.size))
}
}
}
}
{
case (sender, message)
message match {
case Broadcast(msg) toAll(sender, ref.routees)
case msg List(Destination(sender, getNext()))
}
}
}
}
object BroadcastRouter {
def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString))
@ -439,12 +602,15 @@ object BroadcastRouter {
* A Router that uses broadcasts a message to all its connections.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with BroadcastLike {
@ -460,9 +626,11 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(t: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(t))
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
/**
@ -507,12 +675,15 @@ object ScatterGatherFirstCompletedRouter {
* Simple router that broadcasts the message to all routees, and replies with the first response.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None)
@ -529,9 +700,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(t: java.lang.Iterable[String], w: Duration) = {
this(routees = iterableAsScalaIterable(t), within = w)
def this(routeePaths: java.lang.Iterable[String], w: Duration) = {
this(routees = iterableAsScalaIterable(routeePaths), within = w)
}
/**

View file

@ -7,6 +7,7 @@ import akka.routing.ScatterGatherFirstCompletedRouter;
import akka.routing.BroadcastRouter;
import akka.routing.RandomRouter;
import akka.routing.RoundRobinRouter;
import akka.routing.SmallestMailboxRouter;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
@ -34,6 +35,14 @@ public class ParentActor extends UntypedActor {
randomRouter.tell(i, getSelf());
}
//#randomRouter
} else if (msg.equals("smr")) {
//#smallestMailboxRouter
ActorRef smallestMailboxRouter = getContext().actorOf(
new Props(PrintlnActor.class).withRouter(new SmallestMailboxRouter(5)), "router");
for (int i = 1; i <= 10; i++) {
smallestMailboxRouter.tell(i, getSelf());
}
//#smallestMailboxRouter
} else if (msg.equals("br")) {
//#broadcastRouter
ActorRef broadcastRouter = getContext().actorOf(new Props(PrintlnActor.class).withRouter(new BroadcastRouter(5)),

View file

@ -16,11 +16,12 @@ Router
A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'.
Akka comes with four defined routers out of the box, but as you will see in this chapter it
is really easy to create your own. The four routers shipped with Akka are:
Akka comes with some defined routers out of the box, but as you will see in this chapter it
is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.RoundRobinRouter``
* ``akka.routing.RandomRouter``
* ``akka.routing.SmallestMailboxRouter``
* ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter``
@ -44,9 +45,8 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in
the configuration file then this value will be used instead of any programmatically sent parameters, but you must
also define the ``router`` property in the configuration.*
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
instead of any programmatically sent parameters.*
Once you have the router actor it is just to send messages to it as you would to any actor:
@ -122,6 +122,21 @@ When run you should see a similar output to this:
The result from running the random router should be different, or at least random, every time you run it.
Try to run it a couple of times to verify its behavior if you don't trust us.
SmallestMailboxRouter
*********************
A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
The selection is done in this order:
* pick any idle routee (not processing message) with empty mailbox
* pick any routee with empty mailbox
* pick routee with fewest pending messages in mailbox
* pick any remote routee, remote actors are consider lowest priority,
since their mailbox size is unknown
Code example:
.. includecode:: code/akka/docs/jrouting/ParentActor.java#smallestMailboxRouter
BroadcastRouter
***************
A broadcast router forwards the message it receives to *all* its routees.

View file

@ -8,6 +8,7 @@ import annotation.tailrec
import akka.actor.{ Props, Actor }
import akka.util.duration._
import akka.dispatch.Await
import akka.routing.SmallestMailboxRouter
case class FibonacciNumber(nbr: Int)
@ -59,6 +60,14 @@ class ParentActor extends Actor {
i randomRouter ! i
}
//#randomRouter
case "smr"
//#smallestMailboxRouter
val smallestMailboxRouter =
context.actorOf(Props[PrintlnActor].withRouter(SmallestMailboxRouter(5)), "router")
1 to 10 foreach {
i smallestMailboxRouter ! i
}
//#smallestMailboxRouter
case "br"
//#broadcastRouter
val broadcastRouter =

View file

@ -16,11 +16,12 @@ Router
A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'.
Akka comes with four defined routers out of the box, but as you will see in this chapter it
is really easy to create your own. The four routers shipped with Akka are:
Akka comes with some defined routers out of the box, but as you will see in this chapter it
is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.RoundRobinRouter``
* ``akka.routing.RandomRouter``
* ``akka.routing.SmallestMailboxRouter``
* ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter``
@ -44,9 +45,8 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in
the configuration file then this value will be used instead of any programmatically sent parameters, but you must
also define the ``router`` property in the configuration.*
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
instead of any programmatically sent parameters.*
Once you have the router actor it is just to send messages to it as you would to any actor:
@ -123,6 +123,21 @@ When run you should see a similar output to this:
The result from running the random router should be different, or at least random, every time you run it.
Try to run it a couple of times to verify its behavior if you don't trust us.
SmallestMailboxRouter
*********************
A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
The selection is done in this order:
* pick any idle routee (not processing message) with empty mailbox
* pick any routee with empty mailbox
* pick routee with fewest pending messages in mailbox
* pick any remote routee, remote actors are consider lowest priority,
since their mailbox size is unknown
Code example:
.. includecode:: code/akka/docs/routing/RouterTypeExample.scala#smallestMailboxRouter
BroadcastRouter
***************
A broadcast router forwards the message it receives to *all* its routees.

View file

@ -28,6 +28,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings
val r = deploy.routing match {
case RoundRobinRouter(x, _, resizer) RemoteRoundRobinRouter(x, nodes, resizer)
case RandomRouter(x, _, resizer) RemoteRandomRouter(x, nodes, resizer)
case SmallestMailboxRouter(x, _, resizer) RemoteSmallestMailboxRouter(x, nodes, resizer)
case BroadcastRouter(x, _, resizer) RemoteBroadcastRouter(x, nodes, resizer)
case ScatterGatherFirstCompletedRouter(x, _, w, resizer) RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer)
}

View file

@ -82,6 +82,33 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], ove
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
}
/**
* A Router that tries to send to routee with fewest messages in mailbox.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with SmallestMailboxLike {
/**
* Constructor that sets the routees to be used.
* Java API
*/
def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
}
/**
* A Router that uses broadcasts a message to all its connections.
* <br>