From 5fdaad467fae7df48d9dd2f22ef57563a43b446f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 9 Sep 2010 17:34:05 +0200 Subject: [PATCH 1/9] Optimization started of EBEDD --- .../ExecutorBasedEventDrivenDispatcher.scala | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index dbbfe3442a..bbe4b53cde 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -64,7 +64,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} */ class ExecutorBasedEventDrivenDispatcher( _name: String, - throughput: Int = Dispatchers.THROUGHPUT, + val throughput: Int = Dispatchers.THROUGHPUT, mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG, config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { @@ -109,7 +109,7 @@ class ExecutorBasedEventDrivenDispatcher( // Only dispatch if we got the lock. Otherwise another thread is already dispatching. lockAcquiredOnce = true try { - finishedBeforeMailboxEmpty = processMailbox(receiver) + finishedBeforeMailboxEmpty = processMailbox(receiver,mailbox) } finally { lock.unlock if (finishedBeforeMailboxEmpty) dispatch(receiver) @@ -128,20 +128,24 @@ class ExecutorBasedEventDrivenDispatcher( * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - def processMailbox(receiver: ActorRef): Boolean = { + def processMailbox(receiver: ActorRef,mailbox: MessageQueue): Boolean = { + val throttle = throughput > 0 var processedMessages = 0 - val mailbox = getMailbox(receiver) - var messageInvocation = mailbox.dequeue - while (messageInvocation != null) { - messageInvocation.invoke - processedMessages += 1 - // check if we simply continue with other messages, or reached the throughput limit - if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue - else { - messageInvocation = null - return !mailbox.isEmpty + var nextMessage = mailbox.dequeue + if (nextMessage ne null) { + do { + nextMessage.invoke + + if(throttle) { //Will be JIT:Ed away when false + processedMessages += 1 + if (processedMessages >= throughput) //If we're throttled, break out + return !mailbox.isEmpty + } + nextMessage = mailbox.dequeue } + while (nextMessage ne null) } + false } From 158ea29bfde1713e746df71a8147450f43d250fe Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 10 Sep 2010 18:12:09 +0200 Subject: [PATCH 2/9] Massive refactoring of EBEDD and WorkStealer and basically everything... --- .../src/main/scala/actor/ActorRef.scala | 6 - .../ExecutorBasedEventDrivenDispatcher.scala | 94 +++++---- ...sedEventDrivenWorkStealingDispatcher.scala | 77 ++++--- .../main/scala/dispatch/MessageHandling.scala | 61 ++---- .../src/main/scala/dispatch/Queues.scala | 195 +++++++++++------- akka-actor/src/main/scala/util/LockUtil.scala | 48 +++++ 6 files changed, 280 insertions(+), 201 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index c41408a1de..86b6c2ec65 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -196,12 +196,6 @@ trait ActorRef extends */ @volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None - /** - * This lock ensures thread safety in the dispatching: only one message can - * be dispatched at once on the actor. - */ - protected[akka] val dispatcherLock = new ReentrantLock - /** * This is a reference to the message currently being processed by the actor */ diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index bbe4b53cde..0e9acf62e1 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -81,72 +81,70 @@ class ExecutorBasedEventDrivenDispatcher( init def dispatch(invocation: MessageInvocation) = { - getMailbox(invocation.receiver) enqueue invocation - dispatch(invocation.receiver) + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + dispatch(mbox) } /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with Runnable] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false) - - def dispatch(receiver: ActorRef): Unit = if (active) { - - executor.execute(new Runnable() { - def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - val lock = receiver.dispatcherLock - val mailbox = getMailbox(receiver) - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - if (lock.tryLock) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - try { - finishedBeforeMailboxEmpty = processMailbox(receiver,mailbox) - } finally { - lock.unlock - if (finishedBeforeMailboxEmpty) dispatch(receiver) - } + override def createMailbox(actorRef: ActorRef): AnyRef = new DefaultMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,false) with Runnable { + def run = { + var lockAcquiredOnce = false + var finishedBeforeMailboxEmpty = false + // this do-while loop is required to prevent missing new messages between the end of the inner while + // loop and releasing the lock + do { + if (dispatcherLock.tryLock()) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + lockAcquiredOnce = true + try { + finishedBeforeMailboxEmpty = processMailbox() + } finally { + dispatcherLock.unlock() + if (finishedBeforeMailboxEmpty) + dispatch(this) } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) - } - }) - } else { - log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver) - } - + } + } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !this.isEmpty)) + } /** - * Process the messages in the mailbox of the given actor. + * Process the messages in the mailbox * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - def processMailbox(receiver: ActorRef,mailbox: MessageQueue): Boolean = { - val throttle = throughput > 0 - var processedMessages = 0 - var nextMessage = mailbox.dequeue - if (nextMessage ne null) { - do { - nextMessage.invoke + def processMailbox(): Boolean = { + val throttle = throughput > 0 + var processedMessages = 0 + var nextMessage = this.dequeue + if (nextMessage ne null) { + do { + nextMessage.invoke - if(throttle) { //Will be JIT:Ed away when false - processedMessages += 1 - if (processedMessages >= throughput) //If we're throttled, break out - return !mailbox.isEmpty + if(throttle) { //Will be JIT:Ed away when false + processedMessages += 1 + if (processedMessages >= throughput) //If we're throttled, break out + return !this.isEmpty + } + nextMessage = this.dequeue } - nextMessage = mailbox.dequeue + while (nextMessage ne null) } - while (nextMessage ne null) - } - false + false + } + } + + def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) { + executor.execute(mailbox) + } else { + log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) } def start = if (!active) { diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 9b1097213e..c97c16c238 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -56,21 +56,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation]] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def dispatch(invocation: MessageInvocation) = if (active) { - getMailbox(invocation.receiver).add(invocation) - executor.execute(new Runnable() { - def run = { - if (!tryProcessMailbox(invocation.receiver)) { - // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox - // to another actor and then process his mailbox in stead. - findThief(invocation.receiver).foreach( tryDonateAndProcessMessages(invocation.receiver,_) ) - } - } - }) + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + executor execute mbox } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") /** @@ -79,22 +72,21 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * * @return true if the mailbox was processed, false otherwise */ - private def tryProcessMailbox(receiver: ActorRef): Boolean = { + private def tryProcessMailbox(mailbox: MessageQueue): Boolean = { var lockAcquiredOnce = false - val lock = receiver.dispatcherLock // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { - if (lock.tryLock) { + if (mailbox.dispatcherLock.tryLock) { lockAcquiredOnce = true try { - processMailbox(receiver) + processMailbox(mailbox) } finally { - lock.unlock + mailbox.dispatcherLock.unlock } } - } while ((lockAcquiredOnce && !getMailbox(receiver).isEmpty)) + } while ((lockAcquiredOnce && !mailbox.isEmpty)) lockAcquiredOnce } @@ -102,12 +94,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * Process the messages in the mailbox of the given actor. */ - private def processMailbox(receiver: ActorRef) = { - val mailbox = getMailbox(receiver) - var messageInvocation = mailbox.poll - while (messageInvocation != null) { + private def processMailbox(mailbox: MessageQueue) = { + var messageInvocation = mailbox.dequeue + while (messageInvocation ne null) { messageInvocation.invoke - messageInvocation = mailbox.poll + messageInvocation = mailbox.dequeue } } @@ -145,11 +136,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. */ private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { - if (thief.dispatcherLock.tryLock) { + val mailbox = getMailbox(thief) + if (mailbox.dispatcherLock.tryLock) { try { - while(donateMessage(receiver, thief)) processMailbox(thief) + while(donateMessage(receiver, thief)) processMailbox(mailbox) } finally { - thief.dispatcherLock.unlock + mailbox.dispatcherLock.unlock } } } @@ -191,18 +183,45 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( } protected override def createMailbox(actorRef: ActorRef): AnyRef = { - if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation] - else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) + if (mailboxCapacity <= 0) { + new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable { + def enqueue(handle: MessageInvocation): Unit = this.add(handle) + def dequeue: MessageInvocation = this.poll() + + def run = { + if (!tryProcessMailbox(this)) { + // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox + // to another actor and then process his mailbox in stead. + findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) + } + } + } + } + else { + new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) with MessageQueue with Runnable { + def enqueue(handle: MessageInvocation): Unit = this.add(handle) + + def dequeue: MessageInvocation = this.poll() + + def run = { + if (!tryProcessMailbox(this)) { + // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox + // to another actor and then process his mailbox in stead. + findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) + } + } + } + } } override def register(actorRef: ActorRef) = { verifyActorsAreOfSameType(actorRef) - pooledActors.add(actorRef) + pooledActors add actorRef super.register(actorRef) } override def unregister(actorRef: ActorRef) = { - pooledActors.remove(actorRef) + pooledActors remove actorRef super.unregister(actorRef) } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 383c58905a..015ae9422b 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce import org.multiverse.commitbarriers.CountDownCommitBarrier import se.scalablesolutions.akka.AkkaException -import se.scalablesolutions.akka.util.{Duration, HashCode, Logging} import java.util.{Queue, List} import java.util.concurrent._ import concurrent.forkjoin.LinkedTransferQueue +import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} /** * @author Jonas Bonér @@ -63,6 +63,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m * @author Jonas Bonér */ trait MessageQueue { + val dispatcherLock = new SimpleLock def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int @@ -84,40 +85,28 @@ case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingD */ def newMailbox(bounds: Int = capacity, pushTime: Option[Duration] = pushTimeOut, - blockDequeue: Boolean = blockingDequeue) : MessageQueue = { - if (bounds <= 0) { //UNBOUNDED: Will never block enqueue and optionally blocking dequeue - new LinkedTransferQueue[MessageInvocation] with MessageQueue { - def enqueue(handle: MessageInvocation): Unit = this add handle - def dequeue(): MessageInvocation = { - if(blockDequeue) this.take() - else this.poll() - } - } - } - else if (pushTime.isDefined) { //BOUNDED: Timeouted enqueue with MessageQueueAppendFailedException and optionally blocking dequeue - val time = pushTime.get - new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue { - def enqueue(handle: MessageInvocation) { - if (!this.offer(handle,time.length,time.unit)) - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) - } + blockDequeue: Boolean = blockingDequeue) : MessageQueue = new DefaultMessageQueue(bounds,pushTime,blockDequeue) +} - def dequeue(): MessageInvocation = { - if (blockDequeue) this.take() - else this.poll() - } +class DefaultMessageQueue(override val capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends BoundableTransferQueue[MessageInvocation](capacity) with MessageQueue { + def enqueue(handle: MessageInvocation) { + if(bounded) { + if (pushTimeOut.isDefined) { + if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit)) + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) } - } - else { //BOUNDED: Blocking enqueue and optionally blocking dequeue - new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue { - def enqueue(handle: MessageInvocation): Unit = this put handle - def dequeue(): MessageInvocation = { - if(blockDequeue) this.take() - else this.poll() - } + else { + this.put(handle) } + } else { + this.add(handle) } } + + def dequeue(): MessageInvocation = { + if (blockDequeue) this.take() + else this.poll() + } } /** @@ -156,14 +145,4 @@ trait MessageDispatcher extends Logging { * Creates and returns a mailbox for the given actor */ protected def createMailbox(actorRef: ActorRef): AnyRef = null -} - -/** - * @author Jonas Bonér - */ -trait MessageDemultiplexer { - def select - def wakeUp - def acquireSelectedInvocations: List[MessageInvocation] - def releaseSelectedInvocations -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala index 2ba88f25c3..8c75d6a42b 100644 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ b/akka-actor/src/main/scala/dispatch/Queues.scala @@ -9,120 +9,160 @@ import java.util.concurrent.{TimeUnit, Semaphore} import java.util.Iterator import se.scalablesolutions.akka.util.Logger -class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { - require(capacity > 0) +class BoundableTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { + val bounded = (capacity > 0) - protected val guard = new Semaphore(capacity) + protected lazy val guard = new Semaphore(capacity) override def take(): E = { - val e = super.take - if (e ne null) guard.release - e + if (!bounded) { + super.take + } else { + val e = super.take + if (e ne null) guard.release + e + } } override def poll(): E = { - val e = super.poll - if (e ne null) guard.release - e + if (!bounded) { + super.poll + } else { + val e = super.poll + if (e ne null) guard.release + e + } } override def poll(timeout: Long, unit: TimeUnit): E = { - val e = super.poll(timeout,unit) - if (e ne null) guard.release - e + if (!bounded) { + super.poll(timeout,unit) + } else { + val e = super.poll(timeout,unit) + if (e ne null) guard.release + e + } } - override def remainingCapacity = guard.availablePermits + override def remainingCapacity: Int = { + if (!bounded) super.remainingCapacity + else guard.availablePermits + } override def remove(o: AnyRef): Boolean = { - if (super.remove(o)) { - guard.release - true + if (!bounded) { + super.remove(o) } else { - false + if (super.remove(o)) { + guard.release + true + } else false } } override def offer(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.offer(e) + } else { + if (guard.tryAcquire) { + val result = try { + super.offer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.offer(e,timeout,unit) + } else { + if (guard.tryAcquire(timeout,unit)) { + val result = try { + super.offer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def add(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.add(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.add(e) + } else { + if (guard.tryAcquire) { + val result = try { + super.add(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def put(e :E): Unit = { - guard.acquire - try { + if (!bounded) { super.put(e) - } catch { - case e => guard.release; throw e + } else { + guard.acquire + try { + super.put(e) + } catch { + case e => guard.release; throw e + } } } override def tryTransfer(e: E): Boolean = { - if (guard.tryAcquire) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.tryTransfer(e) + } else { + if (guard.tryAcquire) { + val result = try { + super.tryTransfer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else - false + if (!bounded) { + super.tryTransfer(e,timeout,unit) + } else { + if (guard.tryAcquire(timeout,unit)) { + val result = try { + super.tryTransfer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else false + } } override def transfer(e: E): Unit = { - if (guard.tryAcquire) { - try { - super.transfer(e) - } catch { - case e => guard.release; throw e + if (!bounded) { + super.transfer(e) + } else { + if (guard.tryAcquire) { + try { + super.transfer(e) + } catch { + case e => guard.release; throw e + } } } } @@ -134,7 +174,8 @@ class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransfe def next = it.next def remove { it.remove - guard.release //Assume remove worked if no exception was thrown + if (bounded) + guard.release //Assume remove worked if no exception was thrown } } } diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 885e11def7..ee7f4f0efc 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} +import java.util.concurrent.atomic.AtomicBoolean /** * @author Jonas Bonér @@ -58,3 +59,50 @@ class ReadWriteGuard { } } +/** + * A very simple lock that uses CCAS (Compare Compare-And-Swap) + * Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods + */ +class SimpleLock { + val acquired = new AtomicBoolean(false) + + def ifPossible(perform: () => Unit): Boolean = { + if (tryLock()) { + try { + perform + } finally { + unlock() + } + true + } else false + } + + def ifPossibleYield[T](perform: () => T): Option[T] = { + if (tryLock()) { + try { + Some(perform()) + } finally { + unlock() + } + } else None + } + + def ifPossibleApply[T,R](value: T)(function: (T) => R): Option[R] = { + if (tryLock()) { + try { + Some(function(value)) + } finally { + unlock() + } + } else None + } + + def tryLock() = { + if (acquired.get) false + else acquired.compareAndSet(false,true) + } + + def unlock() { + acquired.set(false) + } +} \ No newline at end of file From 94ad3f9ff20a8da83495f3c4ece88bb568d7c5fe Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 10 Sep 2010 18:25:24 +0200 Subject: [PATCH 3/9] Added more safeguards to the WorkStealers tests --- ...asedEventDrivenWorkStealingDispatcher.scala | 1 - ...EventDrivenWorkStealingDispatcherSpec.scala | 18 +++++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index c97c16c238..10afb1bfb6 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -200,7 +200,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( else { new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) with MessageQueue with Runnable { def enqueue(handle: MessageInvocation): Unit = this.add(handle) - def dequeue: MessageInvocation = this.poll() def run = { diff --git a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index cde57a0544..3285e450c6 100644 --- a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -5,11 +5,10 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test -import se.scalablesolutions.akka.dispatch.Dispatchers - import java.util.concurrent.{TimeUnit, CountDownLatch} import se.scalablesolutions.akka.actor.{IllegalActorStateException, Actor} import Actor._ +import se.scalablesolutions.akka.dispatch.{MessageQueue, Dispatchers} object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") @@ -18,7 +17,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { self.dispatcher = delayableActorDispatcher - var invocationCount = 0 + @volatile var invocationCount = 0 self.id = name def receive = { @@ -61,10 +60,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with val slow = actorOf(new DelayableActor("slow", 50, finishedCounter)).start val fast = actorOf(new DelayableActor("fast", 10, finishedCounter)).start + var sentToFast = 0 + for (i <- 1 to 100) { // send most work to slow actor - if (i % 20 == 0) + if (i % 20 == 0) { fast ! i + sentToFast += 1 + } else slow ! i } @@ -72,13 +75,18 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with // now send some messages to actors to keep the dispatcher dispatching messages for (i <- 1 to 10) { Thread.sleep(150) - if (i % 2 == 0) + if (i % 2 == 0) { fast ! i + sentToFast += 1 + } else slow ! i } finishedCounter.await(5, TimeUnit.SECONDS) + fast.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) + slow.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) + fast.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.actor.asInstanceOf[DelayableActor].invocationCount) slow.stop From c3d66ed3d79eb814fcf1f589e1d8e0e5da7a61f3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 11 Sep 2010 15:24:09 +0200 Subject: [PATCH 4/9] 1 entry per mailbox at most --- .../ExecutorBasedEventDrivenDispatcher.scala | 106 ++++++++++-------- .../main/scala/dispatch/MessageHandling.scala | 42 ++++--- .../scala/dispatch/ThreadPoolBuilder.scala | 7 +- akka-actor/src/main/scala/util/LockUtil.scala | 6 + 4 files changed, 92 insertions(+), 69 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 0e9acf62e1..19045e123b 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} import java.util.Queue -import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} +import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -80,6 +80,52 @@ class ExecutorBasedEventDrivenDispatcher( val name = "akka:event-driven:dispatcher:" + _name init + /** + * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox + */ + trait ExecutableMailbox { self: MessageQueue with Runnable => + def run = { + try { + val reDispatch = processMailbox()//Returns true if we need to reschedule the processing + self.dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing + if (reDispatch) + dispatch(self) + } catch { + case e => + dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing + if(!self.isEmpty) //If the mailbox isn't empty, try to re-schedule processing, equivalent to reDispatch + dispatch(self) + throw e //Can't just swallow exceptions or errors + } + } + + /** + * Process the messages in the mailbox + * + * @return true if the processing finished before the mailbox was empty, due to the throughput constraint + */ + def processMailbox(): Boolean = { + val throttle = throughput > 0 + var processedMessages = 0 + var nextMessage = self.dequeue + if (nextMessage ne null) { + do { + nextMessage.invoke + + if(throttle) { //Will be elided when false + processedMessages += 1 + if (processedMessages >= throughput) //If we're throttled, break out + return !self.isEmpty + } + nextMessage = self.dequeue + } + while (nextMessage ne null) + } + + false + } + } + def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation @@ -93,56 +139,18 @@ class ExecutorBasedEventDrivenDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = new DefaultMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,false) with Runnable { - def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - if (dispatcherLock.tryLock()) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - try { - finishedBeforeMailboxEmpty = processMailbox() - } finally { - dispatcherLock.unlock() - if (finishedBeforeMailboxEmpty) - dispatch(this) - } - } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !this.isEmpty)) - } - - /** - * Process the messages in the mailbox - * - * @return true if the processing finished before the mailbox was empty, due to the throughput constraint - */ - def processMailbox(): Boolean = { - val throttle = throughput > 0 - var processedMessages = 0 - var nextMessage = this.dequeue - if (nextMessage ne null) { - do { - nextMessage.invoke - - if(throttle) { //Will be JIT:Ed away when false - processedMessages += 1 - if (processedMessages >= throughput) //If we're throttled, break out - return !this.isEmpty - } - nextMessage = this.dequeue - } - while (nextMessage ne null) - } - - false - } - } + override def createMailbox(actorRef: ActorRef): AnyRef = + if (mailboxCapacity > 0) new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with Runnable with ExecutableMailbox + else new DefaultUnboundedMessageQueue(blockDequeue = false) with Runnable with ExecutableMailbox def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) { - executor.execute(mailbox) + if (mailbox.dispatcherLock.tryLock()) {//Ensure that only one runnable can be in the executor pool at the same time + try { + executor execute mailbox + } catch { + case e: RejectedExecutionException => mailbox.dispatcherLock.unlock() + } + } } else { log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 015ae9422b..c2ec47c446 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -85,28 +85,36 @@ case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingD */ def newMailbox(bounds: Int = capacity, pushTime: Option[Duration] = pushTimeOut, - blockDequeue: Boolean = blockingDequeue) : MessageQueue = new DefaultMessageQueue(bounds,pushTime,blockDequeue) + blockDequeue: Boolean = blockingDequeue) : MessageQueue = + if (capacity > 0) new DefaultBoundedMessageQueue(bounds,pushTime,blockDequeue) + else new DefaultUnboundedMessageQueue(blockDequeue) } -class DefaultMessageQueue(override val capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends BoundableTransferQueue[MessageInvocation](capacity) with MessageQueue { - def enqueue(handle: MessageInvocation) { - if(bounded) { - if (pushTimeOut.isDefined) { - if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit)) - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) - } - else { - this.put(handle) - } - } else { - this.add(handle) - } +class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { + final def enqueue(handle: MessageInvocation) { + this add handle } - - def dequeue(): MessageInvocation = { + + final def dequeue(): MessageInvocation = if (blockDequeue) this.take() else this.poll() +} + +class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue { + final def enqueue(handle: MessageInvocation) { + if (pushTimeOut.isDefined) { + if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit)) + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) + } + else { + this put handle + } } + + final def dequeue(): MessageInvocation = + if (blockDequeue) this.take() + else this.poll() + } /** @@ -128,7 +136,7 @@ trait MessageDispatcher extends Logging { } def unregister(actorRef: ActorRef) = { uuids remove actorRef.uuid - //actorRef.mailbox = null //FIXME should we null out the mailbox here? + actorRef.mailbox = null if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 5ad1b89aca..eb573cde70 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -11,6 +11,7 @@ import ThreadPoolExecutor.CallerRunsPolicy import se.scalablesolutions.akka.actor.IllegalActorStateException import se.scalablesolutions.akka.util.{Logger, Logging} +import concurrent.forkjoin.LinkedTransferQueue trait ThreadPoolBuilder extends Logging { val name: String @@ -69,7 +70,7 @@ trait ThreadPoolBuilder extends Logging { def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable] + blockingQueue = new LinkedTransferQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory) boundedExecutorBound = bound this @@ -78,7 +79,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable] + blockingQueue = new LinkedTransferQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this @@ -87,7 +88,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable](capacity) + blockingQueue = new BoundableTransferQueue[Runnable](capacity) threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index ee7f4f0efc..3d1261e468 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -102,6 +102,12 @@ class SimpleLock { else acquired.compareAndSet(false,true) } + def tryUnlock() = { + acquired.compareAndSet(true,false) + } + + def locked = acquired.get + def unlock() { acquired.set(false) } From 4f473d307173ba2837f1ac327a6028aacb1d163b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 12 Sep 2010 11:24:27 +0200 Subject: [PATCH 5/9] Safekeeping --- .../src/main/scala/actor/ActorRegistry.scala | 4 +-- .../src/main/scala/dispatch/Dispatchers.scala | 2 +- .../main/scala/util/ListenerManagement.scala | 17 ++++++++++++ .../src/main/scala/remote/RemoteClient.scala | 22 ++++++++-------- .../src/main/scala/remote/RemoteServer.scala | 26 +++++++++---------- .../src/main/scala/actor/TypedActor.scala | 4 +-- 6 files changed, 46 insertions(+), 29 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index f3a479e6fd..51bbfd3477 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -125,7 +125,7 @@ object ActorRegistry extends ListenerManagement { actorsByUUID.put(actor.uuid, actor) // notify listeners - foreachListener(_ ! ActorRegistered(actor)) + notifyListeners(ActorRegistered(actor)) } /** @@ -137,7 +137,7 @@ object ActorRegistry extends ListenerManagement { actorsById.remove(actor.id,actor) // notify listeners - foreachListener(_ ! ActorUnregistered(actor)) + notifyListeners(ActorUnregistered(actor)) } /** diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index 9a7e44a197..2ebba03928 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -45,7 +45,7 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID} */ object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) + val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MAILBOX_CONFIG = MailboxConfig( capacity = Dispatchers.MAILBOX_CAPACITY, pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)), diff --git a/akka-actor/src/main/scala/util/ListenerManagement.scala b/akka-actor/src/main/scala/util/ListenerManagement.scala index 0e17058380..7ad0f451f1 100644 --- a/akka-actor/src/main/scala/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/util/ListenerManagement.scala @@ -40,6 +40,23 @@ trait ListenerManagement extends Logging { if (manageLifeCycleOfListeners) listener.stop } + /* + * Returns whether there are any listeners currently + */ + def hasListeners: Boolean = !listeners.isEmpty + + protected def notifyListeners(message: => Any) { + if (hasListeners) { + val msg = message + val iterator = listeners.iterator + while (iterator.hasNext) { + val listener = iterator.next + if (listener.isRunning) listener ! msg + else log.warning("Can't notify [%s] since it is not running.", listener) + } + } + } + /** * Execute f with each listener as argument. */ diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index f61a5d63a1..264081259f 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -220,10 +220,10 @@ class RemoteClient private[akka] ( val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - foreachListener(_ ! RemoteClientError(connection.getCause, this)) + notifyListeners(RemoteClientError(connection.getCause, this)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } - foreachListener(_ ! RemoteClientStarted(this)) + notifyListeners(RemoteClientStarted(this)) isRunning = true } } @@ -232,7 +232,7 @@ class RemoteClient private[akka] ( log.info("Shutting down %s", name) if (isRunning) { isRunning = false - foreachListener(_ ! RemoteClientShutdown(this)) + notifyListeners(RemoteClientShutdown(this)) timer.stop timer = null openChannels.close.awaitUninterruptibly @@ -250,7 +250,7 @@ class RemoteClient private[akka] ( @deprecated("Use removeListener instead") def deregisterListener(actorRef: ActorRef) = removeListener(actorRef) - override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) + override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) protected override def manageLifeCycleOfListeners = false @@ -287,7 +287,7 @@ class RemoteClient private[akka] ( } else { val exception = new RemoteClientException( "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - foreachListener(l => l ! RemoteClientError(exception, this)) + notifyListeners(RemoteClientError(exception, this)) throw exception } @@ -403,12 +403,12 @@ class RemoteClientHandler( futures.remove(reply.getId) } else { val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) - client.foreachListener(_ ! RemoteClientError(exception, client)) + client.notifyListeners(RemoteClientError(exception, client)) throw exception } } catch { case e: Exception => - client.foreachListener(_ ! RemoteClientError(e, client)) + client.notifyListeners(RemoteClientError(e, client)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -423,7 +423,7 @@ class RemoteClientHandler( client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client)) + client.notifyListeners(RemoteClientError(client.connection.getCause, client)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -433,7 +433,7 @@ class RemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { - client.foreachListener(_ ! RemoteClientConnected(client)) + client.notifyListeners(RemoteClientConnected(client)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } @@ -450,12 +450,12 @@ class RemoteClientHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.foreachListener(_ ! RemoteClientDisconnected(client)) + client.notifyListeners(RemoteClientDisconnected(client)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.foreachListener(_ ! RemoteClientError(event.getCause, client)) + client.notifyListeners(RemoteClientError(event.getCause, client)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index b10d8e5825..27b5a50bfc 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -245,12 +245,12 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) _isRunning = true Cluster.registerLocalNode(hostname, port) - foreachListener(_ ! RemoteServerStarted(this)) + notifyListeners(RemoteServerStarted(this)) } } catch { case e => log.error(e, "Could not start up remote server") - foreachListener(_ ! RemoteServerError(e, this)) + notifyListeners(RemoteServerError(e, this)) } this } @@ -263,7 +263,7 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) - foreachListener(_ ! RemoteServerShutdown(this)) + notifyListeners(RemoteServerShutdown(this)) } catch { case e: java.nio.channels.ClosedChannelException => {} case e => log.warning("Could not close remote server channel in a graceful way") @@ -323,7 +323,7 @@ class RemoteServer extends Logging with ListenerManagement { protected override def manageLifeCycleOfListeners = false - protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) + protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { RemoteServer.actorsFor(address).actors @@ -413,18 +413,18 @@ class RemoteServerHandler( def operationComplete(future: ChannelFuture): Unit = { if (future.isSuccess) { openChannels.add(future.getChannel) - server.foreachListener(_ ! RemoteServerClientConnected(server)) + server.notifyListeners(RemoteServerClientConnected(server)) } else future.getChannel.close } }) } else { - server.foreachListener(_ ! RemoteServerClientConnected(server)) + server.notifyListeners(RemoteServerClientConnected(server)) } } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { log.debug("Remote client disconnected from [%s]", server.name) - server.foreachListener(_ ! RemoteServerClientDisconnected(server)) + server.notifyListeners(RemoteServerClientDisconnected(server)) } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -446,7 +446,7 @@ class RemoteServerHandler( override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close - server.foreachListener(_ ! RemoteServerError(event.getCause, server)) + server.notifyListeners(RemoteServerError(event.getCause, server)) } private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { @@ -491,7 +491,7 @@ class RemoteServerHandler( } catch { case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) } } } @@ -523,10 +523,10 @@ class RemoteServerHandler( } catch { case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) case e: Throwable => channel.write(createErrorReplyMessage(e, request, false)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) } } @@ -559,7 +559,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote actor instance") - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) throw e } } else actorRefOrNull @@ -590,7 +590,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote typed actor instance") - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) throw e } } else typedActorOrNull diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index b27f5b4b4d..52bd0e6cb6 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -674,7 +674,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { def register(proxy: AnyRef, init: AspectInit) = { val res = initializations.put(proxy, init) - foreachListener(_ ! AspectInitRegistered(proxy, init)) + notifyListeners(AspectInitRegistered(proxy, init)) res } @@ -683,7 +683,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { */ def unregister(proxy: AnyRef): AspectInit = { val init = initializations.remove(proxy) - foreachListener(_ ! AspectInitUnregistered(proxy, init)) + notifyListeners(AspectInitUnregistered(proxy, init)) init.actorRef.stop init } From acbcd9ef5498a3f1720eb1fd47570a149b9bf81a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 12 Sep 2010 18:59:21 +0200 Subject: [PATCH 6/9] Take advantage of short-circuit to avoid lazy init if possible --- akka-actor/src/main/scala/actor/ActorRef.scala | 8 ++++---- akka-actor/src/main/scala/dispatch/MessageHandling.scala | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 86b6c2ec65..81d4c40b74 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -972,12 +972,12 @@ class LocalActorRef private[akka]( protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) - if (isRemotingEnabled && remoteAddress.isDefined) { + if (remoteAddress.isDefined && isRemotingEnabled) { RemoteClientModule.send[Any]( message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor) } else { val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) - invocation.send + dispatcher dispatch invocation } } @@ -988,7 +988,7 @@ class LocalActorRef private[akka]( senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { joinTransaction(message) - if (isRemotingEnabled && remoteAddress.isDefined) { + if (remoteAddress.isDefined && isRemotingEnabled) { val future = RemoteClientModule.send[T]( message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor) if (future.isDefined) future.get @@ -998,7 +998,7 @@ class LocalActorRef private[akka]( else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get) - invocation.send + dispatcher dispatch invocation future } } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index c2ec47c446..25a02f2603 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -30,8 +30,6 @@ final class MessageInvocation(val receiver: ActorRef, "Don't call 'self ! message' in the Actor's constructor (e.g. body of the class).") } - def send = receiver.dispatcher.dispatch(this) - override def hashCode(): Int = synchronized { var result = HashCode.SEED result = HashCode.hash(result, receiver.actor) From 6ae312fcd6207e0bcdfc19c9ce4964700e8aaada Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 12 Sep 2010 21:00:50 +0200 Subject: [PATCH 7/9] Switching dispatching strategy to 1 runnable per mailbox and removing use of TransferQueue --- .../src/main/scala/actor/ActorRef.scala | 21 +- .../ExecutorBasedEventDrivenDispatcher.scala | 51 +++-- .../src/main/scala/dispatch/Queues.scala | 182 ------------------ .../scala/dispatch/ThreadPoolBuilder.scala | 7 +- 4 files changed, 34 insertions(+), 227 deletions(-) delete mode 100644 akka-actor/src/main/scala/dispatch/Queues.scala diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 81d4c40b74..7777051ac1 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -199,10 +199,7 @@ trait ActorRef extends /** * This is a reference to the message currently being processed by the actor */ - protected[akka] var _currentMessage: Option[MessageInvocation] = None - - protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg } - protected[akka] def currentMessage = guard.withGuard { _currentMessage } + @volatile protected[akka] var currentMessage: MessageInvocation = null /** * Comparison only takes uuid into account. @@ -1010,7 +1007,7 @@ class LocalActorRef private[akka]( if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { - currentMessage = Option(messageHandle) + currentMessage = messageHandle try { dispatch(messageHandle) } catch { @@ -1018,7 +1015,7 @@ class LocalActorRef private[akka]( Actor.log.error(e, "Could not invoke actor [%s]", this) throw e } finally { - currentMessage = None //TODO: Don't reset this, we might want to resend the message + currentMessage = null //TODO: Don't reset this, we might want to resend the message } } } @@ -1182,7 +1179,7 @@ class LocalActorRef private[akka]( } private def dispatch[T](messageHandle: MessageInvocation) = { - Actor.log.trace("Invoking actor with message:\n" + messageHandle) + Actor.log.trace("Invoking actor with message: %s\n",messageHandle) val message = messageHandle.message //serializeMessage(messageHandle.message) var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = @@ -1529,10 +1526,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent from another Actor, else None. */ def sender: Option[ActorRef] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.sender + if (msg eq null) None + else msg.sender } /** @@ -1540,10 +1536,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ def senderFuture(): Option[CompletableFuture[Any]] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.senderFuture + if (msg eq null) None + else msg.senderFuture } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 949d701a21..bd8416c95f 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -85,30 +85,15 @@ class ExecutorBasedEventDrivenDispatcher( */ trait ExecutableMailbox extends Runnable { self: MessageQueue => final def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - finishedBeforeMailboxEmpty = false //Reset this every run - if (dispatcherLock.tryLock()) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - finishedBeforeMailboxEmpty = try { - processMailbox() - } catch { - case e => - dispatcherLock.unlock() - if (!self.isEmpty) - registerForExecution(self) - throw e - } - dispatcherLock.unlock() - if (finishedBeforeMailboxEmpty) - registerForExecution(self) - } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !self.isEmpty)) + val reschedule = try { + processMailbox() + } finally { + dispatcherLock.unlock() + } + + if (reschedule || !self.isEmpty) + registerForExecution(self) } /** @@ -144,6 +129,20 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } + protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { + if (mailbox.dispatcherLock.tryLock()) { + try { + executor execute mailbox + } catch { + case e: RejectedExecutionException => + mailbox.dispatcherLock.unlock() + throw e + } + } + } else { + log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) + } + /** * @return the mailbox associated with the actor */ @@ -158,11 +157,6 @@ class ExecutorBasedEventDrivenDispatcher( new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox } - protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { - executor execute mailbox - } else { - log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) - } def start = if (!active) { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) @@ -186,6 +180,7 @@ class ExecutorBasedEventDrivenDispatcher( // FIXME: should we have an unbounded queue and not bounded as default ???? private[akka] def init = { withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + //withNewThreadPoolWithLinkedBlockingQueueWithCapacity(16) config(this) buildThreadPool } diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala deleted file mode 100644 index 8c75d6a42b..0000000000 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.dispatch - -import concurrent.forkjoin.LinkedTransferQueue -import java.util.concurrent.{TimeUnit, Semaphore} -import java.util.Iterator -import se.scalablesolutions.akka.util.Logger - -class BoundableTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { - val bounded = (capacity > 0) - - protected lazy val guard = new Semaphore(capacity) - - override def take(): E = { - if (!bounded) { - super.take - } else { - val e = super.take - if (e ne null) guard.release - e - } - } - - override def poll(): E = { - if (!bounded) { - super.poll - } else { - val e = super.poll - if (e ne null) guard.release - e - } - } - - override def poll(timeout: Long, unit: TimeUnit): E = { - if (!bounded) { - super.poll(timeout,unit) - } else { - val e = super.poll(timeout,unit) - if (e ne null) guard.release - e - } - } - - override def remainingCapacity: Int = { - if (!bounded) super.remainingCapacity - else guard.availablePermits - } - - override def remove(o: AnyRef): Boolean = { - if (!bounded) { - super.remove(o) - } else { - if (super.remove(o)) { - guard.release - true - } else false - } - } - - override def offer(e: E): Boolean = { - if (!bounded) { - super.offer(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (!bounded) { - super.offer(e,timeout,unit) - } else { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def add(e: E): Boolean = { - if (!bounded) { - super.add(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.add(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def put(e :E): Unit = { - if (!bounded) { - super.put(e) - } else { - guard.acquire - try { - super.put(e) - } catch { - case e => guard.release; throw e - } - } - } - - override def tryTransfer(e: E): Boolean = { - if (!bounded) { - super.tryTransfer(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (!bounded) { - super.tryTransfer(e,timeout,unit) - } else { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def transfer(e: E): Unit = { - if (!bounded) { - super.transfer(e) - } else { - if (guard.tryAcquire) { - try { - super.transfer(e) - } catch { - case e => guard.release; throw e - } - } - } - } - - override def iterator: Iterator[E] = { - val it = super.iterator - new Iterator[E] { - def hasNext = it.hasNext - def next = it.next - def remove { - it.remove - if (bounded) - guard.release //Assume remove worked if no exception was thrown - } - } - } -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index eb573cde70..5ad1b89aca 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -11,7 +11,6 @@ import ThreadPoolExecutor.CallerRunsPolicy import se.scalablesolutions.akka.actor.IllegalActorStateException import se.scalablesolutions.akka.util.{Logger, Logging} -import concurrent.forkjoin.LinkedTransferQueue trait ThreadPoolBuilder extends Logging { val name: String @@ -70,7 +69,7 @@ trait ThreadPoolBuilder extends Logging { def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedTransferQueue[Runnable] + blockingQueue = new LinkedBlockingQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory) boundedExecutorBound = bound this @@ -79,7 +78,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedTransferQueue[Runnable] + blockingQueue = new LinkedBlockingQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this @@ -88,7 +87,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new BoundableTransferQueue[Runnable](capacity) + blockingQueue = new LinkedBlockingQueue[Runnable](capacity) threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this From 839c81d06ba601a39ee297a55282c903e7d8c2dd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 12 Sep 2010 21:00:50 +0200 Subject: [PATCH 8/9] Switching dispatching strategy to 1 runnable per mailbox and removing use of TransferQueue --- .../src/main/scala/actor/ActorRef.scala | 21 +- .../ExecutorBasedEventDrivenDispatcher.scala | 50 +++-- .../src/main/scala/dispatch/Queues.scala | 182 ------------------ .../scala/dispatch/ThreadPoolBuilder.scala | 7 +- 4 files changed, 33 insertions(+), 227 deletions(-) delete mode 100644 akka-actor/src/main/scala/dispatch/Queues.scala diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 81d4c40b74..7777051ac1 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -199,10 +199,7 @@ trait ActorRef extends /** * This is a reference to the message currently being processed by the actor */ - protected[akka] var _currentMessage: Option[MessageInvocation] = None - - protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg } - protected[akka] def currentMessage = guard.withGuard { _currentMessage } + @volatile protected[akka] var currentMessage: MessageInvocation = null /** * Comparison only takes uuid into account. @@ -1010,7 +1007,7 @@ class LocalActorRef private[akka]( if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { - currentMessage = Option(messageHandle) + currentMessage = messageHandle try { dispatch(messageHandle) } catch { @@ -1018,7 +1015,7 @@ class LocalActorRef private[akka]( Actor.log.error(e, "Could not invoke actor [%s]", this) throw e } finally { - currentMessage = None //TODO: Don't reset this, we might want to resend the message + currentMessage = null //TODO: Don't reset this, we might want to resend the message } } } @@ -1182,7 +1179,7 @@ class LocalActorRef private[akka]( } private def dispatch[T](messageHandle: MessageInvocation) = { - Actor.log.trace("Invoking actor with message:\n" + messageHandle) + Actor.log.trace("Invoking actor with message: %s\n",messageHandle) val message = messageHandle.message //serializeMessage(messageHandle.message) var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = @@ -1529,10 +1526,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent from another Actor, else None. */ def sender: Option[ActorRef] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.sender + if (msg eq null) None + else msg.sender } /** @@ -1540,10 +1536,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ def senderFuture(): Option[CompletableFuture[Any]] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.senderFuture + if (msg eq null) None + else msg.senderFuture } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 949d701a21..6cabdec5e5 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -85,30 +85,15 @@ class ExecutorBasedEventDrivenDispatcher( */ trait ExecutableMailbox extends Runnable { self: MessageQueue => final def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - finishedBeforeMailboxEmpty = false //Reset this every run - if (dispatcherLock.tryLock()) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - finishedBeforeMailboxEmpty = try { - processMailbox() - } catch { - case e => - dispatcherLock.unlock() - if (!self.isEmpty) - registerForExecution(self) - throw e - } - dispatcherLock.unlock() - if (finishedBeforeMailboxEmpty) - registerForExecution(self) - } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !self.isEmpty)) + val reschedule = try { + processMailbox() + } finally { + dispatcherLock.unlock() + } + + if (reschedule || !self.isEmpty) + registerForExecution(self) } /** @@ -144,6 +129,20 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } + protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { + if (mailbox.dispatcherLock.tryLock()) { + try { + executor execute mailbox + } catch { + case e: RejectedExecutionException => + mailbox.dispatcherLock.unlock() + throw e + } + } + } else { + log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) + } + /** * @return the mailbox associated with the actor */ @@ -158,11 +157,6 @@ class ExecutorBasedEventDrivenDispatcher( new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox } - protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { - executor execute mailbox - } else { - log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) - } def start = if (!active) { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala deleted file mode 100644 index 8c75d6a42b..0000000000 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.dispatch - -import concurrent.forkjoin.LinkedTransferQueue -import java.util.concurrent.{TimeUnit, Semaphore} -import java.util.Iterator -import se.scalablesolutions.akka.util.Logger - -class BoundableTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { - val bounded = (capacity > 0) - - protected lazy val guard = new Semaphore(capacity) - - override def take(): E = { - if (!bounded) { - super.take - } else { - val e = super.take - if (e ne null) guard.release - e - } - } - - override def poll(): E = { - if (!bounded) { - super.poll - } else { - val e = super.poll - if (e ne null) guard.release - e - } - } - - override def poll(timeout: Long, unit: TimeUnit): E = { - if (!bounded) { - super.poll(timeout,unit) - } else { - val e = super.poll(timeout,unit) - if (e ne null) guard.release - e - } - } - - override def remainingCapacity: Int = { - if (!bounded) super.remainingCapacity - else guard.availablePermits - } - - override def remove(o: AnyRef): Boolean = { - if (!bounded) { - super.remove(o) - } else { - if (super.remove(o)) { - guard.release - true - } else false - } - } - - override def offer(e: E): Boolean = { - if (!bounded) { - super.offer(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (!bounded) { - super.offer(e,timeout,unit) - } else { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def add(e: E): Boolean = { - if (!bounded) { - super.add(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.add(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def put(e :E): Unit = { - if (!bounded) { - super.put(e) - } else { - guard.acquire - try { - super.put(e) - } catch { - case e => guard.release; throw e - } - } - } - - override def tryTransfer(e: E): Boolean = { - if (!bounded) { - super.tryTransfer(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (!bounded) { - super.tryTransfer(e,timeout,unit) - } else { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def transfer(e: E): Unit = { - if (!bounded) { - super.transfer(e) - } else { - if (guard.tryAcquire) { - try { - super.transfer(e) - } catch { - case e => guard.release; throw e - } - } - } - } - - override def iterator: Iterator[E] = { - val it = super.iterator - new Iterator[E] { - def hasNext = it.hasNext - def next = it.next - def remove { - it.remove - if (bounded) - guard.release //Assume remove worked if no exception was thrown - } - } - } -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index eb573cde70..5ad1b89aca 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -11,7 +11,6 @@ import ThreadPoolExecutor.CallerRunsPolicy import se.scalablesolutions.akka.actor.IllegalActorStateException import se.scalablesolutions.akka.util.{Logger, Logging} -import concurrent.forkjoin.LinkedTransferQueue trait ThreadPoolBuilder extends Logging { val name: String @@ -70,7 +69,7 @@ trait ThreadPoolBuilder extends Logging { def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedTransferQueue[Runnable] + blockingQueue = new LinkedBlockingQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory) boundedExecutorBound = bound this @@ -79,7 +78,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedTransferQueue[Runnable] + blockingQueue = new LinkedBlockingQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this @@ -88,7 +87,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new BoundableTransferQueue[Runnable](capacity) + blockingQueue = new LinkedBlockingQueue[Runnable](capacity) threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this From f59bf050df3ec3f46dcc2362b1e44dc9ea2bb969 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 13 Sep 2010 14:00:24 +0200 Subject: [PATCH 9/9] Merge introduced old code --- .../main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index bd8416c95f..6cabdec5e5 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -180,7 +180,6 @@ class ExecutorBasedEventDrivenDispatcher( // FIXME: should we have an unbounded queue and not bounded as default ???? private[akka] def init = { withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - //withNewThreadPoolWithLinkedBlockingQueueWithCapacity(16) config(this) buildThreadPool }