diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 32a8268c29..29ac371eea 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -44,11 +44,9 @@ class BalancingDispatcher( protected val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] - final val dispatcher = BalancingDispatcher.this } case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { final val queue = new LinkedBlockingQueue[Envelope](cap) - final val dispatcher = BalancingDispatcher.this final val pushTimeOut = timeout } case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") @@ -73,18 +71,19 @@ class BalancingDispatcher( protected[akka] override def unregister(actor: ActorCell) = { super.unregister(actor) - intoTheFray(except = actor) buddies.remove(actor) + intoTheFray(except = actor) } protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { if (mailBox.hasSystemMessages) { - var messages = mailBox.systemDrain() - while (messages ne null) { - deadLetterMailbox.systemEnqueue(actor.self, messages) //Send to dead letter queue - messages = messages.next - if (messages eq null) //Make sure that any system messages received after the current drain are also sent to the dead letter mbox - messages = mailBox.systemDrain() + var message = mailBox.systemDrain() + while (message ne null) { + // message must be “virgin” before being able to systemEnqueue again + val next = message.next + message.next = null + deadLetterMailbox.systemEnqueue(actor.self, message) + message = next } } }