From 5068f0d48a3575d0f921d9d9ea46573a3c30a46b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 27 Apr 2011 12:21:19 +0200 Subject: [PATCH] Removing blocking dequeues from MailboxConfig due to high risk and no gain --- .../akka/dispatch/MailboxConfigSpec.scala | 42 ++++++------------- .../dispatch/PriorityDispatcherSpec.scala | 4 +- .../ExecutorBasedEventDrivenDispatcher.scala | 26 +++++------- .../scala/akka/dispatch/MailboxHandling.scala | 38 ++++++----------- .../scala/akka/dispatch/MessageHandling.scala | 3 +- .../akka/dispatch/ThreadBasedDispatcher.scala | 6 +-- 6 files changed, 41 insertions(+), 78 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 9ddbfdc332..15d123867e 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -24,7 +24,7 @@ abstract class MailboxSpec extends name should { "create a !blockDequeue && unbounded mailbox" in { - val config = UnboundedMailbox(false) + val config = UnboundedMailbox() val q = factory(config) ensureInitialMailboxState(config, q) @@ -37,8 +37,8 @@ abstract class MailboxSpec extends f.await.resultOrException must be === Some(null) } - "create a !blockDequeue and bounded mailbox with 10 capacity and with push timeout" in { - val config = BoundedMailbox(false, 10, Duration(10,TimeUnit.MILLISECONDS)) + "create a bounded mailbox with 10 capacity and with push timeout" in { + val config = BoundedMailbox(10, Duration(10,TimeUnit.MILLISECONDS)) val q = factory(config) ensureInitialMailboxState(config, q) @@ -59,30 +59,16 @@ abstract class MailboxSpec extends } "dequeue what was enqueued properly for unbounded mailboxes" in { - testEnqueueDequeue(UnboundedMailbox(false)) + testEnqueueDequeue(UnboundedMailbox()) } "dequeue what was enqueued properly for bounded mailboxes" in { - testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(-1, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS))) } "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { - testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(100, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS))) } - - /** FIXME Adapt test so it works with the last dequeue - - "dequeue what was enqueued properly for unbounded mailboxes with blockDeque" in { - testEnqueueDequeue(UnboundedMailbox(true)) - } - - "dequeue what was enqueued properly for bounded mailboxes with blockDeque" in { - testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(-1, TimeUnit.MILLISECONDS))) - } - - "dequeue what was enqueued properly for bounded mailboxes with blockDeque and pushTimeout" in { - testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(100, TimeUnit.MILLISECONDS))) - }*/ } //CANDIDATE FOR TESTKIT @@ -111,8 +97,8 @@ abstract class MailboxSpec extends q match { case aQueue: BlockingQueue[_] => config match { - case BoundedMailbox(_,capacity,_) => aQueue.remainingCapacity must be === capacity - case UnboundedMailbox(_) => aQueue.remainingCapacity must be === Int.MaxValue + case BoundedMailbox(capacity,_) => aQueue.remainingCapacity must be === capacity + case UnboundedMailbox() => aQueue.remainingCapacity must be === Int.MaxValue } case _ => } @@ -165,10 +151,8 @@ abstract class MailboxSpec extends class DefaultMailboxSpec extends MailboxSpec { lazy val name = "The default mailbox implementation" def factory = { - case UnboundedMailbox(blockDequeue) => - new DefaultUnboundedMessageQueue(blockDequeue) - case BoundedMailbox(blocking, capacity, pushTimeOut) => - new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) + case UnboundedMailbox() => new DefaultUnboundedMessageQueue() + case BoundedMailbox(capacity, pushTimeOut) => new DefaultBoundedMessageQueue(capacity, pushTimeOut) } } @@ -176,9 +160,7 @@ class PriorityMailboxSpec extends MailboxSpec { val comparator = PriorityGenerator(_.##) lazy val name = "The priority mailbox implementation" def factory = { - case UnboundedMailbox(blockDequeue) => - new UnboundedPriorityMessageQueue(blockDequeue, comparator) - case BoundedMailbox(blocking, capacity, pushTimeOut) => - new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) + case UnboundedMailbox() => new UnboundedPriorityMessageQueue(comparator) + case BoundedMailbox(capacity, pushTimeOut) => new BoundedPriorityMessageQueue(capacity, pushTimeOut, comparator) } } \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index f256715b8c..002267a6c7 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -10,11 +10,11 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers { "A PriorityExecutorBasedEventDrivenDispatcher" must { "Order it's messages according to the specified comparator using an unbounded mailbox" in { - testOrdering(UnboundedMailbox(false)) + testOrdering(UnboundedMailbox()) } "Order it's messages according to the specified comparator using a bounded mailbox" in { - testOrdering(BoundedMailbox(false,1000)) + testOrdering(BoundedMailbox(1000)) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 105028f693..494fa85f28 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -117,20 +117,14 @@ class ExecutorBasedEventDrivenDispatcher( def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { case b: UnboundedMailbox => - if (b.blocking) { - new DefaultUnboundedMessageQueue(true) with ExecutableMailbox { - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this - } - } else { //If we have an unbounded, non-blocking mailbox, we can go lockless - new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this - final def enqueue(m: MessageInvocation) = this.add(m) - final def dequeue(): MessageInvocation = this.poll() - } + new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { + @inline final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + @inline final def enqueue(m: MessageInvocation) = this.add(m) + @inline final def dequeue(): MessageInvocation = this.poll() } case b: BoundedMailbox => - new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut, b.blocking) with ExecutableMailbox { - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox { + @inline final def dispatcher = ExecutorBasedEventDrivenDispatcher.this } } @@ -294,13 +288,13 @@ trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { case b: UnboundedMailbox => - new UnboundedPriorityMessageQueue(b.blocking, comparator) with ExecutableMailbox { - final def dispatcher = self + new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox { + @inline final def dispatcher = self } case b: BoundedMailbox => - new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, b.blocking, comparator) with ExecutableMailbox { - final def dispatcher = self + new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox { + @inline final def dispatcher = self } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index e0586a40a7..cacdefe95c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -30,9 +30,8 @@ trait MessageQueue { */ sealed trait MailboxType -case class UnboundedMailbox(val blocking: Boolean = false) extends MailboxType +case class UnboundedMailbox() extends MailboxType case class BoundedMailbox( - val blocking: Boolean = false, val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") @@ -40,46 +39,35 @@ case class BoundedMailbox( } trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => - def blockDequeue: Boolean - - final def enqueue(handle: MessageInvocation) { - this add handle - } - - final def dequeue(): MessageInvocation = { - if (blockDequeue) this.take() - else this.poll() - } + @inline final def enqueue(handle: MessageInvocation): Unit = this add handle + @inline final def dequeue(): MessageInvocation = this.poll() } trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => - def blockDequeue: Boolean def pushTimeOut: Duration final def enqueue(handle: MessageInvocation) { - if (pushTimeOut.length > 0 && pushTimeOut.toMillis > 0) { - if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit)) - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) + if (pushTimeOut.length > 0) { + this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) } } else this put handle } - final def dequeue(): MessageInvocation = - if (blockDequeue) this.take() - else this.poll() + @inline final def dequeue(): MessageInvocation = this.poll() } -class DefaultUnboundedMessageQueue(val blockDequeue: Boolean) extends +class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics -class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean) extends +class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics -class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends +class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics -class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends - BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with - BoundedMessageQueueSemantics +class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends + BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with + BoundedMessageQueueSemantics diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 14348e9f85..9e53bb09ca 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -221,9 +221,8 @@ abstract class MessageDispatcherConfigurator { def mailboxType(config: Configuration): MailboxType = { val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY) - // FIXME how do we read in isBlocking for mailbox? Now set to 'false'. if (capacity < 1) UnboundedMailbox() - else BoundedMailbox(false, capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) + else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) } def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadBasedDispatcher.scala index a8dfcf5860..9ed0ce8ef1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadBasedDispatcher.scala @@ -25,13 +25,13 @@ class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType) private[akka] val owner = new AtomicReference[ActorRef](_actor) def this(actor: ActorRef) = - this(actor, UnboundedMailbox(true)) // For Java API + this(actor, UnboundedMailbox()) // For Java API def this(actor: ActorRef, capacity: Int) = - this(actor, BoundedMailbox(true, capacity)) //For Java API + this(actor, BoundedMailbox(capacity)) //For Java API def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API - this(actor, BoundedMailbox(true, capacity, pushTimeOut)) + this(actor, BoundedMailbox(capacity, pushTimeOut)) override def register(actorRef: ActorRef) = { val actor = owner.get()