diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 61d0da3555..8986d2ed7d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -259,18 +259,16 @@ abstract class ActorModelSpec extends JUnitSuite { val counter = new CountDownLatch(200) a.start() - def start = spawn { for (i ← 1 to 20) { a ! WaitAck(1, counter) } } - for (i ← 1 to 10) { start } + for (i ← 1 to 10) { spawn { for (i ← 1 to 20) { a ! WaitAck(1, counter) } } } assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) a.stop() } - def spawn(f: ⇒ Unit) = { - val thread = new Thread { override def run { f } } + def spawn(f: ⇒ Unit) { + val thread = new Thread { override def run { try { f } catch { case e ⇒ e.printStackTrace } } } thread.start() - thread } @Test @@ -369,3 +367,8 @@ class BalancingDispatcherModelTest extends ActorModelSpec { def newInterceptedDispatcher = new BalancingDispatcher("foo") with MessageDispatcherInterceptor } + +class FJDispatcherModelTest extends ActorModelSpec { + def newInterceptedDispatcher = + new FJDispatcher("foo") with MessageDispatcherInterceptor +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index d00b579610..4516597acf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -67,7 +67,7 @@ class Dispatcher( val throughput: Int = Dispatchers.THROUGHPUT, val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - val config: ThreadPoolConfig = ThreadPoolConfig()) + executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig()) extends MessageDispatcher { def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = @@ -79,16 +79,16 @@ class Dispatcher( def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - def this(_name: String, _config: ThreadPoolConfig) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) + def this(_name: String, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _executorServiceFactoryProvider) def this(_name: String) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage val name = "akka:event-driven:dispatcher:" + _name - private[akka] val threadFactory = new MonitorableThreadFactory(name) - private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) + private[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) + private[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) private[akka] def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) @@ -134,7 +134,7 @@ class Dispatcher( private[akka] def start {} private[akka] def shutdown { - val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) + val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) if (old ne null) { old.shutdownNow() } @@ -160,6 +160,8 @@ class Dispatcher( private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = registerForExecution(mbox) + private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = () + protected override def cleanUpMailboxFor(actorRef: ActorRef) { val m = getMailbox(actorRef) if (!m.isEmpty) { @@ -201,8 +203,11 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒ finally { dispatcherLock.unlock() } + if (!self.isEmpty) dispatcher.reRegisterForExecution(this) + + dispatcher.doneProcessingMailbox(this) } /** @@ -271,7 +276,7 @@ class PriorityDispatcher( throughput: Int = Dispatchers.THROUGHPUT, throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: ThreadPoolConfig = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { + executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, executorServiceFactoryProvider) with PriorityMailbox { def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage @@ -282,8 +287,8 @@ class PriorityDispatcher( def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = - this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) + def this(name: String, comparator: java.util.Comparator[MessageInvocation], executorServiceFactoryProvider: ExecutorServiceFactoryProvider) = + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, executorServiceFactoryProvider) def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage diff --git a/akka-actor/src/main/scala/akka/dispatch/FJDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/FJDispatcher.scala new file mode 100644 index 0000000000..a2eb391b07 --- /dev/null +++ b/akka-actor/src/main/scala/akka/dispatch/FJDispatcher.scala @@ -0,0 +1,108 @@ +package akka.dispatch + +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +import akka.actor.ActorRef +import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinPool, ForkJoinTask } +import java.util.concurrent._ +import java.lang.UnsupportedOperationException + +/** + * A Dispatcher that uses the ForkJoin library in scala.concurrent.forkjoin + */ +class FJDispatcher( + name: String, + throughput: Int = Dispatchers.THROUGHPUT, + throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + forkJoinPoolConfig: ForkJoinPoolConfig = ForkJoinPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, forkJoinPoolConfig) { + + def this(name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(name, throughput, throughputDeadlineTime, mailboxType, ForkJoinPoolConfig()) // Needed for Java API usage + + def this(name: String, throughput: Int, mailboxType: MailboxType) = + this(name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = + this(name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], forkJoinPoolConfig: ForkJoinPoolConfig) = + this(name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, forkJoinPoolConfig) + + def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = + this(name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + override def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { + case b: UnboundedMailbox ⇒ + new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox with FJMailbox { + @inline + final def dispatcher = FJDispatcher.this + @inline + final def enqueue(m: MessageInvocation) = this.add(m) + @inline + final def dequeue(): MessageInvocation = this.poll() + } + case b: BoundedMailbox ⇒ + new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox with FJMailbox { + @inline + final def dispatcher = FJDispatcher.this + } + } + + override private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = { + super.doneProcessingMailbox(mbox) + if (FJDispatcher.isCurrentThreadFJThread) + ForkJoinTask.helpQuiesce() + } +} + +object FJDispatcher { + def isCurrentThreadFJThread = Thread.currentThread.isInstanceOf[ForkJoinWorkerThread] +} + +case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { + final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { + def createExecutorService: ExecutorService = { + new ForkJoinPool(targetParallelism) with ExecutorService { + setAsyncMode(true) + setMaintainsParallelism(true) + + override def execute(r: Runnable) { + r match { + case fjmbox: FJMailbox ⇒ + fjmbox.fjTask.reinitialize() + if (FJDispatcher.isCurrentThreadFJThread) fjmbox.fjTask.fork() + else super.execute[Unit](fjmbox.fjTask) + case _ ⇒ super.execute(r) + } + } + + import java.util.{ Collection ⇒ JCollection } + + def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = + throw new UnsupportedOperationException("invokeAny. NOT!") + + def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + throw new UnsupportedOperationException("invokeAny. NOT!") + + def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + throw new UnsupportedOperationException("invokeAny. NOT!") + } + } + } +} + +trait FJMailbox { self: ExecutableMailbox ⇒ + val fjTask = new ForkJoinTask[Unit] with Runnable { + var result: Unit = () + def getRawResult() = result + def setRawResult(v: Unit) { result = v } + def exec() = { + self.run() + true + } + def run() { invoke() } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index b1c0f6e747..d6d33255a5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -11,6 +11,7 @@ import ThreadPoolExecutor.CallerRunsPolicy import akka.util.Duration import akka.event.EventHandler +import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinTask, ForkJoinPool } object ThreadPoolConfig { type Bounds = Int @@ -51,18 +52,24 @@ object ThreadPoolConfig { } } +trait ExecutorServiceFactory { + def createExecutorService: ExecutorService +} + +trait ExecutorServiceFactoryProvider { + def createExecutorServiceFactory(name: String): ExecutorServiceFactory +} + case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler, - queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) { - - final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService = - new LazyExecutorServiceWrapper(createExecutorService(threadFactory)) - - final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = { - flowHandler match { + queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) + extends ExecutorServiceFactoryProvider { + final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { + val threadFactory = new MonitorableThreadFactory(name) + def createExecutorService: ExecutorService = flowHandler match { case Left(rejectHandler) ⇒ val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler) service.allowCoreThreadTimeOut(allowCorePoolTimeout)