From 63f0aa4bd26f06ed85e0c9410885ddbc16887faf Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 10 May 2010 21:27:46 +0200 Subject: [PATCH] Fixing bug with !! and WorkStealing? --- ...sedEventDrivenWorkStealingDispatcher.scala | 58 ++++++++----------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 4edf6651c0..289645ee7d 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -51,12 +51,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess if (!tryProcessMailbox(invocation.receiver)) { // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox // to another actor and then process his mailbox in stead. - findThief(invocation.receiver) match { - case Some(thief) => { - tryDonateAndProcessMessages(invocation.receiver, thief) - } - case None => { /* no other actor in the pool */ } - } + findThief(invocation.receiver).foreach( tryDonateAndProcessMessages(invocation.receiver,_) ) } } }) @@ -101,18 +96,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private def findThief(receiver: ActorRef): Option[ActorRef] = { // copy to prevent concurrent modifications having any impact val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size)) - var i = lastThiefIndex - if (i > actors.size) - i = 0 - + val i = if ( lastThiefIndex > actors.size ) 0 else lastThiefIndex + // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means // the dispatcher is being shut down... - doFindThief(receiver, actors, i) match { - case (thief: Option[ActorRef], index: Int) => { - lastThiefIndex = (index + 1) % actors.size - return thief - } - } + val (thief: Option[ActorRef], index: Int) = doFindThief(receiver, actors, i) + lastThiefIndex = (index + 1) % actors.size + thief } /** @@ -127,13 +117,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess for (i <- 0 to actors.length) { val index = (i + startIndex) % actors.length val actor = actors(index) - if (actor != receiver) { // skip ourselves - if (actor._mailbox.isEmpty) { // only pick actors that will most likely be able to process the messages - return (Some(actor), index) - } - } + if (actor != receiver && actor._mailbox.isEmpty) + return (Some(actor), index) } - return (None, startIndex) // nothing found, reuse same start index next time + + (None, startIndex) // nothing found, reuse same start index next time } /** @@ -154,27 +142,27 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * Donate messages to the thief and process them on the thief as long as the receiver has more messages. */ private def donateAndProcessMessages(receiver: ActorRef, thief: ActorRef): Unit = { - donateMessage(receiver, thief) match { - case None => { - // no more messages to donate - return - } - case Some(donatedInvocation) => { + if(donateMessage(receiver, thief)) { processMailbox(thief) - return donateAndProcessMessages(receiver, thief) - } + donateAndProcessMessages(receiver, thief) } } /** * Steal a message from the receiver and give it to the thief. */ - private def donateMessage(receiver: ActorRef, thief: ActorRef): Option[MessageInvocation] = { + private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = { val donated = receiver._mailbox.pollLast - if (donated != null) { - thief.self ! donated.message - return Some(donated) - } else return None + if (donated ne null) { + donated.replyTo match { + case None => thief.self.!(donated.message)(None) + case Some(Left(actor)) => thief.self.postMessageToMailbox(donated.message,Some(actor)) + case Some(Right(future)) => thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](donated.message,receiver.timeout,Some(future)) + } + true + } + else + false } def start = if (!active) {