diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index ed88921e16..ea5b89671a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -168,36 +168,9 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext val mailBox = actor.mailbox mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up actor.mailbox = deadLetterMailbox - cleanUpMailboxFor(actor, mailBox) mailBox.cleanUp() } - /** - * Overridable callback to clean up the mailbox for a given actor, - * called when an actor is unregistered. - */ - protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { - - if (mailBox.hasSystemMessages) { - var message = mailBox.systemDrain() - while (message ne null) { - // message must be “virgin” before being able to systemEnqueue again - val next = message.next - message.next = null - deadLetterMailbox.systemEnqueue(actor.self, message) - message = next - } - } - - if (mailBox.hasMessages) { - var envelope = mailBox.dequeue - while (envelope ne null) { - deadLetterMailbox.enqueue(actor.self, envelope) - envelope = mailBox.dequeue - } - } - } - private val shutdownAction = new Runnable { @tailrec final def run() { diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 6f45d8629c..9dd0733328 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -65,6 +65,21 @@ class BalancingDispatcher( final def numberOfMessages: Int = messageQueue.numberOfMessages final def hasMessages: Boolean = messageQueue.hasMessages + + override def cleanUp(): Unit = { + //Don't call the original implementation of this since it scraps all messages, and we don't want to do that + if (hasSystemMessages) { + val dlq = actor.systemImpl.deadLetterMailbox + var message = systemDrain() + while (message ne null) { + // message must be “virgin” before being able to systemEnqueue again + val next = message.next + message.next = null + dlq.systemEnqueue(actor.self, message) + message = next + } + } + } } protected[akka] override def register(actor: ActorCell) = { @@ -78,19 +93,6 @@ class BalancingDispatcher( intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray } - protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { - if (mailBox.hasSystemMessages) { - var message = mailBox.systemDrain() - while (message ne null) { - // message must be “virgin” before being able to systemEnqueue again - val next = message.next - message.next = null - prerequisites.deadLetterMailbox.systemEnqueue(actor.self, message) - message = next - } - } - } - def intoTheFray(except: ActorCell): Unit = if (rebalance.compareAndSet(false, true)) { try { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 3389e413a9..4787b22f78 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -209,8 +209,29 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes /** * Overridable callback to clean up the mailbox, * called when an actor is unregistered. + * By default it dequeues all system messages + messages and ships them to the owning actors' systems' DeadLetterMailbox */ - protected[dispatch] def cleanUp() {} + protected[dispatch] def cleanUp() { + val dlq = actor.systemImpl.deadLetterMailbox + if (hasSystemMessages) { + var message = systemDrain() + while (message ne null) { + // message must be “virgin” before being able to systemEnqueue again + val next = message.next + message.next = null + dlq.systemEnqueue(actor.self, message) + message = next + } + } + + if (hasMessages) { + var envelope = dequeue + while (envelope ne null) { + dlq.enqueue(actor.self, envelope) + envelope = dequeue + } + } + } } trait MessageQueue {