diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index bb0f845aba..3abd961d0f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -227,27 +227,28 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue * called when an actor is unregistered. * By default it dequeues all system messages + messages and ships them to the owning actors' systems' DeadLetterMailbox */ - protected[dispatch] def cleanUp(): Unit = if (actor ne null) { - val dlq = actor.systemImpl.deadLetterMailbox - if (hasSystemMessages) { - var message = systemDrain() - while (message ne null) { - // message must be “virgin” before being able to systemEnqueue again - val next = message.next - message.next = null - dlq.systemEnqueue(actor.self, message) - message = next + protected[dispatch] def cleanUp(): Unit = + if (actor ne null) { // actor is null for the deadLetterMailbox + val dlq = actor.systemImpl.deadLetterMailbox + if (hasSystemMessages) { + var message = systemDrain() + while (message ne null) { + // message must be “virgin” before being able to systemEnqueue again + val next = message.next + message.next = null + dlq.systemEnqueue(actor.self, message) + message = next + } } - } - if (hasMessages) { - var envelope = dequeue - while (envelope ne null) { - dlq.enqueue(actor.self, envelope) - envelope = dequeue + if (hasMessages) { + var envelope = dequeue + while (envelope ne null) { + dlq.enqueue(actor.self, envelope) + envelope = dequeue + } } } - } } trait MessageQueue { diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index e17507d427..65bcf563fc 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -148,6 +148,11 @@ class Switch(startAsOn: Boolean = false) { if (switch.get) on else off } + /** + * Executes the given code while holding this switch’s lock, i.e. protected from concurrent modification of the switch status. + */ + def locked[T](code: ⇒ T) = synchronized { code } + /** * Returns whether the switch is IMMEDIATELY on (no locking) */ diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 947ae4e262..191901b4ee 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -133,11 +133,6 @@ class CallingThreadDispatcher( protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor) - private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match { - case m: CallingThreadMailbox ⇒ Some(m) - case _ ⇒ None - } - protected[akka] override def shutdown() {} protected[akka] override def throughput = 0 @@ -147,7 +142,10 @@ class CallingThreadDispatcher( protected[akka] override def shutdownTimeout = 1 second override def suspend(actor: ActorCell) { - getMailbox(actor) foreach (_.suspendSwitch.switchOn) + actor.mailbox match { + case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn + case m ⇒ m.systemEnqueue(actor.self, Suspend()) + } } override def resume(actor: ActorCell) { @@ -187,12 +185,10 @@ class CallingThreadDispatcher( false } { queue.push(handle) - if (queue.isActive) - false - else { + if (!queue.isActive) { queue.enter true - } + } else false } if (execute) runQueue(mbox, queue) case m ⇒ m.enqueue(receiver.self, handle) @@ -214,14 +210,14 @@ class CallingThreadDispatcher( private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue, interruptedex: InterruptedException = null) { var intex = interruptedex; assert(queue.isActive) - mbox.lock.lock + mbox.ctdLock.lock val recurse = try { mbox.processAllSystemMessages() val handle = mbox.suspendSwitch.fold[Envelope] { queue.leave null } { - val ret = queue.pop + val ret = if (mbox.isClosed) null else queue.pop if (ret eq null) queue.leave ret } @@ -248,7 +244,7 @@ class CallingThreadDispatcher( } catch { case e ⇒ queue.leave; throw e } finally { - mbox.lock.unlock + mbox.ctdLock.unlock } if (recurse) { runQueue(mbox, queue, intex) @@ -295,11 +291,23 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with def queue = q.get - val lock = new ReentrantLock + val ctdLock = new ReentrantLock val suspendSwitch = new Switch override def enqueue(receiver: ActorRef, msg: Envelope) {} override def dequeue() = null override def hasMessages = queue.isEmpty override def numberOfMessages = queue.size + + override def cleanUp(): Unit = { + /* + * This is called from dispatcher.unregister, i.e. under this.lock. If + * another thread obtained a reference to this mailbox and enqueues after + * the gather operation, tough luck: no guaranteed delivery to deadLetters. + */ + suspendSwitch.locked { + CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, queue) + super.cleanUp() + } + } }