diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index e21f965c51..6b16042b69 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -492,3 +492,39 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { } } } + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FJDispatcherModelSpec extends ActorModelSpec { + import ActorModelSpec._ + + def newInterceptedDispatcher = + (new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput, + system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, + new ForkJoinPoolConfig(), system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor).asInstanceOf[MessageDispatcherInterceptor] + + def dispatcherType = "FJDispatcher" + + "A " + dispatcherType must { + "process messages in parallel" in { + implicit val dispatcher = newInterceptedDispatcher + val aStart, aStop, bParallel = new CountDownLatch(1) + val a, b = newTestActor(dispatcher) + + a ! Meet(aStart, aStop) + assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") + + b ! CountDown(bParallel) + assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel") + + aStop.countDown() + + a.stop + b.stop + + while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination + + assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index ee28fd586e..1a40ee23cd 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -6,12 +6,9 @@ package akka.dispatch import akka.event.Logging.Warning import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } -import akka.actor.{ ActorCell, ActorKilledException } -import akka.actor.ActorSystem -import akka.event.EventStream -import akka.actor.Scheduler +import akka.actor.ActorCell import akka.util.Duration +import java.util.concurrent._ /** * Default settings are: @@ -156,4 +153,4 @@ abstract class PriorityGenerator extends java.util.Comparator[Envelope] { final def compare(thisMessage: Envelope, thatMessage: Envelope): Int = gen(thisMessage.message) - gen(thatMessage.message) -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index f543e5c016..f92be79b2d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -11,6 +11,9 @@ import akka.event.Logging.{ Warning, Error } import akka.actor.ActorSystem import java.util.concurrent._ import akka.event.EventStream +import concurrent.forkjoin.ForkJoinPool._ +import concurrent.forkjoin.{ ForkJoinTask, ForkJoinWorkerThread, ForkJoinPool } +import concurrent.forkjoin.ForkJoinTask._ object ThreadPoolConfig { type Bounds = Int @@ -184,6 +187,52 @@ class MonitorableThread(runnable: Runnable, name: String) } } +case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { + final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { + def createExecutorService: ExecutorService = { + new ForkJoinPool(targetParallelism) with ExecutorService { + setAsyncMode(true) + setMaintainsParallelism(true) + + override def execute(r: Runnable) { + r match { + case fjmbox: FJMailbox ⇒ + //fjmbox.fjTask.reinitialize() + Thread.currentThread match { + case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ + fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected + case _ ⇒ super.execute[Unit](fjmbox.fjTask) + } + case _ ⇒ + super.execute(r) + } + } + + import java.util.{ Collection ⇒ JCollection } + + def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = + throw new UnsupportedOperationException("invokeAny. NOT!") + + def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + throw new UnsupportedOperationException("invokeAny. NOT!") + + def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + throw new UnsupportedOperationException("invokeAny. NOT!") + } + } + } +} + +trait FJMailbox { self: Mailbox ⇒ + val fjTask = new ForkJoinTask[Unit] with Runnable { + var result: Unit = () + def getRawResult() = result + def setRawResult(v: Unit) { result = v } + def exec() = { self.run(); true } + def run() { invoke() } + } +} + /** * As the name says */