diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 676c33b48c..58a498b948 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -292,6 +292,12 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ def hasSystemMessages: Boolean = systemQueueGet ne null } +trait QueueBasedMessageQueue extends MessageQueue { + def queue: Queue[Envelope] + final def numberOfMessages = queue.size + final def hasMessages = !queue.isEmpty +} + trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle final def dequeue(): Envelope = queue.poll() @@ -301,21 +307,20 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] - final def enqueue(receiver: ActorRef, handle: Envelope) { - if (pushTimeOut.length > 0) { + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = + if (pushTimeOut.length > 0) queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) } - } else queue put handle - } + else queue put handle final def dequeue(): Envelope = queue.poll() } -trait QueueBasedMessageQueue extends MessageQueue { - def queue: Queue[Envelope] - final def numberOfMessages = queue.size - final def hasMessages = !queue.isEmpty +trait DequeBasedMessageQueue extends QueueBasedMessageQueue { + def queue: Deque[Envelope] + def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit + def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int): Unit } trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { @@ -323,7 +328,7 @@ trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { final def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle - final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int) = + final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int): Unit = handleIterator foreach { enqueueFirst(receiver, _) } final def dequeue(): Envelope = queue.poll() @@ -333,37 +338,28 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingDeque[Envelope] - final def enqueue(receiver: ActorRef, handle: Envelope) { - if (pushTimeOut.length > 0) { + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = + if (pushTimeOut.length > 0) queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) } - } else queue put handle - } + else queue put handle - final def enqueueFirst(receiver: ActorRef, handle: Envelope) { - if (pushTimeOut.length > 0) { + final def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = + if (pushTimeOut.length > 0) queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) } - } else queue putFirst handle - } + else queue putFirst handle - final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int) { - if (queue.asInstanceOf[BlockingQueue[Envelope]].remainingCapacity >= size) { + final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int): Unit = + if (queue.asInstanceOf[BlockingQueue[Envelope]].remainingCapacity >= size) handleIterator foreach { enqueueFirst(receiver, _) } - } else throw new MessageQueueAppendFailedException("Couldn't enqueue stash to " + receiver) - } + else throw new MessageQueueAppendFailedException("Couldn't enqueue stash to " + receiver) final def dequeue(): Envelope = queue.poll() } -trait DequeBasedMessageQueue extends QueueBasedMessageQueue { - def queue: Deque[Envelope] - def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit - def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int): Unit -} - /** * Mailbox configuration. */