New implementation of smallest mailbox router

This commit is contained in:
Viktor Klang 2012-03-14 10:42:18 +01:00
parent d1630e5f49
commit 1415617dee

View file

@ -17,6 +17,7 @@ import java.util.concurrent.locks.ReentrantLock
import akka.jsr166y.ThreadLocalRandom
import akka.util.Unsafe
import akka.dispatch.Dispatchers
import annotation.tailrec
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -312,9 +313,7 @@ trait Router extends Actor {
}: Receive) orElse routerReceive
def routerReceive: Receive = {
case _
}
def routerReceive: Receive = Actor.emptyBehavior
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
// do not scrap children
@ -804,22 +803,26 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
def getNext(): ActorRef = {
// non-local actors mailbox size is unknown, so consider them lowest priority
val activeLocal = routeeProvider.routees collect { case l: LocalActorRef if !isSuspended(l) l }
// 1. anyone not processing message and with empty mailbox
activeLocal.find(a !isProcessingMessage(a) && !hasMessages(a)) getOrElse {
// 2. anyone with empty mailbox
activeLocal.find(a !hasMessages(a)) getOrElse {
// 3. sort on mailbox size
activeLocal.sortBy(a numberOfMessages(a)).headOption getOrElse {
// 4. no locals, just pick one, random
val _routees = routeeProvider.routees
_routees(random.get.nextInt(_routees.size))
}
}
//Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found.
@tailrec def getNext(actives: IndexedSeq[ActorRef] = routeeProvider.routees, // Use current routees
proposedTarget: ActorRef = routeeProvider.context.system.deadLetters, // Fall back to deadLetters
currentScore: Long = Long.MaxValue, // Start at worst possible score
at: Int = 0, // Index to actives, start at 0
deep: Boolean = false): ActorRef = // Don't do deep inspection of mailbox size unless needed
if (at >= actives.size) { // End of actives, either start doing deep inspection, or go random if candidate is terminated
if (deep) { // IF at end of deep run, we're done, and anything is better than something terminated
if (proposedTarget.isTerminated) actives(random.get.nextInt(actives.size)) else proposedTarget
} else getNext(actives, proposedTarget, currentScore, 0, deep = true) //Commence deep run from start
} else actives(at) match { // Inspect current routee
case l: LocalActorRef if !isSuspended(l) // Just check live local ones
val newScore: Long = // A Message being processed = 1, plus mbox size, which is MaxValue - 2 if not deep, treating it as just better than deadLetters
(if (isProcessingMessage(l)) 1 else 0) +
(if (hasMessages(l)) if (deep) numberOfMessages(l) else (Long.MaxValue - 2) else 0)
if (newScore <= 0) l
else if (newScore < currentScore) getNext(actives, l, newScore, at + 1, deep)
else getNext(actives, proposedTarget, currentScore, at + 1, deep)
case _ getNext(actives, proposedTarget, currentScore, at + 1, deep)
}
}
{
case (sender, message)