Fixing neglected configuration in WorkStealer

This commit is contained in:
Viktor Klang 2011-02-09 18:31:58 +01:00
parent 418b5cee25
commit d1213f26bb

View file

@ -11,7 +11,7 @@ import akka.util.Switch
import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList} import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque} import jsr166x.{Deque, LinkedBlockingDeque}
/** /**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -192,11 +192,16 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque case UnboundedMailbox(blockDequeue) =>
new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable { new LinkedBlockingDeque[MessageInvocation] with MessageQueue with Runnable {
def enqueue(handle: MessageInvocation): Unit = this.add(handle) final def enqueue(handle: MessageInvocation) {
this add handle
}
def dequeue: MessageInvocation = this.poll() final def dequeue(): MessageInvocation = {
if (blockDequeue) this.take()
else this.poll()
}
def run = if (!tryProcessMailbox(this)) { def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
@ -204,11 +209,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
} }
} }
case BoundedMailbox(blocking, capacity, pushTimeOut) => case BoundedMailbox(blockDequeue, capacity, pushTimeOut) =>
new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable { new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable {
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
def dequeue: MessageInvocation = this.poll() final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.toMillis > 0) {
if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
} else this put handle
}
final def dequeue(): MessageInvocation =
if (blockDequeue) this.take()
else this.poll()
def run = if (!tryProcessMailbox(this)) { def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox