From 6b6d4a9f3aaa756d5ffbbd27275aa6357b045048 Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Sat, 13 Mar 2010 20:10:06 +0100 Subject: [PATCH] cleanup, added documentation. --- ...sedEventDrivenWorkStealingDispatcher.scala | 132 +++++++----------- 1 file changed, 54 insertions(+), 78 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index bc97a61d13..3713f9452e 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -13,14 +13,13 @@ import se.scalablesolutions.akka.actor.Actor * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. *

* The preferred way of creating dispatchers is to use - * the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object. - * + * the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object. + *

+ * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably + * best described as "work donating" because the actor of which work is being stolen takes the initiative. * * TODO: make sure everything in the pool is the same type of actor * - * TODO: Find a way to only send new work to an actor if that actor will actually be scheduled - * immidiately afterwards. Otherwize the work gets a change of being stolen back again... which is not optimal. - * * @see se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher * @see se.scalablesolutions.akka.dispatch.Dispatchers * @@ -36,52 +35,46 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock - if (lockedForDispatching) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - try { - processMailbox(invocation.receiver) - } finally { - invocation.receiver._dispatcherLock.unlock - } - } else { - // we have the thread, but we can not do anything with our actor -> donate work to another actor, and process that actor here - var thief: Option[Actor] = None - for (actor <- new Wrapper(references.values.iterator)) { - if (actor != invocation.receiver) { // skip ourselves - thief = Some(actor) - } - } - - thief match { - case None => {} - case Some(t) => { - // TODO: need to get the lock here! - // donate new message to the mailbox of the thief - val donated: MessageInvocation = invocation.receiver._mailbox.pollLast - if (donated != null) { - t.forward(donated.message)(Some(donated.receiver)) - // try processing it in situ, while we still hold the thread - val lockedTForDispatching = t._dispatcherLock.tryLock - if (lockedTForDispatching) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - try { - processMailbox(t) - } finally { - t._dispatcherLock.unlock - } + if (!tryProcessMailbox(invocation.receiver)) { + // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox + // to another actor and then process his mailbox in stead. + findThief(invocation.receiver) match { + case Some(thief) => { + // TODO: maybe do the donation with the lock held, to prevent donating messages that will not be processed + donateMessage(invocation.receiver, thief) match { + case Some(donatedInvocation) => { + tryProcessMailbox(thief) } + case None => { /* no messages left to donate */ } } } + case None => { /* no other actor in the pool */ } } - } } }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") /** - * Process the messages in the mailbox of the receiver of the invocation. + * Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by + * another thread. + * + * @return true if the mailbox was processed, false otherwise + */ + private def tryProcessMailbox(receiver: Actor): Boolean = { + if (receiver._dispatcherLock.tryLock) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + try { + processMailbox(receiver) + } finally { + receiver._dispatcherLock.unlock + } + return true + } else return false + } + + /** + * Process the messages in the mailbox of the given actor. */ private def processMailbox(receiver: Actor) = { var messageInvocation = receiver._mailbox.poll @@ -91,44 +84,27 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess } } - // /** - // * Help another busy actor in the pool by stealing some work from its queue and forwarding it to the actor - // * we were being invoked for (because we are done with the mailbox messages). - // */ - // private def stealAndScheduleWork(thief: Actor) = { - // tryStealWork(thief).foreach { - // invocation => { - // log.debug("[%s] stole work [%s] from [%s]", thief, invocation.message, invocation.receiver) - // // as if the original receiving actor would forward it to the thief - // thief.forward(invocation.message)(Some(invocation.receiver)) - // } - // } - // } - // - // def tryStealWork(thief: Actor): Option[MessageInvocation] = { - // // TODO: use random or round robin scheme to not always steal from the same actor? - // for (actor <- new Wrapper(references.values.iterator)) { - // if (actor != thief) { - // val stolenWork: MessageInvocation = actor._mailbox.pollLast - // if (stolenWork != null) - // return Some(stolenWork) - // } - // } - // - // // nothing found to steal - // return None - // } + private def findThief(receiver: Actor): Option[Actor] = { + // TODO: round robin or random? + for (actor <- new Wrapper(references.values.iterator)) { + if (actor != receiver) { // skip ourselves + return Some(actor) + } + } + return None + } - - // override def register(actor: Actor) = { - // super.register(actor) - // executor.execute(new Runnable() { - // def run = { - // stealAndScheduleWork(actor) - // } - // }) - // actor // TODO: why is this necessary? - // } + /** + * Steal a message from the receiver and give it to the thief. + */ + private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = { + val donated = receiver._mailbox.pollLast + if (donated != null) { + //log.debug("donating %s from %s to %s", donated.message, receiver, thief) + thief.forward(donated.message)(Some(donated.receiver)) + return Some(donated) + } else return None + } def start = if (!active) { active = true