diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 106e2eab4c..51428440fd 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -74,8 +74,11 @@ An (unbounded) deque-based mailbox can be configured as follows: * Prepends all messages in the stash to the mailbox, and then clears the stash. */ def unstashAll(): Unit = { - theStash.reverseIterator foreach mailbox.queue.addFirst - theStash = Vector.empty[Envelope] + try { + mailbox.enqueueAllFirst(self, theStash.reverseIterator, theStash.size) + } finally { + theStash = Vector.empty[Envelope] + } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 1c95843977..281bf9ef09 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -318,8 +318,77 @@ trait QueueBasedMessageQueue extends MessageQueue { final def hasMessages = !queue.isEmpty } +trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle + + final def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle + + final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int) = + handleIterator foreach { enqueueFirst(receiver, _) } + + final def dequeue(): Envelope = queue.poll() +} + +trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { + import java.util.concurrent.locks.ReentrantLock + + /* used to enable atomic all-or-nothing enqueueAllFirst in the presence of potential + * capacity violations + */ + private val lock = new ReentrantLock(false) + + def pushTimeOut: Duration + override def queue: BlockingDeque[Envelope] + + final def enqueue(receiver: ActorRef, handle: Envelope) { + lock.lock() + try { + if (pushTimeOut.length > 0) { + queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) + } + } else queue put handle + } finally { + lock.unlock() + } + } + + final def enqueueFirst(receiver: ActorRef, handle: Envelope) { + lock.lock() + try { + if (pushTimeOut.length > 0) { + queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || { + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) + } + } else queue putFirst handle + } finally { + lock.unlock() + } + } + + final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int) { + lock.lock() + try { + handleIterator foreach { enqueueFirst(receiver, _) } + } finally { + lock.unlock() + } + } + + final def dequeue(): Envelope = { + lock.lock() + try { + queue.poll() + } finally { + lock.unlock() + } + } +} + trait DequeBasedMessageQueue extends QueueBasedMessageQueue { def queue: Deque[Envelope] + def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit + def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int): Unit } /** @@ -339,13 +408,6 @@ case class UnboundedMailbox() extends MailboxType { } } -case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType { - final override def create(receiver: ActorContext): Mailbox = - new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { - final val queue = new LinkedBlockingDeque[Envelope]() - } -} - case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") @@ -377,3 +439,21 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va } } +case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType { + final override def create(receiver: ActorContext): Mailbox = + new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new LinkedBlockingDeque[Envelope]() + } +} + +case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { + + if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") + if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") + + final override def create(receiver: ActorContext): Mailbox = + new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new LinkedBlockingDeque[Envelope](capacity) + final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut + } +}