From 928fa634111d7c9367f7d59c565b6a5ed013a6c7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 7 Sep 2010 18:32:50 +0200 Subject: [PATCH] Adding support for safe mailboxes --- ...actReactorBasedEventDrivenDispatcher.scala | 14 +++-- .../src/main/scala/dispatch/Dispatchers.scala | 16 ++--- .../ExecutorBasedEventDrivenDispatcher.scala | 17 ++---- .../main/scala/dispatch/MessageHandling.scala | 59 +++++++++++++++++-- .../src/main/scala/dispatch/Queues.scala | 35 +++++------ .../dispatch/ThreadBasedDispatcher.scala | 39 +++--------- 6 files changed, 98 insertions(+), 82 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index 24c566b48c..26fc38116f 100644 --- a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -15,7 +15,7 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten protected var selectorThread: Thread = _ protected val guard = new Object - def dispatch(invocation: MessageInvocation) = queue.append(invocation) + def dispatch(invocation: MessageInvocation) = queue enqueue invocation def shutdown = if (active) { log.debug("Shutting down %s", toString) @@ -34,14 +34,20 @@ class ReactiveMessageQueue(name: String) extends MessageQueue { private[akka] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation] @volatile private var interrupted = false - def append(handle: MessageInvocation) = queue.synchronized { - queue.offer(handle) + def enqueue(handle: MessageInvocation) = queue.synchronized { + queue offer handle queue.notifyAll } + def dequeue(): MessageInvocation = queue.synchronized { + val result = queue.poll + queue.notifyAll + result + } + def read(destination: List[MessageInvocation]) = queue.synchronized { while (queue.isEmpty && !interrupted) queue.wait - if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove) + if (!interrupted) while (!queue.isEmpty) destination add queue.remove else interrupted = false } diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index b4ef1f1f44..be592cde5c 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -46,10 +46,10 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID} object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) - val MAILBOX_BOUNDS = BoundedMailbox( - Dispatchers.MAILBOX_CAPACITY, - config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms"). - map(Duration(_,TimeUnit.MILLISECONDS)) + val MAILBOX_CONFIG = MailboxConfig( + capacity = Dispatchers.MAILBOX_CAPACITY, + pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)), + blockingDequeue = false ) lazy val defaultGlobalDispatcher = { @@ -58,7 +58,7 @@ object Dispatchers extends Logging { object globalHawtDispatcher extends HawtDispatcher - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_BOUNDS) { + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_CONFIG) { override def register(actor: ActorRef) = { if (isShutdown) init super.register(actor) @@ -99,7 +99,7 @@ object Dispatchers extends Logging { *

* E.g. each actor consumes its own thread. */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, BoundedMailbox(mailboxCapacity,Option(pushTimeOut))) + def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, MailboxConfig(mailboxCapacity,Option(pushTimeOut),true)) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -200,10 +200,10 @@ object Dispatchers extends Logging { }) } - lazy val mailboxBounds: BoundedMailbox = { + lazy val mailboxBounds: MailboxConfig = { val capacity = cfg.getInt("mailbox-capacity",Dispatchers.MAILBOX_CAPACITY) val timeout = cfg.getInt("mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)) - BoundedMailbox(capacity,timeout) + MailboxConfig(capacity,timeout,false) } val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 3ed81ff740..4962962f09 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,15 +65,15 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} class ExecutorBasedEventDrivenDispatcher( _name: String, throughput: Int = Dispatchers.THROUGHPUT, - mailboxBounds: BoundedMailbox = Dispatchers.MAILBOX_BOUNDS, + mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG, config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { - def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,BoundedMailbox(capacity,None)) + def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false)) def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage - mailboxCapacity = mailboxBounds.capacity + mailboxCapacity = mailboxConfig.capacity @volatile private var active: Boolean = false @@ -92,16 +92,7 @@ class ExecutorBasedEventDrivenDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = { - if (mailboxCapacity <= 0) - new ConcurrentLinkedQueue[MessageInvocation] - else if (mailboxBounds.pushTimeOut.isDefined) { - val timeout = mailboxBounds.pushTimeOut.get - new BoundedTransferQueue[MessageInvocation](mailboxCapacity,timeout.length,timeout.unit) - } - else - new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) - } + override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity) def dispatch(receiver: ActorRef): Unit = if (active) { diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index c14ff00c3c..ba8d9170f1 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -4,14 +4,14 @@ package se.scalablesolutions.akka.dispatch -import java.util.List - import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException} import org.multiverse.commitbarriers.CountDownCommitBarrier import se.scalablesolutions.akka.AkkaException -import java.util.concurrent.{ConcurrentSkipListSet} import se.scalablesolutions.akka.util.{Duration, HashCode, Logging} +import java.util.{Queue, List} +import java.util.concurrent._ +import concurrent.forkjoin.LinkedTransferQueue /** * @author Jonas Bonér @@ -63,16 +63,63 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m * @author Jonas Bonér */ trait MessageQueue { - def append(handle: MessageInvocation) + def enqueue(handle: MessageInvocation) + def dequeue(): MessageInvocation } /* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout * (If capacity > 0) */ -case class BoundedMailbox(capacity: Int, pushTimeOut: Option[Duration]) +case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingDequeue: Boolean) { + + /** + * Creates a MessageQueue (Mailbox) with the specified properties + * bounds = whether the mailbox should be bounded (< 0 means unbounded) + * pushTime = only used if bounded, indicates if and how long an enqueue should block + * blockDequeue = whether dequeues should block or not + * + * The bounds + pushTime generates a MessageQueueAppendFailedException if enqueue times out + */ + def newMailbox(bounds: Int = capacity, + pushTime: Option[Duration] = pushTimeOut, + blockDequeue: Boolean = blockingDequeue) : MessageQueue = { + if (bounds <= 0) { + new LinkedTransferQueue[MessageInvocation] with MessageQueue { + def enqueue(handle: MessageInvocation): Unit = this add handle + def dequeue(): MessageInvocation = { + if(blockDequeue) this.take() + else this.poll() + } + } + } + else if (pushTime.isDefined) { + val time = pushTime.get + new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue { + def enqueue(handle: MessageInvocation) { + if (!this.offer(handle,time.length,time.unit)) + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString) + } + + def dequeue(): MessageInvocation = { + if (blockDequeue) this.take() + else this.poll() + } + } + } + else { + new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue { + def enqueue(handle: MessageInvocation): Unit = this put handle + def dequeue(): MessageInvocation = { + if(blockDequeue) this.take() + else this.poll() + } + } + } + } +} /** - * @author Jonas Bonér + * @author Jonas Bonér */ trait MessageDispatcher extends Logging { protected val uuids = new ConcurrentSkipListSet[String] diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala index 303b6499d7..2ba88f25c3 100644 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ b/akka-actor/src/main/scala/dispatch/Queues.scala @@ -9,14 +9,8 @@ import java.util.concurrent.{TimeUnit, Semaphore} import java.util.Iterator import se.scalablesolutions.akka.util.Logger -class BoundedTransferQueue[E <: AnyRef]( - val capacity: Int, - val pushTimeout: Long, - val pushTimeUnit: TimeUnit) - extends LinkedTransferQueue[E] { +class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { require(capacity > 0) - require(pushTimeout > 0) - require(pushTimeUnit ne null) protected val guard = new Semaphore(capacity) @@ -50,7 +44,7 @@ class BoundedTransferQueue[E <: AnyRef]( } override def offer(e: E): Boolean = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + if (guard.tryAcquire) { val result = try { super.offer(e) } catch { @@ -63,9 +57,9 @@ class BoundedTransferQueue[E <: AnyRef]( } override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + if (guard.tryAcquire(timeout,unit)) { val result = try { - super.offer(e,timeout,unit) + super.offer(e) } catch { case e => guard.release; throw e } @@ -76,7 +70,7 @@ class BoundedTransferQueue[E <: AnyRef]( } override def add(e: E): Boolean = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + if (guard.tryAcquire) { val result = try { super.add(e) } catch { @@ -89,17 +83,16 @@ class BoundedTransferQueue[E <: AnyRef]( } override def put(e :E): Unit = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { - try { - super.put(e) - } catch { - case e => guard.release; throw e - } + guard.acquire + try { + super.put(e) + } catch { + case e => guard.release; throw e } } override def tryTransfer(e: E): Boolean = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + if (guard.tryAcquire) { val result = try { super.tryTransfer(e) } catch { @@ -112,9 +105,9 @@ class BoundedTransferQueue[E <: AnyRef]( } override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + if (guard.tryAcquire(timeout,unit)) { val result = try { - super.tryTransfer(e,timeout,unit) + super.tryTransfer(e) } catch { case e => guard.release; throw e } @@ -125,7 +118,7 @@ class BoundedTransferQueue[E <: AnyRef]( } override def transfer(e: E): Unit = { - if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { + if (guard.tryAcquire) { try { super.transfer(e) } catch { diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 599bac87fd..eda5a86a9e 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -9,7 +9,7 @@ import java.util.Queue import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.config.Config.config import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue} -import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue} +import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -17,9 +17,9 @@ import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue} * @author Jonas Bonér */ class ThreadBasedDispatcher(private val actor: ActorRef, - val mailboxBounds: BoundedMailbox + val mailboxConfig: MailboxConfig ) extends MessageDispatcher { - def this(actor: ActorRef, capacity: Int) = this(actor,BoundedMailbox(capacity,None)) + def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true)) def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java private val name = actor.getClass.getName + ":" + actor.uuid @@ -27,17 +27,8 @@ class ThreadBasedDispatcher(private val actor: ActorRef, private var selectorThread: Thread = _ @volatile private var active: Boolean = false - override def createMailbox(actorRef: ActorRef): AnyRef = { - if (mailboxBounds.capacity <= 0) - new LinkedTransferQueue[MessageInvocation] with ThreadMessageBlockingQueue - else if (mailboxBounds.pushTimeOut.isDefined) { - val timeout = mailboxBounds.pushTimeOut.get - new BoundedTransferQueue[MessageInvocation](mailboxBounds.capacity, timeout.length, timeout.unit) with ThreadMessageBlockingQueue - } - else - new LinkedBlockingQueue[MessageInvocation](mailboxBounds.capacity) with ThreadMessageBlockingQueue - } - + override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true) + override def register(actorRef: ActorRef) = { if(actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) @@ -45,11 +36,11 @@ class ThreadBasedDispatcher(private val actor: ActorRef, super.register(actorRef) } - def mailbox = actor.mailbox.asInstanceOf[ThreadMessageBlockingQueue] + def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue] def mailboxSize(a: ActorRef) = mailbox.size - def dispatch(invocation: MessageInvocation) = mailbox append invocation + def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation def start = if (!active) { log.debug("Starting up %s", toString) @@ -58,7 +49,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, override def run = { while (active) { try { - actor.invoke(mailbox.next) + actor.invoke(mailbox.dequeue) } catch { case e: InterruptedException => active = false } } } @@ -76,16 +67,4 @@ class ThreadBasedDispatcher(private val actor: ActorRef, } override def toString = "ThreadBasedDispatcher[" + threadName + "]" -} - -trait ThreadMessageBlockingQueue extends MessageQueue with BlockingQueue[MessageInvocation] { - final def next: MessageInvocation = take - def append(invocation: MessageInvocation): Unit = put(invocation) -} - -trait ThreadMessageTransferQueue extends ThreadMessageBlockingQueue with TransferQueue[MessageInvocation] { - final override def append(invocation: MessageInvocation): Unit = { - if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting - throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") - } -} +} \ No newline at end of file