Fixing bug with !! and WorkStealing?

This commit is contained in:
Viktor Klang 2010-05-10 21:27:46 +02:00
parent 9a1c10a7dd
commit 63f0aa4bd2

View file

@ -51,12 +51,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
if (!tryProcessMailbox(invocation.receiver)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
// to another actor and then process his mailbox in stead.
findThief(invocation.receiver) match {
case Some(thief) => {
tryDonateAndProcessMessages(invocation.receiver, thief)
}
case None => { /* no other actor in the pool */ }
}
findThief(invocation.receiver).foreach( tryDonateAndProcessMessages(invocation.receiver,_) )
}
}
})
@ -101,18 +96,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def findThief(receiver: ActorRef): Option[ActorRef] = {
// copy to prevent concurrent modifications having any impact
val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size))
var i = lastThiefIndex
if (i > actors.size)
i = 0
val i = if ( lastThiefIndex > actors.size ) 0 else lastThiefIndex
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
// the dispatcher is being shut down...
doFindThief(receiver, actors, i) match {
case (thief: Option[ActorRef], index: Int) => {
lastThiefIndex = (index + 1) % actors.size
return thief
}
}
val (thief: Option[ActorRef], index: Int) = doFindThief(receiver, actors, i)
lastThiefIndex = (index + 1) % actors.size
thief
}
/**
@ -127,13 +117,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
for (i <- 0 to actors.length) {
val index = (i + startIndex) % actors.length
val actor = actors(index)
if (actor != receiver) { // skip ourselves
if (actor._mailbox.isEmpty) { // only pick actors that will most likely be able to process the messages
return (Some(actor), index)
}
}
if (actor != receiver && actor._mailbox.isEmpty)
return (Some(actor), index)
}
return (None, startIndex) // nothing found, reuse same start index next time
(None, startIndex) // nothing found, reuse same start index next time
}
/**
@ -154,27 +142,27 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* Donate messages to the thief and process them on the thief as long as the receiver has more messages.
*/
private def donateAndProcessMessages(receiver: ActorRef, thief: ActorRef): Unit = {
donateMessage(receiver, thief) match {
case None => {
// no more messages to donate
return
}
case Some(donatedInvocation) => {
if(donateMessage(receiver, thief)) {
processMailbox(thief)
return donateAndProcessMessages(receiver, thief)
}
donateAndProcessMessages(receiver, thief)
}
}
/**
* Steal a message from the receiver and give it to the thief.
*/
private def donateMessage(receiver: ActorRef, thief: ActorRef): Option[MessageInvocation] = {
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
val donated = receiver._mailbox.pollLast
if (donated != null) {
thief.self ! donated.message
return Some(donated)
} else return None
if (donated ne null) {
donated.replyTo match {
case None => thief.self.!(donated.message)(None)
case Some(Left(actor)) => thief.self.postMessageToMailbox(donated.message,Some(actor))
case Some(Right(future)) => thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](donated.message,receiver.timeout,Some(future))
}
true
}
else
false
}
def start = if (!active) {