feat: Add virtualize support for thread-pool-executor (#2169)

* feat: Add virtualize support for thread-pool-executor

* Update actor/src/main/resources/reference.conf

Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>

* Update actor/src/main/resources/reference.conf

Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>

* Update actor/src/main/resources/reference.conf

Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>

---------

Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>
This commit is contained in:
He-Pin(kerr) 2025-09-08 00:17:14 +08:00 committed by GitHub
parent 7b96bb8385
commit 458ceed37e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 167 additions and 50 deletions

View file

@ -22,7 +22,6 @@ import com.typesafe.config.ConfigFactory
import org.apache.pekko import org.apache.pekko
import pekko.actor.{ Actor, Props } import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec } import pekko.testkit.{ ImplicitSender, PekkoSpec }
import pekko.util.JavaVersion
object ForkJoinPoolVirtualThreadSpec { object ForkJoinPoolVirtualThreadSpec {
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString("""
@ -30,6 +29,7 @@ object ForkJoinPoolVirtualThreadSpec {
| task-dispatcher { | task-dispatcher {
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" | mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
| throughput = 5 | throughput = 5
| executor = "fork-join-executor"
| fork-join-executor { | fork-join-executor {
| parallelism-factor = 2 | parallelism-factor = 2
| parallelism-max = 2 | parallelism-max = 2

View file

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.dispatch
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }
object ThreadPoolVirtualThreadSpec {
val config = ConfigFactory.parseString("""
|custom {
| task-dispatcher {
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
| throughput = 5
| executor = "thread-pool-executor"
| thread-pool-executor {
| fixed-pool-size = 4
| virtualize = on
| }
| }
|}
""".stripMargin)
class ThreadNameActor extends Actor {
override def receive = {
case "ping" =>
sender() ! Thread.currentThread().getName
}
}
}
class ThreadPoolVirtualThreadSpec extends PekkoSpec(ThreadPoolVirtualThreadSpec.config) with ImplicitSender {
import ThreadPoolVirtualThreadSpec._
"ThreadPool" must {
"support virtualization with Virtual Thread" in {
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("custom.task-dispatcher"))
for (_ <- 1 to 1000) {
actor ! "ping"
expectMsgPF() { case name: String =>
name should include("ThreadPoolVirtualThreadSpec-custom.task-dispatcher-virtual-thread-")
}
}
}
}
}

View file

@ -148,4 +148,8 @@ ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.japi.JAPI")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.dispatch.ThreadPoolConfig") ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.dispatch.ThreadPoolConfig")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.createExecutorServiceFactory") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.createExecutorServiceFactory")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.unapply")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig#ThreadPoolExecutorServiceFactory.this")

View file

@ -482,7 +482,7 @@ pekko {
maximum-pool-size = 32767 maximum-pool-size = 32767
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw. # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
# Virtualize this dispatcher as a virtual-thread-executor # Virtualize this dispatcher as a virtual-thread-executor
# Valid values are: `on`, `off` # Valid values are: `on`, `off`
# #
@ -543,6 +543,18 @@ pekko {
# Allow core threads to time out # Allow core threads to time out
allow-core-timeout = on allow-core-timeout = on
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
# Virtualize this dispatcher as a virtual-thread-executor
# Valid values are: `on`, `off`
#
# Requirements:
# 1. JDK 21+
# 2. add options to the JVM:
# --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
# --add-opens=java.base/java.lang=ALL-UNNAMED
virtualize = off
} }
# This will be used if you have set "executor = "virtual-thread-executor" # This will be used if you have set "executor = "virtual-thread-executor"
@ -600,6 +612,17 @@ pekko {
thread-pool-executor { thread-pool-executor {
fixed-pool-size = 16 fixed-pool-size = 16
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
# Virtualize this dispatcher as a virtual-thread-executor
# Valid values are: `on`, `off`
#
# Requirements:
# 1. JDK 21+
# 2. add options to the JVM:
# --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
# --add-opens=java.base/java.lang=ALL-UNNAMED
virtualize = off
} }
} }

View file

@ -28,7 +28,7 @@ import pekko.dispatch.affinity.AffinityPoolConfigurator
import pekko.dispatch.sysmsg._ import pekko.dispatch.sysmsg._
import pekko.event.EventStream import pekko.event.EventStream
import pekko.event.Logging.{ emptyMDC, Debug, Error, LogEventException, Warning } import pekko.event.Logging.{ emptyMDC, Debug, Error, LogEventException, Warning }
import pekko.util.{ unused, Index } import pekko.util.{ unused, Index, JavaVersion }
import com.typesafe.config.Config import com.typesafe.config.Config
@ -471,42 +471,49 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis
trait ThreadPoolExecutorServiceFactoryProvider extends ExecutorServiceFactoryProvider { trait ThreadPoolExecutorServiceFactoryProvider extends ExecutorServiceFactoryProvider {
def threadPoolConfig: ThreadPoolConfig def threadPoolConfig: ThreadPoolConfig
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
class ThreadPoolExecutorServiceFactory(threadFactory: ThreadFactory) extends ExecutorServiceFactory {
object ThreadPoolExecutorServiceFactory extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = { def createExecutorService: ExecutorService = {
val tf = threadFactory match {
case m: MonitorableThreadFactory => m.withName(m.name + "-" + id)
case _ => threadFactory
}
val poolThreadFactory = tf match {
case m: MonitorableThreadFactory if isVirtualized => m.withName(m.name + "-" + "CarrierThread")
case _ => tf
}
val config = threadPoolConfig val config = threadPoolConfig
val service: ThreadPoolExecutor = new ThreadPoolExecutor( val pool = new ThreadPoolExecutor(
config.corePoolSize, config.corePoolSize,
config.maxPoolSize, config.maxPoolSize,
config.threadTimeout.length, config.threadTimeout.length,
config.threadTimeout.unit, config.threadTimeout.unit,
config.queueFactory(), config.queueFactory(),
threadFactory, poolThreadFactory,
config.rejectionPolicy) with LoadMetrics { config.rejectionPolicy) with LoadMetrics {
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
} }
service.allowCoreThreadTimeOut(config.allowCorePoolTimeout) pool.allowCoreThreadTimeOut(config.allowCorePoolTimeout)
service
}
}
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { if (isVirtualized) {
val tf = threadFactory match { val prefixName = threadFactory match {
case m: MonitorableThreadFactory => case m: MonitorableThreadFactory => m.name + "-" + id
// add the dispatcher id to the thread names case _ => id
m.withName(m.name + "-" + id) }
case other => other createVirtualized(tf, pool, prefixName)
} else pool
} }
new ThreadPoolExecutorServiceFactory(tf)
} }
createExecutorServiceFactory(id, threadFactory) ThreadPoolExecutorServiceFactory
} }
} }
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) extends ExecutorServiceConfigurator(config, prerequisites)
with ThreadPoolExecutorServiceFactoryProvider { with ThreadPoolExecutorServiceFactoryProvider {
override val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config override val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
override val isVirtualized: Boolean = threadPoolConfig.isVirtualized && JavaVersion.majorVersion >= 21
protected def createThreadPoolConfigBuilder( protected def createThreadPoolConfigBuilder(
config: Config, config: Config,
@ -516,6 +523,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
ThreadPoolConfigBuilder(ThreadPoolConfig()) ThreadPoolConfigBuilder(ThreadPoolConfig())
.setKeepAliveTime(config.getMillisDuration("keep-alive-time")) .setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
.setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout")) .setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout"))
.isVirtualized(config.getBoolean("virtualize"))
.configure(Some(config.getInt("task-queue-size")).flatMap { .configure(Some(config.getInt("task-queue-size")).flatMap {
case size if size > 0 => case size if size > 0 =>
Some(config.getString("task-queue-type")) Some(config.getString("task-queue-type"))

View file

@ -15,10 +15,9 @@ package org.apache.pekko.dispatch
import com.typesafe.config.Config import com.typesafe.config.Config
import org.apache.pekko import org.apache.pekko
import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory
import pekko.util.JavaVersion import pekko.util.JavaVersion
import java.util.concurrent.{ Executor, ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory, TimeUnit } import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory, TimeUnit }
object ForkJoinExecutorConfigurator { object ForkJoinExecutorConfigurator {
@ -114,28 +113,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode) val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode)
if (isVirtualized) { if (isVirtualized) {
// when virtualized, we need enhanced thread factory // we need to cast here,
val factory: ThreadFactory = threadFactory match { createVirtualized(threadFactory.asInstanceOf[ThreadFactory], pool, id)
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
new ThreadFactory {
private val vtFactory = newVirtualThreadFactory(name, pool) // use the pool as the scheduler
override def newThread(r: Runnable): Thread = {
val vt = vtFactory.newThread(r)
vt.setUncaughtExceptionHandler(exceptionHandler)
contextClassLoader.foreach(vt.setContextClassLoader)
vt
}
}
case _ => newVirtualThreadFactory(prerequisites.settings.name, pool); // use the pool as the scheduler
}
// wrap the pool with virtualized executor service
new VirtualizedExecutorService(
factory, // the virtual thread factory
pool, // the underlying pool
(_: Executor) => pool.atFullThrottle(), // the load metrics provider, we use the pool itself
cascadeShutdown = true // cascade shutdown
)
} else { } else {
pool pool
} }

View file

@ -13,13 +13,16 @@
package org.apache.pekko.dispatch package org.apache.pekko.dispatch
import org.apache.pekko.annotation.InternalApi import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory
import java.util.Collection import java.util.Collection
import java.util.concurrent.{ import java.util.concurrent.{
ArrayBlockingQueue, ArrayBlockingQueue,
BlockingQueue, BlockingQueue,
Callable, Callable,
Executor,
ExecutorService, ExecutorService,
ForkJoinPool, ForkJoinPool,
ForkJoinWorkerThread, ForkJoinWorkerThread,
@ -74,6 +77,34 @@ trait ExecutorServiceFactory {
trait ExecutorServiceFactoryProvider { trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory
def isVirtualized: Boolean = false // can be overridden by implementations def isVirtualized: Boolean = false // can be overridden by implementations
protected def createVirtualized(
threadFactory: ThreadFactory,
pool: ExecutorService with LoadMetrics,
prefixName: String): ExecutorService = {
// when virtualized, we need enhanced thread factory
val factory: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
new ThreadFactory {
private val vtFactory = newVirtualThreadFactory(name, pool) // use the pool as the scheduler
override def newThread(r: Runnable): Thread = {
val vt = vtFactory.newThread(r)
vt.setUncaughtExceptionHandler(exceptionHandler)
contextClassLoader.foreach(vt.setContextClassLoader)
vt
}
}
case _ => newVirtualThreadFactory(prefixName, pool); // use the pool as the scheduler
}
// wrap the pool with virtualized executor service
new VirtualizedExecutorService(
factory, // the virtual thread factory
pool, // the underlying pool
(_: Executor) => pool.atFullThrottle(), // the load metrics provider, we use the pool itself
cascadeShutdown = true // cascade shutdown
)
}
} }
/** /**
@ -88,7 +119,8 @@ final case class ThreadPoolConfig(
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(), queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) { rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy,
isVirtualized: Boolean = false) {
// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra // Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
// context information on the config // context information on the config
@noinline @noinline
@ -98,9 +130,11 @@ final case class ThreadPoolConfig(
maxPoolSize: Int = maxPoolSize, maxPoolSize: Int = maxPoolSize,
threadTimeout: Duration = threadTimeout, threadTimeout: Duration = threadTimeout,
queueFactory: ThreadPoolConfig.QueueFactory = queueFactory, queueFactory: ThreadPoolConfig.QueueFactory = queueFactory,
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy rejectionPolicy: RejectedExecutionHandler = rejectionPolicy,
isVirtualized: Boolean = isVirtualized
): ThreadPoolConfig = ): ThreadPoolConfig =
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy) ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
isVirtualized)
} }
/** /**
@ -156,6 +190,9 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder = def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory)) this.copy(config = config.copy(queueFactory = newQueueFactory))
def isVirtualized(isVirtualized: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(isVirtualized = isVirtualized))
def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder = def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder =
fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c)) fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c))
} }

View file

@ -27,6 +27,7 @@ import scala.util.control.NonFatal
@InternalApi @InternalApi
private[dispatch] object VirtualThreadSupport { private[dispatch] object VirtualThreadSupport {
val zero = java.lang.Long.valueOf(0L)
private val lookup = MethodHandles.publicLookup() private val lookup = MethodHandles.publicLookup()
/** /**
@ -67,7 +68,7 @@ private[dispatch] object VirtualThreadSupport {
MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long])) MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
// TODO support replace scheduler when we drop Java 8 support // TODO support replace scheduler when we drop Java 8 support
val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory])) val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory]))
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L) builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
} catch { } catch {
case NonFatal(e) => case NonFatal(e) =>
@ -93,7 +94,6 @@ private[dispatch] object VirtualThreadSupport {
} }
val nameMethod = ofVirtualClass.getDeclaredMethod("name", classOf[String], classOf[Long]) val nameMethod = ofVirtualClass.getDeclaredMethod("name", classOf[String], classOf[Long])
val factoryMethod = builderClass.getDeclaredMethod("factory") val factoryMethod = builderClass.getDeclaredMethod("factory")
val zero = java.lang.Long.valueOf(0L)
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero) builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
} catch { } catch {