Changed ThreadBasedDispatcher from LinkedBlockingQueue to TransferQueue

This commit is contained in:
Viktor Klang 2010-08-26 16:31:41 +02:00
parent 1d96584e35
commit 657f62361c
3 changed files with 29 additions and 13 deletions

View file

@ -44,8 +44,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
lazy val defaultGlobalDispatcher = {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)

View file

@ -12,6 +12,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce
import java.util.concurrent.ConcurrentHashMap
import org.multiverse.commitbarriers.CountDownCommitBarrier
import se.scalablesolutions.akka.AkkaException
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -57,6 +58,8 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -4,23 +4,28 @@
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config.config
import concurrent.forkjoin.LinkedTransferQueue
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher {
class ThreadBasedDispatcher(private val actor: ActorRef,
val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY,
val pushTimeout: Long = 1000,
val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS
) extends MessageDispatcher {
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
private val queue = new BlockingMessageQueue(name, mailboxCapacity)
private val queue = new BlockingMessageTransferQueue(name, mailboxCapacity,pushTimeout,pushTimeoutUnit)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@ -53,12 +58,20 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
}
// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher
class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue {
private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
else new LinkedBlockingQueue[MessageInvocation]
def append(invocation: MessageInvocation) = queue.put(invocation)
def take: MessageInvocation = queue.take
def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
def interrupt = throw new UnsupportedOperationException
// FIXME: configure the LinkedTransferQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher
class BlockingMessageTransferQueue(name: String,
mailboxCapacity: Int,
pushTimeout: Long,
pushTimeoutUnit: TimeUnit) extends MessageQueue {
private val queue = if (mailboxCapacity > 0) new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit)
else new LinkedTransferQueue[MessageInvocation]
final def append(invocation: MessageInvocation): Unit = {
if(!queue.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
if(!queue.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
}
}
final def take: MessageInvocation = queue.take
}