From b7564d06ce50d809b3c3e65fc138582fea57e8b8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 14 Mar 2012 12:00:47 +0100 Subject: [PATCH] Making sure that the code works.... --- .../src/main/scala/akka/routing/Routing.scala | 63 ++++++++++++------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 9e690d4712..7acc7f3a06 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -783,10 +783,8 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def isSuspended(a: ActorRef): Boolean = a match { - case x: LocalActorRef ⇒ - val cell = x.underlying - cell.mailbox.isSuspended - case _ ⇒ false + case x: LocalActorRef ⇒ x.underlying.mailbox.isSuspended + case _ ⇒ false } /** @@ -803,27 +801,44 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) - //Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found. - @tailrec def getNext(actives: IndexedSeq[ActorRef] = routeeProvider.routees, // Use current routees - proposedTarget: ActorRef = routeeProvider.context.system.deadLetters, // Fall back to deadLetters - currentScore: Long = Long.MaxValue, // Start at worst possible score - at: Int = 0, // Index to actives, start at 0 - deep: Boolean = false): ActorRef = // Don't do deep inspection of mailbox size unless needed - if (at >= actives.size) { // End of actives, either start doing deep inspection, or go random if candidate is terminated - if (deep) { // IF at end of deep run, we're done, and anything is better than something terminated - if (proposedTarget.isTerminated) actives(random.get.nextInt(actives.size)) else proposedTarget - } else getNext(actives, proposedTarget, currentScore, 0, deep = true) //Commence deep run from start - } else actives(at) match { // Inspect current routee - case l: LocalActorRef if !isSuspended(l) ⇒ // Just check live local ones - val newScore: Long = // A Message being processed = 1, plus mbox size, which is MaxValue - 2 if not deep, treating it as just better than deadLetters - (if (isProcessingMessage(l)) 1 else 0) + - (if (hasMessages(l)) if (deep) numberOfMessages(l) else (Long.MaxValue - 2) else 0) - if (newScore <= 0) l - else if (newScore < currentScore) getNext(actives, l, newScore, at + 1, deep) - else getNext(actives, proposedTarget, currentScore, at + 1, deep) - case _ ⇒ getNext(actives, proposedTarget, currentScore, at + 1, deep) - } + // Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found. + // Lowest score wins, score 0 is autowin + // If no actor with score 0 is found, it will return that, or if it is terminated, a random of the entire set. + // Why? Well, in case we had 0 viable actors and all we got was the default, which is the DeadLetters, anything else is better. + @tailrec def getNext(targets: IndexedSeq[ActorRef] = routeeProvider.routees, + proposedTarget: ActorRef = routeeProvider.context.system.deadLetters, + currentScore: Long = Long.MaxValue, + at: Int = 0, + deep: Boolean = false): ActorRef = + if (at >= targets.size) { + if (deep) { + if (proposedTarget.isTerminated) targets(random.get.nextInt(targets.size)) else proposedTarget + } else getNext(targets, proposedTarget, currentScore, 0, deep = true) + } else { + targets(at) match { + case l if !isSuspended(l) ⇒ + val newScore: Long = (if (isProcessingMessage(l)) 1l else 0l) + ( + if (!hasMessages(l)) 0l else { + val UnknownMailboxSize = Long.MaxValue - 2 //Just about better than the DeadLetters + if (deep) { + numberOfMessages(l) match { //Race between hasMessages and numberOfMessages here, unfortunate the numberOfMessages returns 0 if unknown + case n if n > 0 ⇒ n + case _ ⇒ UnknownMailboxSize + } + } else { + UnknownMailboxSize + } + }) + + if (newScore == 0) l + else if (newScore < 0 || newScore >= currentScore) getNext(targets, proposedTarget, currentScore, at + 1, deep) + else getNext(targets, l, newScore, at + 1, deep) + + case _ ⇒ + getNext(targets, proposedTarget, currentScore, at + 1, deep) + } + } { case (sender, message) ⇒ message match {