From d8a1c447e90117df8fd657b68c570aaa8d7cab29 Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Sun, 14 Mar 2010 20:55:51 +0100 Subject: [PATCH] don't just steal one message, but continue as long as there are more messages available. --- ...sedEventDrivenWorkStealingDispatcher.scala | 54 ++++++++++++++----- ...ventDrivenWorkStealingDispatcherTest.scala | 28 +++++----- 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 3713f9452e..d9b5b1b6e0 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -12,11 +12,15 @@ import se.scalablesolutions.akka.actor.Actor * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. *

- * The preferred way of creating dispatchers is to use - * the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object. - *

* Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably * best described as "work donating" because the actor of which work is being stolen takes the initiative. + *

+ * This dispatcher attempts to redistribute work between actors each time a message is dispatched on a busy actor. Work + * will not be redistributed when actors are busy, but no new messages are dispatched. + * TODO: it would be nice to be able to redistribute work even when no new messages are being dispatched, without impacting dispatching performance ?! + *

+ * The preferred way of creating dispatchers is to use + * the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object. * * TODO: make sure everything in the pool is the same type of actor * @@ -40,13 +44,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess // to another actor and then process his mailbox in stead. findThief(invocation.receiver) match { case Some(thief) => { - // TODO: maybe do the donation with the lock held, to prevent donating messages that will not be processed - donateMessage(invocation.receiver, thief) match { - case Some(donatedInvocation) => { - tryProcessMailbox(thief) - } - case None => { /* no messages left to donate */ } - } + tryDonateAndProcessMessages(invocation.receiver, thief) } case None => { /* no other actor in the pool */ } } @@ -57,13 +55,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by - * another thread. + * another thread (because then that thread is already processing the mailbox). * * @return true if the mailbox was processed, false otherwise */ private def tryProcessMailbox(receiver: Actor): Boolean = { if (receiver._dispatcherLock.tryLock) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. try { processMailbox(receiver) } finally { @@ -85,7 +82,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess } private def findThief(receiver: Actor): Option[Actor] = { - // TODO: round robin or random? + // TODO: round robin or random? and maybe best to pick an actor which is currently not dispatching (lock not held) for (actor <- new Wrapper(references.values.iterator)) { if (actor != receiver) { // skip ourselves return Some(actor) @@ -94,13 +91,42 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess return None } + /** + * Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire + * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. + */ + private def tryDonateAndProcessMessages(receiver: Actor, thief: Actor) = { + if (thief._dispatcherLock.tryLock) { + try { + donateAndProcessMessages(receiver, thief) + } finally { + thief._dispatcherLock.unlock + } + } + } + + /** + * Donate messages to the thief and process them on the thief as long as the receiver has more messages. + */ + private def donateAndProcessMessages(receiver: Actor, thief: Actor): Unit = { + donateMessage(receiver, thief) match { + case None => { + // no more messages to donate + return + } + case Some(donatedInvocation) => { + processMailbox(thief) + return donateAndProcessMessages(receiver, thief) + } + } + } + /** * Steal a message from the receiver and give it to the thief. */ private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = { val donated = receiver._mailbox.pollLast if (donated != null) { - //log.debug("donating %s from %s to %s", donated.message, receiver, thief) thief.forward(donated.message)(Some(donated.receiver)) return Some(donated) } else return None diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala index 35bafc9d48..a7299dcc1a 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala @@ -12,23 +12,24 @@ import se.scalablesolutions.akka.dispatch.Dispatchers class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with MustMatchers with ActorTestUtil { val poolDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") - class DelayableActor(id: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { + class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { messageDispatcher = poolDispatcher var invocationCount = 0 + id = name def receive = { case x: Int => { Thread.sleep(delay) invocationCount += 1 finishedCounter.countDown - println(id + " processed " + x) +// println(id + " processed " + x) } } } @Test def fastActorShouldStealWorkFromSlowActor = verify(new TestActor { def test = { - val finishedCounter = new CountDownLatch(100) + val finishedCounter = new CountDownLatch(110) val slow = new DelayableActor("slow", 50, finishedCounter) val fast = new DelayableActor("fast", 10, finishedCounter) @@ -42,22 +43,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with slow ! i } + // now send some messages to actors to keep the dispatcher dispatching messages + for (i <- 1 to 10) { + Thread.sleep(150) + if (i % 2 == 0) + fast ! i + else + slow ! i + } + finishedCounter.await fast.invocationCount must be > (slow.invocationCount) } } }) -// @Test def canNotRegisterTwoDifferentActors = { -// new Actor() { -// override var messageDispatcher = poolDispatcher -// } -// -// intercept(classOf[NullPointerException]) { -// new Actor() { -// override var messageDispatcher = poolDispatcher -// } -// } -// } - }