diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index 2ebba03928..7e7904ec29 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -44,9 +44,10 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID} * @author Jonas Bonér */ object Dispatchers extends Logging { - val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) - val MAILBOX_CONFIG = MailboxConfig( + val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val THROUGHPUT_DEADLINE_MS = config.getInt("akka.actor.throughput-deadline-ms",-1) + val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) + val MAILBOX_CONFIG = MailboxConfig( capacity = Dispatchers.MAILBOX_CAPACITY, pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)), blockingDequeue = false @@ -58,7 +59,7 @@ object Dispatchers extends Logging { object globalHawtDispatcher extends HawtDispatcher - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_CONFIG) { + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,THROUGHPUT_DEADLINE_MS,MAILBOX_CONFIG) { override def register(actor: ActorRef) = { if (isShutdown) init super.register(actor) @@ -116,14 +117,14 @@ object Dispatchers extends Logging { *
* Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxCapacity) + def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxCapacity) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false)) + def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false)) /** @@ -198,13 +199,28 @@ object Dispatchers extends Logging { } val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { - case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig) - case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),mailboxBounds,threadPoolConfig) - case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) - case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher - case "GlobalHawt" => globalHawtDispatcher + case "ExecutorBasedEventDrivenWorkStealing" => + new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig) + + case "ExecutorBasedEventDriven" => + new ExecutorBasedEventDrivenDispatcher( + name, + cfg.getInt("throughput",THROUGHPUT), + cfg.getInt("throughput-deadline-ms",THROUGHPUT_DEADLINE_MS), + mailboxBounds, + threadPoolConfig) - case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) + case "Hawt" => + new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) + + case "GlobalExecutorBasedEventDriven" => + globalExecutorBasedEventDrivenDispatcher + + case "GlobalHawt" => + globalHawtDispatcher + + case unknown => + throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) } dispatcher diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 6cabdec5e5..63ce310848 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,12 +65,13 @@ import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, class ExecutorBasedEventDrivenDispatcher( _name: String, val throughput: Int = Dispatchers.THROUGHPUT, + val throughputDeadlineMs: Int = Dispatchers.THROUGHPUT_DEADLINE_MS, mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG, config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { - def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false)) - def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage - def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage + def this(_name: String, throughput: Int, throughputDeadlineMs: Int, capacity: Int) = this(_name,throughput,throughputDeadlineMs,MailboxConfig(capacity,None,false)) + def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_MS, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage + def this(_name: String) = this(_name,Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_MS,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage //FIXME remove this from ThreadPoolBuilder mailboxCapacity = mailboxConfig.capacity @@ -102,24 +103,28 @@ class ExecutorBasedEventDrivenDispatcher( * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox(): Boolean = { - val throttle = throughput > 0 - var processedMessages = 0 - var nextMessage = self.dequeue - if (nextMessage ne null) { - do { - nextMessage.invoke + var nextMessage = self.dequeue + if (nextMessage ne null) { + val throttle = throughput > 0 + var processedMessages = 0 + val isDeadlineEnabled = throttle && throughputDeadlineMs > 0 + val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 - if(throttle) { //Will be elided when false - processedMessages += 1 - if (processedMessages >= throughput) //If we're throttled, break out - return !self.isEmpty - } - nextMessage = self.dequeue - } - while (nextMessage ne null) - } + do { + nextMessage.invoke - false + if(throttle) { //Will be elided when false + processedMessages += 1 + if ((processedMessages >= throughput) + || (isDeadlineEnabled && (System.currentTimeMillis - started) >= throughputDeadlineMs)) //If we're throttled, break out + return !self.isEmpty + } + nextMessage = self.dequeue + } + while (nextMessage ne null) + } + + false } } diff --git a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala index 9cdf43682e..56d98071d7 100644 --- a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala @@ -3,9 +3,10 @@ package se.scalablesolutions.akka.actor.dispatch import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.Test -import se.scalablesolutions.akka.dispatch.Dispatchers +import se.scalablesolutions.akka.dispatch.{Dispatchers,ExecutorBasedEventDrivenDispatcher} import se.scalablesolutions.akka.actor.Actor import Actor._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} object ExecutorBasedEventDrivenDispatcherActorSpec { class TestActor extends Actor { @@ -65,4 +66,70 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { } actor.stop } + + @Test def shouldRespectThroughput { + val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_CONFIG, (e) => { + e.setCorePoolSize(1) + }) + + val works = new AtomicBoolean(true) + val latch = new CountDownLatch(100) + val start = new CountDownLatch(1) + val fastOne = actorOf( + new Actor { + self.dispatcher = throughputDispatcher + def receive = { case "sabotage" => works.set(false) } + }).start + + val slowOne = actorOf( + new Actor { + self.dispatcher = throughputDispatcher + def receive = { + case "hogexecutor" => start.await + case "ping" => if (works.get) latch.countDown + } + }).start + + slowOne ! "hogexecutor" + (1 to 100) foreach { _ => slowOne ! "ping"} + fastOne ! "sabotage" + start.countDown + val result = latch.await(3,TimeUnit.SECONDS) + fastOne.stop + slowOne.stop + throughputDispatcher.shutdown + assert(result === true) + } + + @Test def shouldRespectThroughputDeadline { + val deadlineMs = 100 + val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_CONFIG, (e) => { + e.setCorePoolSize(1) + }) + + val works = new AtomicBoolean(true) + val latch = new CountDownLatch(1) + val start = new CountDownLatch(1) + val fastOne = actorOf( + new Actor { + self.dispatcher = throughputDispatcher + def receive = { case "ping" => if(works.get) latch.countDown; self.stop } + }).start + + val slowOne = actorOf( + new Actor { + self.dispatcher = throughputDispatcher + def receive = { + case "hogexecutor" => start.await + case "ping" => works.set(false); self.stop + } + }).start + + slowOne ! "hogexecutor" + slowOne ! "ping" + fastOne ! "ping" + Thread.sleep(deadlineMs) + start.countDown + assert(latch.await(10,TimeUnit.SECONDS) === true) + } }