Making sure that the code works....
This commit is contained in:
parent
1415617dee
commit
b7564d06ce
1 changed files with 39 additions and 24 deletions
|
|
@ -783,10 +783,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
|
|||
* routers based on mailbox and actor internal state.
|
||||
*/
|
||||
protected def isSuspended(a: ActorRef): Boolean = a match {
|
||||
case x: LocalActorRef ⇒
|
||||
val cell = x.underlying
|
||||
cell.mailbox.isSuspended
|
||||
case _ ⇒ false
|
||||
case x: LocalActorRef ⇒ x.underlying.mailbox.isSuspended
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -803,27 +801,44 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
|
|||
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
|
||||
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
|
||||
|
||||
//Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found.
|
||||
@tailrec def getNext(actives: IndexedSeq[ActorRef] = routeeProvider.routees, // Use current routees
|
||||
proposedTarget: ActorRef = routeeProvider.context.system.deadLetters, // Fall back to deadLetters
|
||||
currentScore: Long = Long.MaxValue, // Start at worst possible score
|
||||
at: Int = 0, // Index to actives, start at 0
|
||||
deep: Boolean = false): ActorRef = // Don't do deep inspection of mailbox size unless needed
|
||||
if (at >= actives.size) { // End of actives, either start doing deep inspection, or go random if candidate is terminated
|
||||
if (deep) { // IF at end of deep run, we're done, and anything is better than something terminated
|
||||
if (proposedTarget.isTerminated) actives(random.get.nextInt(actives.size)) else proposedTarget
|
||||
} else getNext(actives, proposedTarget, currentScore, 0, deep = true) //Commence deep run from start
|
||||
} else actives(at) match { // Inspect current routee
|
||||
case l: LocalActorRef if !isSuspended(l) ⇒ // Just check live local ones
|
||||
val newScore: Long = // A Message being processed = 1, plus mbox size, which is MaxValue - 2 if not deep, treating it as just better than deadLetters
|
||||
(if (isProcessingMessage(l)) 1 else 0) +
|
||||
(if (hasMessages(l)) if (deep) numberOfMessages(l) else (Long.MaxValue - 2) else 0)
|
||||
if (newScore <= 0) l
|
||||
else if (newScore < currentScore) getNext(actives, l, newScore, at + 1, deep)
|
||||
else getNext(actives, proposedTarget, currentScore, at + 1, deep)
|
||||
case _ ⇒ getNext(actives, proposedTarget, currentScore, at + 1, deep)
|
||||
}
|
||||
// Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found.
|
||||
// Lowest score wins, score 0 is autowin
|
||||
// If no actor with score 0 is found, it will return that, or if it is terminated, a random of the entire set.
|
||||
// Why? Well, in case we had 0 viable actors and all we got was the default, which is the DeadLetters, anything else is better.
|
||||
@tailrec def getNext(targets: IndexedSeq[ActorRef] = routeeProvider.routees,
|
||||
proposedTarget: ActorRef = routeeProvider.context.system.deadLetters,
|
||||
currentScore: Long = Long.MaxValue,
|
||||
at: Int = 0,
|
||||
deep: Boolean = false): ActorRef =
|
||||
if (at >= targets.size) {
|
||||
if (deep) {
|
||||
if (proposedTarget.isTerminated) targets(random.get.nextInt(targets.size)) else proposedTarget
|
||||
} else getNext(targets, proposedTarget, currentScore, 0, deep = true)
|
||||
} else {
|
||||
targets(at) match {
|
||||
case l if !isSuspended(l) ⇒
|
||||
|
||||
val newScore: Long = (if (isProcessingMessage(l)) 1l else 0l) + (
|
||||
if (!hasMessages(l)) 0l else {
|
||||
val UnknownMailboxSize = Long.MaxValue - 2 //Just about better than the DeadLetters
|
||||
if (deep) {
|
||||
numberOfMessages(l) match { //Race between hasMessages and numberOfMessages here, unfortunate the numberOfMessages returns 0 if unknown
|
||||
case n if n > 0 ⇒ n
|
||||
case _ ⇒ UnknownMailboxSize
|
||||
}
|
||||
} else {
|
||||
UnknownMailboxSize
|
||||
}
|
||||
})
|
||||
|
||||
if (newScore == 0) l
|
||||
else if (newScore < 0 || newScore >= currentScore) getNext(targets, proposedTarget, currentScore, at + 1, deep)
|
||||
else getNext(targets, l, newScore, at + 1, deep)
|
||||
|
||||
case _ ⇒
|
||||
getNext(targets, proposedTarget, currentScore, at + 1, deep)
|
||||
}
|
||||
}
|
||||
{
|
||||
case (sender, message) ⇒
|
||||
message match {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue