From c3d66ed3d79eb814fcf1f589e1d8e0e5da7a61f3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 11 Sep 2010 15:24:09 +0200 Subject: [PATCH] 1 entry per mailbox at most --- .../ExecutorBasedEventDrivenDispatcher.scala | 106 ++++++++++-------- .../main/scala/dispatch/MessageHandling.scala | 42 ++++--- .../scala/dispatch/ThreadPoolBuilder.scala | 7 +- akka-actor/src/main/scala/util/LockUtil.scala | 6 + 4 files changed, 92 insertions(+), 69 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 0e9acf62e1..19045e123b 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} import java.util.Queue -import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} +import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -80,6 +80,52 @@ class ExecutorBasedEventDrivenDispatcher( val name = "akka:event-driven:dispatcher:" + _name init + /** + * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox + */ + trait ExecutableMailbox { self: MessageQueue with Runnable => + def run = { + try { + val reDispatch = processMailbox()//Returns true if we need to reschedule the processing + self.dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing + if (reDispatch) + dispatch(self) + } catch { + case e => + dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing + if(!self.isEmpty) //If the mailbox isn't empty, try to re-schedule processing, equivalent to reDispatch + dispatch(self) + throw e //Can't just swallow exceptions or errors + } + } + + /** + * Process the messages in the mailbox + * + * @return true if the processing finished before the mailbox was empty, due to the throughput constraint + */ + def processMailbox(): Boolean = { + val throttle = throughput > 0 + var processedMessages = 0 + var nextMessage = self.dequeue + if (nextMessage ne null) { + do { + nextMessage.invoke + + if(throttle) { //Will be elided when false + processedMessages += 1 + if (processedMessages >= throughput) //If we're throttled, break out + return !self.isEmpty + } + nextMessage = self.dequeue + } + while (nextMessage ne null) + } + + false + } + } + def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation @@ -93,56 +139,18 @@ class ExecutorBasedEventDrivenDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = new DefaultMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,false) with Runnable { - def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - if (dispatcherLock.tryLock()) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - try { - finishedBeforeMailboxEmpty = processMailbox() - } finally { - dispatcherLock.unlock() - if (finishedBeforeMailboxEmpty) - dispatch(this) - } - } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !this.isEmpty)) - } - - /** - * Process the messages in the mailbox - * - * @return true if the processing finished before the mailbox was empty, due to the throughput constraint - */ - def processMailbox(): Boolean = { - val throttle = throughput > 0 - var processedMessages = 0 - var nextMessage = this.dequeue - if (nextMessage ne null) { - do { - nextMessage.invoke - - if(throttle) { //Will be JIT:Ed away when false - processedMessages += 1 - if (processedMessages >= throughput) //If we're throttled, break out - return !this.isEmpty - } - nextMessage = this.dequeue - } - while (nextMessage ne null) - } - - false - } - } + override def createMailbox(actorRef: ActorRef): AnyRef = + if (mailboxCapacity > 0) new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with Runnable with ExecutableMailbox + else new DefaultUnboundedMessageQueue(blockDequeue = false) with Runnable with ExecutableMailbox def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) { - executor.execute(mailbox) + if (mailbox.dispatcherLock.tryLock()) {//Ensure that only one runnable can be in the executor pool at the same time + try { + executor execute mailbox + } catch { + case e: RejectedExecutionException => mailbox.dispatcherLock.unlock() + } + } } else { log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 015ae9422b..c2ec47c446 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -85,28 +85,36 @@ case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingD */ def newMailbox(bounds: Int = capacity, pushTime: Option[Duration] = pushTimeOut, - blockDequeue: Boolean = blockingDequeue) : MessageQueue = new DefaultMessageQueue(bounds,pushTime,blockDequeue) + blockDequeue: Boolean = blockingDequeue) : MessageQueue = + if (capacity > 0) new DefaultBoundedMessageQueue(bounds,pushTime,blockDequeue) + else new DefaultUnboundedMessageQueue(blockDequeue) } -class DefaultMessageQueue(override val capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends BoundableTransferQueue[MessageInvocation](capacity) with MessageQueue { - def enqueue(handle: MessageInvocation) { - if(bounded) { - if (pushTimeOut.isDefined) { - if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit)) - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) - } - else { - this.put(handle) - } - } else { - this.add(handle) - } +class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { + final def enqueue(handle: MessageInvocation) { + this add handle } - - def dequeue(): MessageInvocation = { + + final def dequeue(): MessageInvocation = if (blockDequeue) this.take() else this.poll() +} + +class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue { + final def enqueue(handle: MessageInvocation) { + if (pushTimeOut.isDefined) { + if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit)) + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) + } + else { + this put handle + } } + + final def dequeue(): MessageInvocation = + if (blockDequeue) this.take() + else this.poll() + } /** @@ -128,7 +136,7 @@ trait MessageDispatcher extends Logging { } def unregister(actorRef: ActorRef) = { uuids remove actorRef.uuid - //actorRef.mailbox = null //FIXME should we null out the mailbox here? + actorRef.mailbox = null if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 5ad1b89aca..eb573cde70 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -11,6 +11,7 @@ import ThreadPoolExecutor.CallerRunsPolicy import se.scalablesolutions.akka.actor.IllegalActorStateException import se.scalablesolutions.akka.util.{Logger, Logging} +import concurrent.forkjoin.LinkedTransferQueue trait ThreadPoolBuilder extends Logging { val name: String @@ -69,7 +70,7 @@ trait ThreadPoolBuilder extends Logging { def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable] + blockingQueue = new LinkedTransferQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory) boundedExecutorBound = bound this @@ -78,7 +79,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable] + blockingQueue = new LinkedTransferQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this @@ -87,7 +88,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable](capacity) + blockingQueue = new BoundableTransferQueue[Runnable](capacity) threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index ee7f4f0efc..3d1261e468 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -102,6 +102,12 @@ class SimpleLock { else acquired.compareAndSet(false,true) } + def tryUnlock() = { + acquired.compareAndSet(true,false) + } + + def locked = acquired.get + def unlock() { acquired.set(false) }