From 657f62361cce417895d2bbc93b35206434adc91c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Aug 2010 16:31:41 +0200 Subject: [PATCH 1/4] Changed ThreadBasedDispatcher from LinkedBlockingQueue to TransferQueue --- .../src/main/scala/dispatch/Dispatchers.scala | 4 +-- .../main/scala/dispatch/MessageHandling.scala | 3 ++ .../dispatch/ThreadBasedDispatcher.scala | 35 +++++++++++++------ 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index f9ffc219cf..f8aaba1653 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -44,8 +44,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D * @author Jonas Bonér */ object Dispatchers extends Logging { - val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) + val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) lazy val defaultGlobalDispatcher = { config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 92926bb253..0f8a4881a8 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -12,6 +12,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce import java.util.concurrent.ConcurrentHashMap import org.multiverse.commitbarriers.CountDownCommitBarrier +import se.scalablesolutions.akka.AkkaException /** * @author Jonas Bonér @@ -57,6 +58,8 @@ final class MessageInvocation(val receiver: ActorRef, } } +class MessageQueueAppendFailedException(message: String) extends AkkaException(message) + /** * @author Jonas Bonér */ diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 684a38a97c..30c0884f26 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -4,23 +4,28 @@ package se.scalablesolutions.akka.dispatch -import java.util.concurrent.LinkedBlockingQueue import java.util.Queue import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.config.Config.config +import concurrent.forkjoin.LinkedTransferQueue +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher { +class ThreadBasedDispatcher(private val actor: ActorRef, + val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY, + val pushTimeout: Long = 1000, + val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS + ) extends MessageDispatcher { def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java private val name = actor.getClass.getName + ":" + actor.uuid private val threadName = "akka:thread-based:dispatcher:" + name - private val queue = new BlockingMessageQueue(name, mailboxCapacity) + private val queue = new BlockingMessageTransferQueue(name, mailboxCapacity,pushTimeout,pushTimeoutUnit) private var selectorThread: Thread = _ @volatile private var active: Boolean = false @@ -53,12 +58,20 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In override def toString = "ThreadBasedDispatcher[" + threadName + "]" } -// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher -class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue { - private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) - else new LinkedBlockingQueue[MessageInvocation] - def append(invocation: MessageInvocation) = queue.put(invocation) - def take: MessageInvocation = queue.take - def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException - def interrupt = throw new UnsupportedOperationException +// FIXME: configure the LinkedTransferQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher +class BlockingMessageTransferQueue(name: String, + mailboxCapacity: Int, + pushTimeout: Long, + pushTimeoutUnit: TimeUnit) extends MessageQueue { + private val queue = if (mailboxCapacity > 0) new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) + else new LinkedTransferQueue[MessageInvocation] + + final def append(invocation: MessageInvocation): Unit = { + if(!queue.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer + if(!queue.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting + throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") + } + } + + final def take: MessageInvocation = queue.take } From 13e4ad49b2c7352ebda5922f301469591d6f4fb1 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 27 Aug 2010 17:56:09 +0200 Subject: [PATCH 2/4] Added boilerplate to improve BoundedTransferQueue performance --- .../src/main/scala/dispatch/Queues.scala | 128 +++++++++++++----- 1 file changed, 92 insertions(+), 36 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala index 7f16f7aa57..5b5aa6683e 100644 --- a/akka-core/src/main/scala/dispatch/Queues.scala +++ b/akka-core/src/main/scala/dispatch/Queues.scala @@ -20,31 +20,23 @@ class BoundedTransferQueue[E <: AnyRef]( protected val guard = new Semaphore(capacity) - //Enqueue an item within the push timeout (acquire Semaphore) - protected def enq(f: => Boolean): Boolean = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { - val result = try { - f - } catch { - case e => - guard.release //If something broke, release - throw e - } - if (!result) guard.release //Didn't add anything - result - } else - false - } - - //Dequeue an item (release Semaphore) - protected def deq(e: E): E = { - if (e ne null) guard.release //Signal removal of item + override def take(): E = { + val e = super.take + if (e ne null) guard.release e } - override def take(): E = deq(super.take) - override def poll(): E = deq(super.poll) - override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit)) + override def poll(): E = { + val e = super.poll + if (e ne null) guard.release + e + } + + override def poll(timeout: Long, unit: TimeUnit): E = { + val e = super.poll(timeout,unit) + if (e ne null) guard.release + e + } override def remainingCapacity = guard.availablePermits @@ -57,26 +49,90 @@ class BoundedTransferQueue[E <: AnyRef]( } } - override def offer(e: E): Boolean = - enq(super.offer(e)) + override def offer(e: E): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.offer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } - override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = - enq(super.offer(e,timeout,unit)) + override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.offer(e,timeout,unit) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } - override def add(e: E): Boolean = - enq(super.add(e)) + override def add(e: E): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.add(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } - override def put(e :E): Unit = - enq({ super.put(e); true }) + override def put(e :E): Unit = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + try { + super.put(e) + } catch { + case e => guard.release; throw e + } + } + } - override def tryTransfer(e: E): Boolean = - enq(super.tryTransfer(e)) + override def tryTransfer(e: E): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.tryTransfer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } - override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = - enq(super.tryTransfer(e,timeout,unit)) + override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.tryTransfer(e,timeout,unit) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } - override def transfer(e: E): Unit = - enq({ super.transfer(e); true }) + override def transfer(e: E): Unit = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + try { + super.transfer(e) + } catch { + case e => guard.release; throw e + } + } + } override def iterator: Iterator[E] = { val it = super.iterator From 09ab5a5554ae688896c6b7c803426ebf2341f6a5 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 30 Aug 2010 15:53:56 +0200 Subject: [PATCH 3/4] Moving Queues into akka-actor --- {akka-core => akka-actor}/src/main/scala/dispatch/Queues.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {akka-core => akka-actor}/src/main/scala/dispatch/Queues.scala (100%) diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala similarity index 100% rename from akka-core/src/main/scala/dispatch/Queues.scala rename to akka-actor/src/main/scala/dispatch/Queues.scala From 4df0b66fbde21353f96fbd29154c0bb545c0cf14 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 31 Aug 2010 09:33:34 +0200 Subject: [PATCH 4/4] Increased the default timeout for ThreadBasedDispatcher to 10 seconds --- akka-actor/src/main/scala/dispatch/Dispatchers.scala | 10 ++++++++++ .../main/scala/dispatch/ThreadBasedDispatcher.scala | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index f8aaba1653..803fd700cc 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.Config.config import net.lag.configgy.ConfigMap import se.scalablesolutions.akka.util.UUID import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy} +import java.util.concurrent.TimeUnit /** * Scala API. Dispatcher factory. @@ -75,6 +76,7 @@ object Dispatchers extends Logging { /** * Creates an thread based dispatcher serving a single actor through the same single thread. + * Uses the default timeout *

* E.g. each actor consumes its own thread. */ @@ -82,11 +84,19 @@ object Dispatchers extends Logging { /** * Creates an thread based dispatcher serving a single actor through the same single thread. + * Uses the default timeout *

* E.g. each actor consumes its own thread. */ def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity) + /** + * Creates an thread based dispatcher serving a single actor through the same single thread. + *

+ * E.g. each actor consumes its own thread. + */ + def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeout: Long, pushTimeUnit: TimeUnit) = new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeout, pushTimeUnit) + /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. *

diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index e72ea8316e..589500f413 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -18,7 +18,7 @@ import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue} */ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY, - val pushTimeout: Long = 1000, + val pushTimeout: Long = 10000, val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS ) extends MessageDispatcher { def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java