diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 9e18239f49..bc97a61d13 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -13,7 +13,7 @@ import se.scalablesolutions.akka.actor.Actor * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. *

* The preferred way of creating dispatchers is to use - * the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object. + * the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object. * * * TODO: make sure everything in the pool is the same type of actor @@ -40,11 +40,41 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess if (lockedForDispatching) { // Only dispatch if we got the lock. Otherwise another thread is already dispatching. try { - processMailbox(invocation) + processMailbox(invocation.receiver) } finally { invocation.receiver._dispatcherLock.unlock } - stealAndScheduleWork(invocation.receiver) + } else { + // we have the thread, but we can not do anything with our actor -> donate work to another actor, and process that actor here + var thief: Option[Actor] = None + for (actor <- new Wrapper(references.values.iterator)) { + if (actor != invocation.receiver) { // skip ourselves + thief = Some(actor) + } + } + + thief match { + case None => {} + case Some(t) => { + // TODO: need to get the lock here! + // donate new message to the mailbox of the thief + val donated: MessageInvocation = invocation.receiver._mailbox.pollLast + if (donated != null) { + t.forward(donated.message)(Some(donated.receiver)) + // try processing it in situ, while we still hold the thread + val lockedTForDispatching = t._dispatcherLock.tryLock + if (lockedTForDispatching) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + try { + processMailbox(t) + } finally { + t._dispatcherLock.unlock + } + } + } + } + } + } } }) @@ -53,52 +83,52 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Process the messages in the mailbox of the receiver of the invocation. */ - private def processMailbox(invocation: MessageInvocation) = { - var messageInvocation = invocation.receiver._mailbox.poll + private def processMailbox(receiver: Actor) = { + var messageInvocation = receiver._mailbox.poll while (messageInvocation != null) { messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll + messageInvocation = receiver._mailbox.poll } } - /** - * Help another busy actor in the pool by stealing some work from its queue and forwarding it to the actor - * we were being invoked for (because we are done with the mailbox messages). - */ - private def stealAndScheduleWork(thief: Actor) = { - tryStealWork(thief).foreach { - invocation => { - log.debug("[%s] stole work [%s] from [%s]", thief, invocation.message, invocation.receiver) - // as if the original receiving actor would forward it to the thief - thief.forward(invocation.message)(Some(invocation.receiver)) - } - } - } - - def tryStealWork(thief: Actor): Option[MessageInvocation] = { - // TODO: use random or round robin scheme to not always steal from the same actor? - for (actor <- new Wrapper(references.values.iterator)) { - if (actor != thief) { - val stolenWork: MessageInvocation = actor._mailbox.pollLast - if (stolenWork != null) - return Some(stolenWork) - } - } - - // nothing found to steal - return None - } + // /** + // * Help another busy actor in the pool by stealing some work from its queue and forwarding it to the actor + // * we were being invoked for (because we are done with the mailbox messages). + // */ + // private def stealAndScheduleWork(thief: Actor) = { + // tryStealWork(thief).foreach { + // invocation => { + // log.debug("[%s] stole work [%s] from [%s]", thief, invocation.message, invocation.receiver) + // // as if the original receiving actor would forward it to the thief + // thief.forward(invocation.message)(Some(invocation.receiver)) + // } + // } + // } + // + // def tryStealWork(thief: Actor): Option[MessageInvocation] = { + // // TODO: use random or round robin scheme to not always steal from the same actor? + // for (actor <- new Wrapper(references.values.iterator)) { + // if (actor != thief) { + // val stolenWork: MessageInvocation = actor._mailbox.pollLast + // if (stolenWork != null) + // return Some(stolenWork) + // } + // } + // + // // nothing found to steal + // return None + // } - override def register(actor: Actor) = { - super.register(actor) - executor.execute(new Runnable() { - def run = { - stealAndScheduleWork(actor) - } - }) - actor // TODO: why is this necessary? - } + // override def register(actor: Actor) = { + // super.register(actor) + // executor.execute(new Runnable() { + // def run = { + // stealAndScheduleWork(actor) + // } + // }) + // actor // TODO: why is this necessary? + // } def start = if (!active) { active = true diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala index 0314a1261d..35bafc9d48 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala @@ -21,7 +21,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with Thread.sleep(delay) invocationCount += 1 finishedCounter.countDown -// println(id + " processed " + x) + println(id + " processed " + x) } } } diff --git a/akka.iml b/akka.iml deleted file mode 100644 index 74542e8e48..0000000000 --- a/akka.iml +++ /dev/null @@ -1,35 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - -