diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index f9ffc219cf..803fd700cc 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.Config.config import net.lag.configgy.ConfigMap import se.scalablesolutions.akka.util.UUID import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy} +import java.util.concurrent.TimeUnit /** * Scala API. Dispatcher factory. @@ -44,8 +45,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D * @author Jonas Bonér */ object Dispatchers extends Logging { - val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) + val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) lazy val defaultGlobalDispatcher = { config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) @@ -75,6 +76,7 @@ object Dispatchers extends Logging { /** * Creates an thread based dispatcher serving a single actor through the same single thread. + * Uses the default timeout *

* E.g. each actor consumes its own thread. */ @@ -82,11 +84,19 @@ object Dispatchers extends Logging { /** * Creates an thread based dispatcher serving a single actor through the same single thread. + * Uses the default timeout *

* E.g. each actor consumes its own thread. */ def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity) + /** + * Creates an thread based dispatcher serving a single actor through the same single thread. + *

+ * E.g. each actor consumes its own thread. + */ + def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeout: Long, pushTimeUnit: TimeUnit) = new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeout, pushTimeUnit) + /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. *

diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 20c58c9975..395c572f0e 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -10,6 +10,7 @@ import se.scalablesolutions.akka.util.{HashCode, Logging} import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException} import org.multiverse.commitbarriers.CountDownCommitBarrier +import se.scalablesolutions.akka.AkkaException import java.util.concurrent.{ConcurrentSkipListSet} /** @@ -56,6 +57,8 @@ final class MessageInvocation(val receiver: ActorRef, } } +class MessageQueueAppendFailedException(message: String) extends AkkaException(message) + /** * @author Jonas Bonér */ diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala new file mode 100644 index 0000000000..5b5aa6683e --- /dev/null +++ b/akka-actor/src/main/scala/dispatch/Queues.scala @@ -0,0 +1,148 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.dispatch + +import concurrent.forkjoin.LinkedTransferQueue +import java.util.concurrent.{TimeUnit, Semaphore} +import java.util.Iterator +import se.scalablesolutions.akka.util.Logger + +class BoundedTransferQueue[E <: AnyRef]( + val capacity: Int, + val pushTimeout: Long, + val pushTimeUnit: TimeUnit) + extends LinkedTransferQueue[E] { + require(capacity > 0) + require(pushTimeout > 0) + require(pushTimeUnit ne null) + + protected val guard = new Semaphore(capacity) + + override def take(): E = { + val e = super.take + if (e ne null) guard.release + e + } + + override def poll(): E = { + val e = super.poll + if (e ne null) guard.release + e + } + + override def poll(timeout: Long, unit: TimeUnit): E = { + val e = super.poll(timeout,unit) + if (e ne null) guard.release + e + } + + override def remainingCapacity = guard.availablePermits + + override def remove(o: AnyRef): Boolean = { + if (super.remove(o)) { + guard.release + true + } else { + false + } + } + + override def offer(e: E): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.offer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } + + override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.offer(e,timeout,unit) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } + + override def add(e: E): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.add(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } + + override def put(e :E): Unit = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + try { + super.put(e) + } catch { + case e => guard.release; throw e + } + } + } + + override def tryTransfer(e: E): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.tryTransfer(e) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } + + override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + val result = try { + super.tryTransfer(e,timeout,unit) + } catch { + case e => guard.release; throw e + } + if (!result) guard.release + result + } else + false + } + + override def transfer(e: E): Unit = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + try { + super.transfer(e) + } catch { + case e => guard.release; throw e + } + } + } + + override def iterator: Iterator[E] = { + val it = super.iterator + new Iterator[E] { + def hasNext = it.hasNext + def next = it.next + def remove { + it.remove + guard.release //Assume remove worked if no exception was thrown + } + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index d33d0fb337..589500f413 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -4,27 +4,47 @@ package se.scalablesolutions.akka.dispatch -import java.util.concurrent.LinkedBlockingQueue import java.util.Queue import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.config.Config.config +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} +import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher { +class ThreadBasedDispatcher(private val actor: ActorRef, + val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY, + val pushTimeout: Long = 10000, + val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS + ) extends MessageDispatcher { def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java private val name = actor.getClass.getName + ":" + actor.uuid private val threadName = "akka:thread-based:dispatcher:" + name - private val queue = new BlockingMessageQueue(name, mailboxCapacity) private var selectorThread: Thread = _ @volatile private var active: Boolean = false - def dispatch(invocation: MessageInvocation) = queue.append(invocation) + if (actor.mailbox eq null) { + actor.mailbox = if (mailboxCapacity > 0) + new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue + else + new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue + } + + override def register(actorRef: ActorRef) = { + if(actorRef != actor) + throw new IllegalArgumentException("Cannot register to anyone but " + actor) + + super.register(actorRef) + } + + def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue] + + def dispatch(invocation: MessageInvocation) = mailbox append invocation def start = if (!active) { log.debug("Starting up %s", toString) @@ -33,7 +53,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In override def run = { while (active) { try { - actor.invoke(queue.take) + actor.invoke(mailbox.next) } catch { case e: InterruptedException => active = false } } } @@ -53,12 +73,14 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In override def toString = "ThreadBasedDispatcher[" + threadName + "]" } -// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher -class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue { - private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) - else new LinkedBlockingQueue[MessageInvocation] - def append(invocation: MessageInvocation) = queue.put(invocation) - def take: MessageInvocation = queue.take - def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException - def interrupt = throw new UnsupportedOperationException +trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] => + + final def append(invocation: MessageInvocation): Unit = { + if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer + if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting + throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") + } + } + + final def next: MessageInvocation = self.take } diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala deleted file mode 100644 index 7f16f7aa57..0000000000 --- a/akka-core/src/main/scala/dispatch/Queues.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.dispatch - -import concurrent.forkjoin.LinkedTransferQueue -import java.util.concurrent.{TimeUnit, Semaphore} -import java.util.Iterator -import se.scalablesolutions.akka.util.Logger - -class BoundedTransferQueue[E <: AnyRef]( - val capacity: Int, - val pushTimeout: Long, - val pushTimeUnit: TimeUnit) - extends LinkedTransferQueue[E] { - require(capacity > 0) - require(pushTimeout > 0) - require(pushTimeUnit ne null) - - protected val guard = new Semaphore(capacity) - - //Enqueue an item within the push timeout (acquire Semaphore) - protected def enq(f: => Boolean): Boolean = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { - val result = try { - f - } catch { - case e => - guard.release //If something broke, release - throw e - } - if (!result) guard.release //Didn't add anything - result - } else - false - } - - //Dequeue an item (release Semaphore) - protected def deq(e: E): E = { - if (e ne null) guard.release //Signal removal of item - e - } - - override def take(): E = deq(super.take) - override def poll(): E = deq(super.poll) - override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit)) - - override def remainingCapacity = guard.availablePermits - - override def remove(o: AnyRef): Boolean = { - if (super.remove(o)) { - guard.release - true - } else { - false - } - } - - override def offer(e: E): Boolean = - enq(super.offer(e)) - - override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = - enq(super.offer(e,timeout,unit)) - - override def add(e: E): Boolean = - enq(super.add(e)) - - override def put(e :E): Unit = - enq({ super.put(e); true }) - - override def tryTransfer(e: E): Boolean = - enq(super.tryTransfer(e)) - - override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = - enq(super.tryTransfer(e,timeout,unit)) - - override def transfer(e: E): Unit = - enq({ super.transfer(e); true }) - - override def iterator: Iterator[E] = { - val it = super.iterator - new Iterator[E] { - def hasNext = it.hasNext - def next = it.next - def remove { - it.remove - guard.release //Assume remove worked if no exception was thrown - } - } - } -} \ No newline at end of file