cleanup, added documentation.

This commit is contained in:
Jan Van Besien 2010-03-13 20:10:06 +01:00
parent 9e796ded8b
commit 6b6d4a9f3a

View file

@ -13,14 +13,13 @@ import se.scalablesolutions.akka.actor.Actor
* actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
* <p/>
* The preferred way of creating dispatchers is to use
* the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object.
*
* the { @link se.scalablesolutions.akka.dispatch.Dispatchers } factory object.
* <p/>
* Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
* best described as "work donating" because the actor of which work is being stolen takes the initiative.
*
* TODO: make sure everything in the pool is the same type of actor
*
* TODO: Find a way to only send new work to an actor if that actor will actually be scheduled
* immidiately afterwards. Otherwize the work gets a change of being stolen back again... which is not optimal.
*
* @see se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
* @see se.scalablesolutions.akka.dispatch.Dispatchers
*
@ -36,52 +35,46 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
def run = {
val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock
if (lockedForDispatching) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
try {
processMailbox(invocation.receiver)
} finally {
invocation.receiver._dispatcherLock.unlock
}
} else {
// we have the thread, but we can not do anything with our actor -> donate work to another actor, and process that actor here
var thief: Option[Actor] = None
for (actor <- new Wrapper(references.values.iterator)) {
if (actor != invocation.receiver) { // skip ourselves
thief = Some(actor)
}
}
thief match {
case None => {}
case Some(t) => {
// TODO: need to get the lock here!
// donate new message to the mailbox of the thief
val donated: MessageInvocation = invocation.receiver._mailbox.pollLast
if (donated != null) {
t.forward(donated.message)(Some(donated.receiver))
// try processing it in situ, while we still hold the thread
val lockedTForDispatching = t._dispatcherLock.tryLock
if (lockedTForDispatching) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
try {
processMailbox(t)
} finally {
t._dispatcherLock.unlock
}
if (!tryProcessMailbox(invocation.receiver)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
// to another actor and then process his mailbox in stead.
findThief(invocation.receiver) match {
case Some(thief) => {
// TODO: maybe do the donation with the lock held, to prevent donating messages that will not be processed
donateMessage(invocation.receiver, thief) match {
case Some(donatedInvocation) => {
tryProcessMailbox(thief)
}
case None => { /* no messages left to donate */ }
}
}
case None => { /* no other actor in the pool */ }
}
}
}
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
/**
* Process the messages in the mailbox of the receiver of the invocation.
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
* another thread.
*
* @return true if the mailbox was processed, false otherwise
*/
private def tryProcessMailbox(receiver: Actor): Boolean = {
if (receiver._dispatcherLock.tryLock) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
try {
processMailbox(receiver)
} finally {
receiver._dispatcherLock.unlock
}
return true
} else return false
}
/**
* Process the messages in the mailbox of the given actor.
*/
private def processMailbox(receiver: Actor) = {
var messageInvocation = receiver._mailbox.poll
@ -91,44 +84,27 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
}
}
// /**
// * Help another busy actor in the pool by stealing some work from its queue and forwarding it to the actor
// * we were being invoked for (because we are done with the mailbox messages).
// */
// private def stealAndScheduleWork(thief: Actor) = {
// tryStealWork(thief).foreach {
// invocation => {
// log.debug("[%s] stole work [%s] from [%s]", thief, invocation.message, invocation.receiver)
// // as if the original receiving actor would forward it to the thief
// thief.forward(invocation.message)(Some(invocation.receiver))
// }
// }
// }
//
// def tryStealWork(thief: Actor): Option[MessageInvocation] = {
// // TODO: use random or round robin scheme to not always steal from the same actor?
// for (actor <- new Wrapper(references.values.iterator)) {
// if (actor != thief) {
// val stolenWork: MessageInvocation = actor._mailbox.pollLast
// if (stolenWork != null)
// return Some(stolenWork)
// }
// }
//
// // nothing found to steal
// return None
// }
private def findThief(receiver: Actor): Option[Actor] = {
// TODO: round robin or random?
for (actor <- new Wrapper(references.values.iterator)) {
if (actor != receiver) { // skip ourselves
return Some(actor)
}
}
return None
}
// override def register(actor: Actor) = {
// super.register(actor)
// executor.execute(new Runnable() {
// def run = {
// stealAndScheduleWork(actor)
// }
// })
// actor // TODO: why is this necessary?
// }
/**
* Steal a message from the receiver and give it to the thief.
*/
private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast
if (donated != null) {
//log.debug("donating %s from %s to %s", donated.message, receiver, thief)
thief.forward(donated.message)(Some(donated.receiver))
return Some(donated)
} else return None
}
def start = if (!active) {
active = true