From fc77a138e71923d0af9ea631f993f03669dc2aa8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 27 Sep 2010 17:30:49 +0200 Subject: [PATCH] Fixing ticket 413 --- .../ExecutorBasedEventDrivenDispatcher.scala | 4 ++++ ...orBasedEventDrivenWorkStealingDispatcher.scala | 15 +++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index de4512c094..aa525adb92 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -116,6 +116,10 @@ class ExecutorBasedEventDrivenDispatcher( val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 do { nextMessage.invoke + + if (nextMessage.receiver.isBeingRestarted) + return !self.isEmpty + if (throttle) { // Will be elided when false processedMessages += 1 if ((processedMessages >= throughput) || diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index faefa4fd10..a5ed113b97 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -75,33 +75,36 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * @return true if the mailbox was processed, false otherwise */ private def tryProcessMailbox(mailbox: MessageQueue): Boolean = { - var lockAcquiredOnce = false + var mailboxWasProcessed = false // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { if (mailbox.dispatcherLock.tryLock) { - lockAcquiredOnce = true try { - processMailbox(mailbox) + mailboxWasProcessed = processMailbox(mailbox) } finally { mailbox.dispatcherLock.unlock } } - } while ((lockAcquiredOnce && !mailbox.isEmpty)) + } while ((mailboxWasProcessed && !mailbox.isEmpty)) - lockAcquiredOnce + mailboxWasProcessed } /** * Process the messages in the mailbox of the given actor. + * @return */ - private def processMailbox(mailbox: MessageQueue) = { + private def processMailbox(mailbox: MessageQueue): Boolean = { var messageInvocation = mailbox.dequeue while (messageInvocation ne null) { messageInvocation.invoke + if (messageInvocation.receiver.isBeingRestarted) + return false messageInvocation = mailbox.dequeue } + true } private def findThief(receiver: ActorRef): Option[ActorRef] = {