switched from "work stealing" implementation to "work donating". Needs more testing, cleanup and documentation but looks promissing.

This commit is contained in:
Jan Van Besien 2010-03-13 11:40:32 +01:00
parent aab222edff
commit 9e796ded8b
3 changed files with 73 additions and 78 deletions

View file

@ -13,7 +13,7 @@ import se.scalablesolutions.akka.actor.Actor
* actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
* <p/>
* The preferred way of creating dispatchers is to use
* the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object.
* the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object.
*
*
* TODO: make sure everything in the pool is the same type of actor
@ -40,11 +40,41 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
if (lockedForDispatching) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
try {
processMailbox(invocation)
processMailbox(invocation.receiver)
} finally {
invocation.receiver._dispatcherLock.unlock
}
stealAndScheduleWork(invocation.receiver)
} else {
// we have the thread, but we can not do anything with our actor -> donate work to another actor, and process that actor here
var thief: Option[Actor] = None
for (actor <- new Wrapper(references.values.iterator)) {
if (actor != invocation.receiver) { // skip ourselves
thief = Some(actor)
}
}
thief match {
case None => {}
case Some(t) => {
// TODO: need to get the lock here!
// donate new message to the mailbox of the thief
val donated: MessageInvocation = invocation.receiver._mailbox.pollLast
if (donated != null) {
t.forward(donated.message)(Some(donated.receiver))
// try processing it in situ, while we still hold the thread
val lockedTForDispatching = t._dispatcherLock.tryLock
if (lockedTForDispatching) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
try {
processMailbox(t)
} finally {
t._dispatcherLock.unlock
}
}
}
}
}
}
}
})
@ -53,52 +83,52 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/**
* Process the messages in the mailbox of the receiver of the invocation.
*/
private def processMailbox(invocation: MessageInvocation) = {
var messageInvocation = invocation.receiver._mailbox.poll
private def processMailbox(receiver: Actor) = {
var messageInvocation = receiver._mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
messageInvocation = receiver._mailbox.poll
}
}
/**
* Help another busy actor in the pool by stealing some work from its queue and forwarding it to the actor
* we were being invoked for (because we are done with the mailbox messages).
*/
private def stealAndScheduleWork(thief: Actor) = {
tryStealWork(thief).foreach {
invocation => {
log.debug("[%s] stole work [%s] from [%s]", thief, invocation.message, invocation.receiver)
// as if the original receiving actor would forward it to the thief
thief.forward(invocation.message)(Some(invocation.receiver))
}
}
}
def tryStealWork(thief: Actor): Option[MessageInvocation] = {
// TODO: use random or round robin scheme to not always steal from the same actor?
for (actor <- new Wrapper(references.values.iterator)) {
if (actor != thief) {
val stolenWork: MessageInvocation = actor._mailbox.pollLast
if (stolenWork != null)
return Some(stolenWork)
}
}
// nothing found to steal
return None
}
// /**
// * Help another busy actor in the pool by stealing some work from its queue and forwarding it to the actor
// * we were being invoked for (because we are done with the mailbox messages).
// */
// private def stealAndScheduleWork(thief: Actor) = {
// tryStealWork(thief).foreach {
// invocation => {
// log.debug("[%s] stole work [%s] from [%s]", thief, invocation.message, invocation.receiver)
// // as if the original receiving actor would forward it to the thief
// thief.forward(invocation.message)(Some(invocation.receiver))
// }
// }
// }
//
// def tryStealWork(thief: Actor): Option[MessageInvocation] = {
// // TODO: use random or round robin scheme to not always steal from the same actor?
// for (actor <- new Wrapper(references.values.iterator)) {
// if (actor != thief) {
// val stolenWork: MessageInvocation = actor._mailbox.pollLast
// if (stolenWork != null)
// return Some(stolenWork)
// }
// }
//
// // nothing found to steal
// return None
// }
override def register(actor: Actor) = {
super.register(actor)
executor.execute(new Runnable() {
def run = {
stealAndScheduleWork(actor)
}
})
actor // TODO: why is this necessary?
}
// override def register(actor: Actor) = {
// super.register(actor)
// executor.execute(new Runnable() {
// def run = {
// stealAndScheduleWork(actor)
// }
// })
// actor // TODO: why is this necessary?
// }
def start = if (!active) {
active = true

View file

@ -21,7 +21,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
Thread.sleep(delay)
invocationCount += 1
finishedCounter.countDown
// println(id + " processed " + x)
println(id + " processed " + x)
}
}
}

View file

@ -1,35 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="FacetManager">
<facet type="Scala" name="Scala">
<configuration>
<option name="myScalaCompilerJarPaths">
<array>
<option value="$APPLICATION_HOME_DIR$/plugins/Scala/lib/scala-compiler.jar" />
</array>
</option>
<option name="myScalaSdkJarPaths">
<array>
<option value="$APPLICATION_HOME_DIR$/plugins/Scala/lib/scala-library.jar" />
</array>
</option>
</configuration>
</facet>
<facet type="Spring" name="Spring">
<configuration />
</facet>
<facet type="WebBeans" name="Web Beans">
<configuration />
</facet>
</component>
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>