diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 57847f3553..9e690d4712 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -17,6 +17,7 @@ import java.util.concurrent.locks.ReentrantLock import akka.jsr166y.ThreadLocalRandom import akka.util.Unsafe import akka.dispatch.Dispatchers +import annotation.tailrec /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -312,9 +313,7 @@ trait Router extends Actor { }: Receive) orElse routerReceive - def routerReceive: Receive = { - case _ ⇒ - } + def routerReceive: Receive = Actor.emptyBehavior override def preRestart(cause: Throwable, msg: Option[Any]): Unit = { // do not scrap children @@ -804,22 +803,26 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) - def getNext(): ActorRef = { - // non-local actors mailbox size is unknown, so consider them lowest priority - val activeLocal = routeeProvider.routees collect { case l: LocalActorRef if !isSuspended(l) ⇒ l } - // 1. anyone not processing message and with empty mailbox - activeLocal.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a)) getOrElse { - // 2. anyone with empty mailbox - activeLocal.find(a ⇒ !hasMessages(a)) getOrElse { - // 3. sort on mailbox size - activeLocal.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse { - // 4. no locals, just pick one, random - val _routees = routeeProvider.routees - _routees(random.get.nextInt(_routees.size)) - } - } + //Worst-case a 2-pass inspection with mailbox size checking done on second pass, and only until no one empty is found. + @tailrec def getNext(actives: IndexedSeq[ActorRef] = routeeProvider.routees, // Use current routees + proposedTarget: ActorRef = routeeProvider.context.system.deadLetters, // Fall back to deadLetters + currentScore: Long = Long.MaxValue, // Start at worst possible score + at: Int = 0, // Index to actives, start at 0 + deep: Boolean = false): ActorRef = // Don't do deep inspection of mailbox size unless needed + if (at >= actives.size) { // End of actives, either start doing deep inspection, or go random if candidate is terminated + if (deep) { // IF at end of deep run, we're done, and anything is better than something terminated + if (proposedTarget.isTerminated) actives(random.get.nextInt(actives.size)) else proposedTarget + } else getNext(actives, proposedTarget, currentScore, 0, deep = true) //Commence deep run from start + } else actives(at) match { // Inspect current routee + case l: LocalActorRef if !isSuspended(l) ⇒ // Just check live local ones + val newScore: Long = // A Message being processed = 1, plus mbox size, which is MaxValue - 2 if not deep, treating it as just better than deadLetters + (if (isProcessingMessage(l)) 1 else 0) + + (if (hasMessages(l)) if (deep) numberOfMessages(l) else (Long.MaxValue - 2) else 0) + if (newScore <= 0) l + else if (newScore < currentScore) getNext(actives, l, newScore, at + 1, deep) + else getNext(actives, proposedTarget, currentScore, at + 1, deep) + case _ ⇒ getNext(actives, proposedTarget, currentScore, at + 1, deep) } - } { case (sender, message) ⇒