From 657f62361cce417895d2bbc93b35206434adc91c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Aug 2010 16:31:41 +0200 Subject: [PATCH] Changed ThreadBasedDispatcher from LinkedBlockingQueue to TransferQueue --- .../src/main/scala/dispatch/Dispatchers.scala | 4 +-- .../main/scala/dispatch/MessageHandling.scala | 3 ++ .../dispatch/ThreadBasedDispatcher.scala | 35 +++++++++++++------ 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index f9ffc219cf..f8aaba1653 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -44,8 +44,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D * @author Jonas Bonér */ object Dispatchers extends Logging { - val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) + val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) lazy val defaultGlobalDispatcher = { config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 92926bb253..0f8a4881a8 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -12,6 +12,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce import java.util.concurrent.ConcurrentHashMap import org.multiverse.commitbarriers.CountDownCommitBarrier +import se.scalablesolutions.akka.AkkaException /** * @author Jonas Bonér @@ -57,6 +58,8 @@ final class MessageInvocation(val receiver: ActorRef, } } +class MessageQueueAppendFailedException(message: String) extends AkkaException(message) + /** * @author Jonas Bonér */ diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 684a38a97c..30c0884f26 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -4,23 +4,28 @@ package se.scalablesolutions.akka.dispatch -import java.util.concurrent.LinkedBlockingQueue import java.util.Queue import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.config.Config.config +import concurrent.forkjoin.LinkedTransferQueue +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher { +class ThreadBasedDispatcher(private val actor: ActorRef, + val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY, + val pushTimeout: Long = 1000, + val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS + ) extends MessageDispatcher { def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java private val name = actor.getClass.getName + ":" + actor.uuid private val threadName = "akka:thread-based:dispatcher:" + name - private val queue = new BlockingMessageQueue(name, mailboxCapacity) + private val queue = new BlockingMessageTransferQueue(name, mailboxCapacity,pushTimeout,pushTimeoutUnit) private var selectorThread: Thread = _ @volatile private var active: Boolean = false @@ -53,12 +58,20 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In override def toString = "ThreadBasedDispatcher[" + threadName + "]" } -// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher -class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue { - private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) - else new LinkedBlockingQueue[MessageInvocation] - def append(invocation: MessageInvocation) = queue.put(invocation) - def take: MessageInvocation = queue.take - def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException - def interrupt = throw new UnsupportedOperationException +// FIXME: configure the LinkedTransferQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher +class BlockingMessageTransferQueue(name: String, + mailboxCapacity: Int, + pushTimeout: Long, + pushTimeoutUnit: TimeUnit) extends MessageQueue { + private val queue = if (mailboxCapacity > 0) new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) + else new LinkedTransferQueue[MessageInvocation] + + final def append(invocation: MessageInvocation): Unit = { + if(!queue.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer + if(!queue.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting + throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") + } + } + + final def take: MessageInvocation = queue.take }