Don't just steal one message, but continue as long as there are more messages available.

Jan Van Besien 2010-03-14 20:55:51 +01:00
parent f615bf862d
commit d8a1c447e9
2 changed files with 53 additions and 29 deletions
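The gist of the change, as a small self-contained sketch (hypothetical queue types and names, not the patched Akka code shown in the diff below): instead of moving at most one message from a busy actor's mailbox to an idle one, the donating loop keeps pulling messages and letting the "thief" process them until the busy mailbox runs dry.

import java.util.concurrent.ConcurrentLinkedDeque

// Hypothetical stand-ins for the dispatcher's mailboxes; for illustration only.
object DonateWhileAvailable {

  // Stand-in for processMailbox(thief): drain whatever is currently queued.
  def process(mailbox: ConcurrentLinkedDeque[String], who: String): Unit = {
    var msg = mailbox.poll()
    while (msg != null) {
      println(who + " processed " + msg)
      msg = mailbox.poll()
    }
  }

  def main(args: Array[String]): Unit = {
    val busyMailbox = new ConcurrentLinkedDeque[String]()
    val idleMailbox = new ConcurrentLinkedDeque[String]()
    (1 to 5).foreach(i => busyMailbox.addLast("msg-" + i))

    // Old behaviour: donate at most one message per dispatch.
    // New behaviour (this commit): keep donating and processing as long as
    // the busy mailbox still has messages available.
    var donated = busyMailbox.pollLast()
    while (donated != null) {
      idleMailbox.addLast(donated)
      process(idleMailbox, "thief")
      donated = busyMailbox.pollLast()
    }
  }
}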

@@ -12,11 +12,15 @@ import se.scalablesolutions.akka.actor.Actor
* that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
* actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
* <p/>
* The preferred way of creating dispatchers is to use
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
* <p/>
* Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
* best described as "work donating" because the actor from which work is being stolen takes the initiative.
* <p/>
* This dispatcher attempts to redistribute work between actors each time a message is dispatched on a busy actor. Work
* will not be redistributed while actors are busy but no new messages are being dispatched.
* TODO: it would be nice to be able to redistribute work even when no new messages are being dispatched, without impacting dispatching performance.
* <p/>
* The preferred way of creating dispatchers is to use
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
*
* TODO: make sure everything in the pool is the same type of actor
*
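For context, a minimal usage sketch of the pooled dispatcher described in the Scaladoc above, using only the API that appears in the test file further down (class and object names here are hypothetical, and it assumes the dispatcher returned by the factory can be assigned directly, as the test does):

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.Dispatchers

// Hypothetical example: two actors share one work-stealing dispatcher instance,
// so messages sent to a busy actor may end up being processed by the idle one.
object PooledWorkersExample {
  val pooledDispatcher =
    Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("example-pool")

  class Worker(name: String) extends Actor {
    messageDispatcher = pooledDispatcher // every pool member uses the same dispatcher instance
    def receive = {
      case msg => println(name + " handled " + msg)
    }
  }

  def main(args: Array[String]): Unit = {
    val a = new Worker("worker-a")
    val b = new Worker("worker-b")
    a.start; b.start
    for (i <- 1 to 20) a ! i // work queued on "a" can be donated to "b"
  }
}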
@@ -40,13 +44,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
// to another actor and then process its mailbox instead.
findThief(invocation.receiver) match {
case Some(thief) => {
// TODO: maybe do the donation with the lock held, to prevent donating messages that will not be processed
donateMessage(invocation.receiver, thief) match {
case Some(donatedInvocation) => {
tryProcessMailbox(thief)
}
case None => { /* no messages left to donate */ }
}
tryDonateAndProcessMessages(invocation.receiver, thief)
}
case None => { /* no other actor in the pool */ }
}
@@ -57,13 +55,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/**
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
* another thread.
* another thread (because then that thread is already processing the mailbox).
*
* @return true if the mailbox was processed, false otherwise
*/
private def tryProcessMailbox(receiver: Actor): Boolean = {
if (receiver._dispatcherLock.tryLock) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
try {
processMailbox(receiver)
} finally {
@@ -85,7 +82,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
}
private def findThief(receiver: Actor): Option[Actor] = {
// TODO: round robin or random?
// TODO: round robin or random? and maybe best to pick an actor which is currently not dispatching (lock not held)
for (actor <- new Wrapper(references.values.iterator)) {
if (actor != receiver) { // skip ourselves
return Some(actor)
@@ -94,13 +91,42 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
return None
}
/**
* Try donating messages to the thief and processing the thief's mailbox. Does nothing if we cannot acquire
* the thief's dispatching lock, because in that case another thread is already processing the thief's mailbox.
*/
private def tryDonateAndProcessMessages(receiver: Actor, thief: Actor) = {
if (thief._dispatcherLock.tryLock) {
try {
donateAndProcessMessages(receiver, thief)
} finally {
thief._dispatcherLock.unlock
}
}
}
/**
* Donate messages to the thief and process them on the thief as long as the receiver has more messages.
*/
private def donateAndProcessMessages(receiver: Actor, thief: Actor): Unit = {
donateMessage(receiver, thief) match {
case None => {
// no more messages to donate
return
}
case Some(donatedInvocation) => {
processMailbox(thief)
return donateAndProcessMessages(receiver, thief)
}
}
}
/**
* Steal a message from the receiver and give it to the thief.
*/
private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast
if (donated != null) {
//log.debug("donating %s from %s to %s", donated.message, receiver, thief)
thief.forward(donated.message)(Some(donated.receiver))
return Some(donated)
} else return None

@@ -12,23 +12,24 @@ import se.scalablesolutions.akka.dispatch.Dispatchers
class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with MustMatchers with ActorTestUtil {
val poolDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
class DelayableActor(id: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
messageDispatcher = poolDispatcher
var invocationCount = 0
id = name
def receive = {
case x: Int => {
Thread.sleep(delay)
invocationCount += 1
finishedCounter.countDown
println(id + " processed " + x)
// println(id + " processed " + x)
}
}
}
@Test def fastActorShouldStealWorkFromSlowActor = verify(new TestActor {
def test = {
val finishedCounter = new CountDownLatch(100)
val finishedCounter = new CountDownLatch(110)
val slow = new DelayableActor("slow", 50, finishedCounter)
val fast = new DelayableActor("fast", 10, finishedCounter)
@@ -42,22 +43,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
slow ! i
}
// now send some messages to actors to keep the dispatcher dispatching messages
for (i <- 1 to 10) {
Thread.sleep(150)
if (i % 2 == 0)
fast ! i
else
slow ! i
}
finishedCounter.await
fast.invocationCount must be > (slow.invocationCount)
}
}
})
// @Test def canNotRegisterTwoDifferentActors = {
// new Actor() {
// override var messageDispatcher = poolDispatcher
// }
//
// intercept(classOf[NullPointerException]) {
// new Actor() {
// override var messageDispatcher = poolDispatcher
// }
// }
// }
}