diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index fab5580b9e..cea68135e5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -115,13 +115,21 @@ class ExecutorBasedEventDrivenDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { - case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox { - def dispatcher = ExecutorBasedEventDrivenDispatcher.this - } + case b: UnboundedMailbox if b.blocking => + new DefaultUnboundedMessageQueue(true) with ExecutableMailbox { + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + } - case BoundedMailbox(blocking, capacity, pushTimeOut) => - new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox { - def dispatcher = ExecutorBasedEventDrivenDispatcher.this + case b: UnboundedMailbox if !b.blocking => //If we have an unbounded, non-blocking mailbox, we can go lockless + new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + final def enqueue(m: MessageInvocation) = this.add(m) + final def dequeue(): MessageInvocation = this.poll() + } + + case b: BoundedMailbox => + new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut, b.blocking) with ExecutableMailbox { + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this } } @@ -255,13 +263,14 @@ trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => def comparator: java.util.Comparator[MessageInvocation] override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { - case UnboundedMailbox(blocking) => new UnboundedPriorityMessageQueue(blocking, comparator) with ExecutableMailbox { - def dispatcher = self - } + case b: UnboundedMailbox => + new UnboundedPriorityMessageQueue(b.blocking, comparator) with ExecutableMailbox { + final def dispatcher = self + } - case BoundedMailbox(blocking, capacity, pushTimeOut) => - new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox { - def dispatcher = self + case b: BoundedMailbox => + new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, b.blocking, comparator) with ExecutableMailbox { + final def dispatcher = self } } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index 7a229c31b3..2a7452b172 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -311,5 +311,12 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin } } - //FIXME Implement toArray[T] => Array[T] + override def toArray[X](a: Array[X with AnyRef]) = { + lock.lock() + try { + backing.toArray[X](a) + } finally { + lock.unlock() + } + } } \ No newline at end of file