Adding support for ForkJoinPoolConfig so you can use ForkJoin

This commit is contained in:
Viktor Klang 2011-12-01 17:03:30 +01:00
parent e3e694d1dc
commit ef27f865d4
3 changed files with 88 additions and 6 deletions

View file

@ -492,3 +492,39 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FJDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
def newInterceptedDispatcher =
(new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput,
system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType,
new ForkJoinPoolConfig(), system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor).asInstanceOf[MessageDispatcherInterceptor]
def dispatcherType = "FJDispatcher"
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = newInterceptedDispatcher
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher)
a ! Meet(aStart, aStop)
assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")
aStop.countDown()
a.stop
b.stop
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
}
}

View file

@ -6,12 +6,9 @@ package akka.dispatch
import akka.event.Logging.Warning
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue }
import akka.actor.{ ActorCell, ActorKilledException }
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
import akka.actor.ActorCell
import akka.util.Duration
import java.util.concurrent._
/**
* Default settings are:

View file

@ -11,6 +11,9 @@ import akka.event.Logging.{ Warning, Error }
import akka.actor.ActorSystem
import java.util.concurrent._
import akka.event.EventStream
import concurrent.forkjoin.ForkJoinPool._
import concurrent.forkjoin.{ ForkJoinTask, ForkJoinWorkerThread, ForkJoinPool }
import concurrent.forkjoin.ForkJoinTask._
object ThreadPoolConfig {
type Bounds = Int
@ -184,6 +187,52 @@ class MonitorableThread(runnable: Runnable, name: String)
}
}
case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider {
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
def createExecutorService: ExecutorService = {
new ForkJoinPool(targetParallelism) with ExecutorService {
setAsyncMode(true)
setMaintainsParallelism(true)
override def execute(r: Runnable) {
r match {
case fjmbox: FJMailbox
//fjmbox.fjTask.reinitialize()
Thread.currentThread match {
case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this
fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected
case _ super.execute[Unit](fjmbox.fjTask)
}
case _
super.execute(r)
}
}
import java.util.{ Collection JCollection }
def invokeAny[T](callables: JCollection[_ <: Callable[T]]) =
throw new UnsupportedOperationException("invokeAny. NOT!")
def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
throw new UnsupportedOperationException("invokeAny. NOT!")
def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
throw new UnsupportedOperationException("invokeAny. NOT!")
}
}
}
}
trait FJMailbox { self: Mailbox
val fjTask = new ForkJoinTask[Unit] with Runnable {
var result: Unit = ()
def getRawResult() = result
def setRawResult(v: Unit) { result = v }
def exec() = { self.run(); true }
def run() { invoke() }
}
}
/**
* As the name says
*/