diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index a4b6db2443..f320999e6f 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -224,7 +224,6 @@ trait Actor extends TransactionManagement { */ private[akka] val _dispatcherLock:Lock = new ReentrantLock - // ==================================== // protected fields // ==================================== diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 77f16954d6..ed8fa9acfe 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -62,19 +62,24 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock - if (lockedForDispatching) { - try { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - var messageInvocation = invocation.receiver._mailbox.poll - while (messageInvocation != null) { - messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll + var lockAcquiredOnce = false + // this do-wile loop is required to prevent missing new messages between the end of the inner while + // loop and releasing the lock + do { + if (invocation.receiver._dispatcherLock.tryLock) { + lockAcquiredOnce = true + try { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + var messageInvocation = invocation.receiver._mailbox.poll + while (messageInvocation != null) { + messageInvocation.invoke + messageInvocation = invocation.receiver._mailbox.poll + } + } finally { + invocation.receiver._dispatcherLock.unlock } - } finally { - invocation.receiver._dispatcherLock.unlock } - } + } while ((lockAcquiredOnce && !invocation.receiver._mailbox.isEmpty)) } }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") @@ -94,4 +99,4 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche "Can't build a new thread pool for a dispatcher that is already up and running") private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool -} \ No newline at end of file +} diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index d9b5b1b6e0..ded6cd7f14 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -60,14 +60,21 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * @return true if the mailbox was processed, false otherwise */ private def tryProcessMailbox(receiver: Actor): Boolean = { - if (receiver._dispatcherLock.tryLock) { - try { - processMailbox(receiver) - } finally { - receiver._dispatcherLock.unlock + var lockAcquiredOnce = false + // this do-wile loop is required to prevent missing new messages between the end of processing + // the mailbox and releasing the lock + do { + if (receiver._dispatcherLock.tryLock) { + lockAcquiredOnce = true + try { + processMailbox(receiver) + } finally { + receiver._dispatcherLock.unlock + } } - return true - } else return false + } while ((lockAcquiredOnce && !receiver._mailbox.isEmpty)) + + return lockAcquiredOnce } /** @@ -127,7 +134,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = { val donated = receiver._mailbox.pollLast if (donated != null) { - thief.forward(donated.message)(Some(donated.receiver)) + //TODO: forward seems to fail from time to time ?! + //thief.forward(donated.message)(Some(donated.receiver)) + thief.send(donated.message) return Some(donated) } else return None } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index c3c7211d97..891126f22e 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -358,7 +358,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { def deployTask(info: ProjectInfo, toDir: Path) = task { val projectPath = info.projectPath.toString - val moduleName = projectPath.substring(projectPath.lastIndexOf('/') + 1, projectPath.length) + val moduleName = projectPath.substring(projectPath.lastIndexOf(System.getProperty("file.separator")) + 1, projectPath.length) // FIXME need to find out a way to grab these paths from the sbt system val JAR_FILE_NAME = moduleName + "_%s-%s.jar".format(defScalaVersion.value, version) val JAR_FILE_PATH = projectPath + "/target/scala_%s/".format(defScalaVersion.value) + JAR_FILE_NAME