+act #17274 make ForkJoinPool asyncMode configurable

(cherry picked from commit 05f156bdc0eae121aa122028a582084bc4bb22dc)
This commit is contained in:
hepin 2015-05-08 14:07:06 +08:00 committed by Patrik Nordwall
parent 2d70599d22
commit e4baf1d82e
3 changed files with 42 additions and 22 deletions

View file

@ -4,14 +4,15 @@
package akka.config package akka.config
import language.postfixOps import java.util.concurrent.TimeUnit
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.event.Logging.DefaultLogger import akka.event.Logging.DefaultLogger
import java.util.concurrent.TimeUnit import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.event.DefaultLoggingFilter import akka.event.DefaultLoggingFilter
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -95,6 +96,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
pool.getInt("parallelism-min") should ===(8) pool.getInt("parallelism-min") should ===(8)
pool.getDouble("parallelism-factor") should ===(3.0) pool.getDouble("parallelism-factor") should ===(3.0)
pool.getInt("parallelism-max") should ===(64) pool.getInt("parallelism-max") should ===(64)
pool.getString("task-peeking-mode") should be("FIFO")
} }
//Thread pool executor config //Thread pool executor config

View file

@ -291,6 +291,10 @@ akka {
# Max number of threads to cap factor-based parallelism number to # Max number of threads to cap factor-based parallelism number to
parallelism-max = 64 parallelism-max = 64
# Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack
# like peeking mode which "pop".
task-peeking-mode = "FIFO"
} }
# This will be used if you have set "executor = "thread-pool-executor"" # This will be used if you have set "executor = "thread-pool-executor""

View file

@ -5,21 +5,19 @@
package akka.dispatch package akka.dispatch
import java.util.concurrent._ import java.util.concurrent._
import akka.event.Logging.{ Debug, Error, LogEventException } import java.{ util ju }
import akka.actor._ import akka.actor._
import akka.dispatch.sysmsg._ import akka.dispatch.sysmsg._
import akka.event.EventStream import akka.event.EventStream
import com.typesafe.config.{ ConfigFactory, Config } import akka.event.Logging.{ Debug, Error, LogEventException }
import akka.util.{ Unsafe, Index } import akka.util.{ Index, Unsafe }
import com.typesafe.config.Config
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.concurrent.duration.Duration import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.ExecutionContext import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinTask }
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.Try
import java.{ util ju }
final case class Envelope private (val message: Any, val sender: ActorRef) final case class Envelope private (val message: Any, val sender: ActorRef)
@ -84,8 +82,8 @@ private[akka] object MessageDispatcher {
abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContextExecutor { abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContextExecutor {
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
import MessageDispatcher._
import configurator.prerequisites import configurator.prerequisites
val mailboxes = prerequisites.mailboxes val mailboxes = prerequisites.mailboxes
@ -375,8 +373,13 @@ object ForkJoinExecutorConfigurator {
*/ */
final class AkkaForkJoinPool(parallelism: Int, final class AkkaForkJoinPool(parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler) unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { asyncMode: Boolean)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics {
def this(parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true)
override def execute(r: Runnable): Unit = override def execute(r: Runnable): Unit =
if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r))
@ -414,9 +417,12 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
} }
class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int) extends ExecutorServiceFactory { val parallelism: Int,
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing) val asyncMode: Boolean) extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
} }
final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match { val tf = threadFactory match {
case m: MonitorableThreadFactory case m: MonitorableThreadFactory
@ -424,12 +430,20 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
m.withName(m.name + "-" + id) m.withName(m.name + "-" + id)
case other other case other other
} }
val asyncMode = config.getString("task-peeking-mode") match {
case "FIFO" true
case "LIFO" false
case unsupported throw new IllegalArgumentException(s"""Cannot instantiate ForkJoinExecutorServiceFactory. "task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LILO".""")
}
new ForkJoinExecutorServiceFactory( new ForkJoinExecutorServiceFactory(
validate(tf), validate(tf),
ThreadPoolConfig.scaledPoolSize( ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"), config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"), config.getDouble("parallelism-factor"),
config.getInt("parallelism-max"))) config.getInt("parallelism-max")),
asyncMode)
} }
} }