diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index f33d3b1b24..b4ef1f1f44 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -5,12 +5,11 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config.config import net.lag.configgy.ConfigMap -import se.scalablesolutions.akka.util.UUID import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy} import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.util.{Duration, Logging, UUID} /** * Scala API. Dispatcher factory. @@ -47,6 +46,11 @@ import java.util.concurrent.TimeUnit object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) + val MAILBOX_BOUNDS = BoundedMailbox( + Dispatchers.MAILBOX_CAPACITY, + config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms"). + map(Duration(_,TimeUnit.MILLISECONDS)) + ) lazy val defaultGlobalDispatcher = { config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) @@ -54,7 +58,7 @@ object Dispatchers extends Logging { object globalHawtDispatcher extends HawtDispatcher - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_BOUNDS) { override def register(actor: ActorRef) = { if (isShutdown) init super.register(actor) @@ -95,7 +99,7 @@ object Dispatchers extends Logging { *
* E.g. each actor consumes its own thread. */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeout: Long, pushTimeUnit: TimeUnit) = new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeout, pushTimeUnit) + def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, BoundedMailbox(mailboxCapacity,Option(pushTimeOut))) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -196,10 +200,16 @@ object Dispatchers extends Logging { }) } + lazy val mailboxBounds: BoundedMailbox = { + val capacity = cfg.getInt("mailbox-capacity",Dispatchers.MAILBOX_CAPACITY) + val timeout = cfg.getInt("mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)) + BoundedMailbox(capacity,timeout) + } + val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name) case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig) - case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),MAILBOX_CAPACITY,threadPoolConfig) + case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),mailboxBounds,threadPoolConfig) case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig) case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index c3ecf5ded7..3ed81ff740 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,14 +65,15 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} class ExecutorBasedEventDrivenDispatcher( _name: String, throughput: Int = Dispatchers.THROUGHPUT, - capacity: Int = Dispatchers.MAILBOX_CAPACITY, + mailboxBounds: BoundedMailbox = Dispatchers.MAILBOX_BOUNDS, config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { + def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,BoundedMailbox(capacity,None)) def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage - mailboxCapacity = capacity + mailboxCapacity = mailboxBounds.capacity @volatile private var active: Boolean = false @@ -92,8 +93,14 @@ class ExecutorBasedEventDrivenDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def createMailbox(actorRef: ActorRef): AnyRef = { - if (mailboxCapacity <= 0) new ConcurrentLinkedQueue[MessageInvocation] - else new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) + if (mailboxCapacity <= 0) + new ConcurrentLinkedQueue[MessageInvocation] + else if (mailboxBounds.pushTimeOut.isDefined) { + val timeout = mailboxBounds.pushTimeOut.get + new BoundedTransferQueue[MessageInvocation](mailboxCapacity,timeout.length,timeout.unit) + } + else + new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) } def dispatch(receiver: ActorRef): Unit = if (active) { diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 640ded8039..41691b4326 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -6,12 +6,12 @@ package se.scalablesolutions.akka.dispatch import java.util.List -import se.scalablesolutions.akka.util.{HashCode, Logging} import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException} import org.multiverse.commitbarriers.CountDownCommitBarrier import se.scalablesolutions.akka.AkkaException import java.util.concurrent.{ConcurrentSkipListSet} +import se.scalablesolutions.akka.util.{Duration, HashCode, Logging} /** * @author Jonas Bonér @@ -66,6 +66,10 @@ trait MessageQueue { def append(handle: MessageInvocation) } +/* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout + */ +case class BoundedMailbox(capacity: Int, pushTimeOut: Option[Duration]) + /** * @author Jonas Bonér */ diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala index 5b5aa6683e..303b6499d7 100644 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ b/akka-actor/src/main/scala/dispatch/Queues.scala @@ -17,7 +17,7 @@ class BoundedTransferQueue[E <: AnyRef]( require(capacity > 0) require(pushTimeout > 0) require(pushTimeUnit ne null) - + protected val guard = new Semaphore(capacity) override def take(): E = { diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 65ee9ed845..5c06f058f9 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -8,8 +8,8 @@ import java.util.Queue import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.config.Config.config -import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue} +import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -17,10 +17,9 @@ import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue} * @author Jonas Bonér */ class ThreadBasedDispatcher(private val actor: ActorRef, - val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY, - val pushTimeout: Long = 10000, - val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS + val mailboxBounds: BoundedMailbox ) extends MessageDispatcher { + def this(actor: ActorRef, capacity: Int) = this(actor,BoundedMailbox(capacity,None)) def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java private val name = actor.getClass.getName + ":" + actor.uuid @@ -29,10 +28,14 @@ class ThreadBasedDispatcher(private val actor: ActorRef, @volatile private var active: Boolean = false override def createMailbox(actorRef: ActorRef): AnyRef = { - if (mailboxCapacity > 0) - new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue + if (mailboxBounds.capacity <= 0) + new LinkedTransferQueue[MessageInvocation] with ThreadMessageBlockingQueue + else if (mailboxBounds.pushTimeOut.isDefined) { + val timeout = mailboxBounds.pushTimeOut.get + new BoundedTransferQueue[MessageInvocation](mailboxBounds.capacity, timeout.length, timeout.unit) with ThreadMessageBlockingQueue + } else - new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue + new LinkedBlockingQueue[MessageInvocation](mailboxBounds.capacity) with ThreadMessageBlockingQueue } override def register(actorRef: ActorRef) = { @@ -42,7 +45,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, super.register(actorRef) } - def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue] + def mailbox = actor.mailbox.asInstanceOf[ThreadMessageBlockingQueue] def mailboxSize(a: ActorRef) = mailbox.size @@ -75,13 +78,16 @@ class ThreadBasedDispatcher(private val actor: ActorRef, override def toString = "ThreadBasedDispatcher[" + threadName + "]" } -trait ThreadMessageQueue extends MessageQueue with TransferQueue[MessageInvocation] { - final def append(invocation: MessageInvocation): Unit = { +trait ThreadMessageBlockingQueue extends MessageQueue with BlockingQueue[MessageInvocation] { + final def next: MessageInvocation = take + def append(invocation: MessageInvocation): Unit = put(invocation) +} + +trait ThreadMessageTransferQueue extends ThreadMessageBlockingQueue with TransferQueue[MessageInvocation] { + final override def append(invocation: MessageInvocation): Unit = { if(!tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") } } - - final def next: MessageInvocation = take } diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala index 0165808ce9..ff10cabb31 100644 --- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala @@ -50,11 +50,11 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { assert(pojo != null) } - scenario("get a executor-event-driven-dispatcher with bounded-linked-blocking-queue with unbounded capacity from context") { + scenario("get a executor-event-driven-dispatcher with blocking-queue with unbounded capacity from context") { val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") val dispatcher = context.getBean("executor-event-driven-dispatcher-2").asInstanceOf[ExecutorBasedEventDrivenDispatcher] val executor = getThreadPoolExecutorAndAssert(dispatcher) - assert(executor.getQueue().isInstanceOf[LinkedBlockingQueue[Runnable]]) + assert(executor.getQueue().isInstanceOf[BlockingQueue[Runnable]]) assert(executor.getQueue().remainingCapacity() === Integer.MAX_VALUE) assert(dispatcher.name === EVENT_DRIVEN_PREFIX + "dispatcher-2") } @@ -66,8 +66,8 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { val actorRef = UntypedActor.actorOf(classOf[PingActor]) actorRef.dispatcher = dispatcher actorRef.start - assert(actorRef.mailbox.isInstanceOf[LinkedBlockingQueue[MessageInvocation]]) - assert((actorRef.mailbox.asInstanceOf[LinkedBlockingQueue[MessageInvocation]]).remainingCapacity === 1000) + assert(actorRef.mailbox.isInstanceOf[BlockingQueue[MessageInvocation]]) + assert((actorRef.mailbox.asInstanceOf[BlockingQueue[MessageInvocation]]).remainingCapacity === 1000) } scenario("get a executor-event-driven-dispatcher with unbounded-linked-blocking-queue with bounded capacity from context") { @@ -75,7 +75,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { val dispatcher = context.getBean("executor-event-driven-dispatcher-4").asInstanceOf[ExecutorBasedEventDrivenDispatcher] assert(dispatcher.name === EVENT_DRIVEN_PREFIX + "dispatcher-4") val executor = getThreadPoolExecutorAndAssert(dispatcher) - assert(executor.getQueue().isInstanceOf[LinkedBlockingQueue[Runnable]]) + assert(executor.getQueue().isInstanceOf[BlockingQueue[Runnable]]) assert(executor.getQueue().remainingCapacity() === 55) } @@ -84,7 +84,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { val dispatcher = context.getBean("executor-event-driven-dispatcher-5").asInstanceOf[ExecutorBasedEventDrivenDispatcher] assert(dispatcher.name === EVENT_DRIVEN_PREFIX + "dispatcher-5") val executor = getThreadPoolExecutorAndAssert(dispatcher) - assert(executor.getQueue().isInstanceOf[LinkedBlockingQueue[Runnable]]) + assert(executor.getQueue().isInstanceOf[BlockingQueue[Runnable]]) assert(executor.getQueue().remainingCapacity() === Integer.MAX_VALUE) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 45a9864f5b..a66e12be0a 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -49,6 +49,10 @@ akka { # If positive then a bounded mailbox is used and the capacity is set using the property # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, # could lead to deadlock, use with care + # + # The following are only used for ExecutorBasedEventDriven + # and only if mailbox-capacity > 0 + mailbox-push-timeout-ms = 10000 # Specifies the timeout (in milliseconds) to add a new message to a mailbox that is full } }