From e4baf1d82ea52bc61534f88be87acf005abb40e6 Mon Sep 17 00:00:00 2001 From: hepin Date: Fri, 8 May 2015 14:07:06 +0800 Subject: [PATCH] +act #17274 make ForkJoinPool asyncMode configurable (cherry picked from commit 05f156bdc0eae121aa122028a582084bc4bb22dc) --- .../test/scala/akka/config/ConfigSpec.scala | 14 +++--- akka-actor/src/main/resources/reference.conf | 4 ++ .../akka/dispatch/AbstractDispatcher.scala | 46 ++++++++++++------- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 021983154b..7d19f4f893 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -4,14 +4,15 @@ package akka.config -import language.postfixOps -import akka.testkit.AkkaSpec -import com.typesafe.config.ConfigFactory -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit + import akka.actor.ActorSystem import akka.event.Logging.DefaultLogger -import java.util.concurrent.TimeUnit +import akka.testkit.AkkaSpec +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ +import scala.language.postfixOps import akka.event.DefaultLoggingFilter @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -95,6 +96,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin pool.getInt("parallelism-min") should ===(8) pool.getDouble("parallelism-factor") should ===(3.0) pool.getInt("parallelism-max") should ===(64) + pool.getString("task-peeking-mode") should be("FIFO") } //Thread pool executor config diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 7d1adcac02..7eab4660ec 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -291,6 +291,10 @@ akka { # Max number of threads to cap factor-based parallelism number to parallelism-max = 64 + + # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack + # like peeking mode which "pop". + task-peeking-mode = "FIFO" } # This will be used if you have set "executor = "thread-pool-executor"" diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 095c4f2aac..abb5495b45 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -5,21 +5,19 @@ package akka.dispatch import java.util.concurrent._ -import akka.event.Logging.{ Debug, Error, LogEventException } +import java.{ util ⇒ ju } + import akka.actor._ import akka.dispatch.sysmsg._ import akka.event.EventStream -import com.typesafe.config.{ ConfigFactory, Config } -import akka.util.{ Unsafe, Index } +import akka.event.Logging.{ Debug, Error, LogEventException } +import akka.util.{ Index, Unsafe } +import com.typesafe.config.Config import scala.annotation.tailrec -import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } -import scala.concurrent.duration.Duration -import scala.concurrent.ExecutionContext -import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinTask } import scala.util.control.NonFatal -import scala.util.Try -import java.{ util ⇒ ju } final case class Envelope private (val message: Any, val sender: ActorRef) @@ -84,8 +82,8 @@ private[akka] object MessageDispatcher { abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContextExecutor { - import MessageDispatcher._ import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } + import MessageDispatcher._ import configurator.prerequisites val mailboxes = prerequisites.mailboxes @@ -375,8 +373,13 @@ object ForkJoinExecutorConfigurator { */ final class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, - unhandledExceptionHandler: Thread.UncaughtExceptionHandler) - extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { + unhandledExceptionHandler: Thread.UncaughtExceptionHandler, + asyncMode: Boolean) + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics { + def this(parallelism: Int, + threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true) + override def execute(r: Runnable): Unit = if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) @@ -414,9 +417,12 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer } class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, - val parallelism: Int) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing) + val parallelism: Int, + val asyncMode: Boolean) extends ExecutorServiceFactory { + def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true) + def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode) } + final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { val tf = threadFactory match { case m: MonitorableThreadFactory ⇒ @@ -424,12 +430,20 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer m.withName(m.name + "-" + id) case other ⇒ other } + + val asyncMode = config.getString("task-peeking-mode") match { + case "FIFO" ⇒ true + case "LIFO" ⇒ false + case unsupported ⇒ throw new IllegalArgumentException(s"""Cannot instantiate ForkJoinExecutorServiceFactory. "task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LILO".""") + } + new ForkJoinExecutorServiceFactory( validate(tf), ThreadPoolConfig.scaledPoolSize( config.getInt("parallelism-min"), config.getDouble("parallelism-factor"), - config.getInt("parallelism-max"))) + config.getInt("parallelism-max")), + asyncMode) } }