diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala index 455a4e2f1f..1d4ca6db94 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala @@ -22,7 +22,6 @@ import com.typesafe.config.ConfigFactory import org.apache.pekko import pekko.actor.{ Actor, Props } import pekko.testkit.{ ImplicitSender, PekkoSpec } -import pekko.util.JavaVersion object ForkJoinPoolVirtualThreadSpec { val config = ConfigFactory.parseString(""" @@ -30,6 +29,7 @@ object ForkJoinPoolVirtualThreadSpec { | task-dispatcher { | mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" | throughput = 5 + | executor = "fork-join-executor" | fork-join-executor { | parallelism-factor = 2 | parallelism-max = 2 diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala new file mode 100644 index 0000000000..583f911877 --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import com.typesafe.config.ConfigFactory +import org.apache.pekko +import pekko.actor.{ Actor, Props } +import pekko.testkit.{ ImplicitSender, PekkoSpec } + +object ThreadPoolVirtualThreadSpec { + val config = ConfigFactory.parseString(""" + |custom { + | task-dispatcher { + | mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" + | throughput = 5 + | executor = "thread-pool-executor" + | thread-pool-executor { + | fixed-pool-size = 4 + | virtualize = on + | } + | } + |} + """.stripMargin) + + class ThreadNameActor extends Actor { + + override def receive = { + case "ping" => + sender() ! Thread.currentThread().getName + } + } + +} + +class ThreadPoolVirtualThreadSpec extends PekkoSpec(ThreadPoolVirtualThreadSpec.config) with ImplicitSender { + import ThreadPoolVirtualThreadSpec._ + + "ThreadPool" must { + + "support virtualization with Virtual Thread" in { + val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("custom.task-dispatcher")) + for (_ <- 1 to 1000) { + actor ! "ping" + expectMsgPF() { case name: String => + name should include("ThreadPoolVirtualThreadSpec-custom.task-dispatcher-virtual-thread-") + } + } + } + + } +} diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index 02d94460c1..b71de5f6ba 100644 --- a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -148,4 +148,8 @@ ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.japi.JAPI") ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.dispatch.ThreadPoolConfig") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.createExecutorServiceFactory") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory") - +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.unapply") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig#ThreadPoolExecutorServiceFactory.this") diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 0cb04fabcc..740c2975b4 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -482,7 +482,7 @@ pekko { maximum-pool-size = 32767 # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, - # When set to `on` but underlying runtime does not support virtual threads, an Exception will throw. + # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown. # Virtualize this dispatcher as a virtual-thread-executor # Valid values are: `on`, `off` # @@ -543,6 +543,18 @@ pekko { # Allow core threads to time out allow-core-timeout = on + + # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, + # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown. + # Virtualize this dispatcher as a virtual-thread-executor + # Valid values are: `on`, `off` + # + # Requirements: + # 1. JDK 21+ + # 2. add options to the JVM: + # --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED + # --add-opens=java.base/java.lang=ALL-UNNAMED + virtualize = off } # This will be used if you have set "executor = "virtual-thread-executor" @@ -600,6 +612,17 @@ pekko { thread-pool-executor { fixed-pool-size = 16 + # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, + # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown. + # Virtualize this dispatcher as a virtual-thread-executor + # Valid values are: `on`, `off` + # + # Requirements: + # 1. JDK 21+ + # 2. add options to the JVM: + # --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED + # --add-opens=java.base/java.lang=ALL-UNNAMED + virtualize = off } } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index 21c9e5a2f2..dcbac6387f 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -28,7 +28,7 @@ import pekko.dispatch.affinity.AffinityPoolConfigurator import pekko.dispatch.sysmsg._ import pekko.event.EventStream import pekko.event.Logging.{ emptyMDC, Debug, Error, LogEventException, Warning } -import pekko.util.{ unused, Index } +import pekko.util.{ unused, Index, JavaVersion } import com.typesafe.config.Config @@ -471,42 +471,49 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis trait ThreadPoolExecutorServiceFactoryProvider extends ExecutorServiceFactoryProvider { def threadPoolConfig: ThreadPoolConfig def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - class ThreadPoolExecutorServiceFactory(threadFactory: ThreadFactory) extends ExecutorServiceFactory { + + object ThreadPoolExecutorServiceFactory extends ExecutorServiceFactory { def createExecutorService: ExecutorService = { + val tf = threadFactory match { + case m: MonitorableThreadFactory => m.withName(m.name + "-" + id) + case _ => threadFactory + } + val poolThreadFactory = tf match { + case m: MonitorableThreadFactory if isVirtualized => m.withName(m.name + "-" + "CarrierThread") + case _ => tf + } + val config = threadPoolConfig - val service: ThreadPoolExecutor = new ThreadPoolExecutor( + val pool = new ThreadPoolExecutor( config.corePoolSize, config.maxPoolSize, config.threadTimeout.length, config.threadTimeout.unit, config.queueFactory(), - threadFactory, + poolThreadFactory, config.rejectionPolicy) with LoadMetrics { def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize } - service.allowCoreThreadTimeOut(config.allowCorePoolTimeout) - service - } - } + pool.allowCoreThreadTimeOut(config.allowCorePoolTimeout) - def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val tf = threadFactory match { - case m: MonitorableThreadFactory => - // add the dispatcher id to the thread names - m.withName(m.name + "-" + id) - case other => other + if (isVirtualized) { + val prefixName = threadFactory match { + case m: MonitorableThreadFactory => m.name + "-" + id + case _ => id + } + createVirtualized(tf, pool, prefixName) + } else pool } - new ThreadPoolExecutorServiceFactory(tf) } - createExecutorServiceFactory(id, threadFactory) + ThreadPoolExecutorServiceFactory } } class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) with ThreadPoolExecutorServiceFactoryProvider { - override val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config + override val isVirtualized: Boolean = threadPoolConfig.isVirtualized && JavaVersion.majorVersion >= 21 protected def createThreadPoolConfigBuilder( config: Config, @@ -516,6 +523,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr ThreadPoolConfigBuilder(ThreadPoolConfig()) .setKeepAliveTime(config.getMillisDuration("keep-alive-time")) .setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout")) + .isVirtualized(config.getBoolean("virtualize")) .configure(Some(config.getInt("task-queue-size")).flatMap { case size if size > 0 => Some(config.getString("task-queue-type")) diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 66d1e36918..15aef6536b 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -15,10 +15,9 @@ package org.apache.pekko.dispatch import com.typesafe.config.Config import org.apache.pekko -import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory import pekko.util.JavaVersion -import java.util.concurrent.{ Executor, ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory, TimeUnit } +import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory, TimeUnit } object ForkJoinExecutorConfigurator { @@ -114,28 +113,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode) if (isVirtualized) { - // when virtualized, we need enhanced thread factory - val factory: ThreadFactory = threadFactory match { - case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => - new ThreadFactory { - private val vtFactory = newVirtualThreadFactory(name, pool) // use the pool as the scheduler - - override def newThread(r: Runnable): Thread = { - val vt = vtFactory.newThread(r) - vt.setUncaughtExceptionHandler(exceptionHandler) - contextClassLoader.foreach(vt.setContextClassLoader) - vt - } - } - case _ => newVirtualThreadFactory(prerequisites.settings.name, pool); // use the pool as the scheduler - } - // wrap the pool with virtualized executor service - new VirtualizedExecutorService( - factory, // the virtual thread factory - pool, // the underlying pool - (_: Executor) => pool.atFullThrottle(), // the load metrics provider, we use the pool itself - cascadeShutdown = true // cascade shutdown - ) + // we need to cast here, + createVirtualized(threadFactory.asInstanceOf[ThreadFactory], pool, id) } else { pool } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala index eff0acaf8b..de6c1377f4 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala @@ -13,13 +13,16 @@ package org.apache.pekko.dispatch -import org.apache.pekko.annotation.InternalApi +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory import java.util.Collection import java.util.concurrent.{ ArrayBlockingQueue, BlockingQueue, Callable, + Executor, ExecutorService, ForkJoinPool, ForkJoinWorkerThread, @@ -74,6 +77,34 @@ trait ExecutorServiceFactory { trait ExecutorServiceFactoryProvider { def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory def isVirtualized: Boolean = false // can be overridden by implementations + + protected def createVirtualized( + threadFactory: ThreadFactory, + pool: ExecutorService with LoadMetrics, + prefixName: String): ExecutorService = { + // when virtualized, we need enhanced thread factory + val factory: ThreadFactory = threadFactory match { + case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => + new ThreadFactory { + private val vtFactory = newVirtualThreadFactory(name, pool) // use the pool as the scheduler + + override def newThread(r: Runnable): Thread = { + val vt = vtFactory.newThread(r) + vt.setUncaughtExceptionHandler(exceptionHandler) + contextClassLoader.foreach(vt.setContextClassLoader) + vt + } + } + case _ => newVirtualThreadFactory(prefixName, pool); // use the pool as the scheduler + } + // wrap the pool with virtualized executor service + new VirtualizedExecutorService( + factory, // the virtual thread factory + pool, // the underlying pool + (_: Executor) => pool.atFullThrottle(), // the load metrics provider, we use the pool itself + cascadeShutdown = true // cascade shutdown + ) + } } /** @@ -88,7 +119,8 @@ final case class ThreadPoolConfig( maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(), - rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) { + rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy, + isVirtualized: Boolean = false) { // Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra // context information on the config @noinline @@ -98,9 +130,11 @@ final case class ThreadPoolConfig( maxPoolSize: Int = maxPoolSize, threadTimeout: Duration = threadTimeout, queueFactory: ThreadPoolConfig.QueueFactory = queueFactory, - rejectionPolicy: RejectedExecutionHandler = rejectionPolicy + rejectionPolicy: RejectedExecutionHandler = rejectionPolicy, + isVirtualized: Boolean = isVirtualized ): ThreadPoolConfig = - ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy) + ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy, + isVirtualized) } /** @@ -156,6 +190,9 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) { def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = newQueueFactory)) + def isVirtualized(isVirtualized: Boolean): ThreadPoolConfigBuilder = + this.copy(config = config.copy(isVirtualized = isVirtualized)) + def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder = fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c)) } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala index 86dea7d581..0396aad36e 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -27,6 +27,7 @@ import scala.util.control.NonFatal @InternalApi private[dispatch] object VirtualThreadSupport { + val zero = java.lang.Long.valueOf(0L) private val lookup = MethodHandles.publicLookup() /** @@ -67,7 +68,7 @@ private[dispatch] object VirtualThreadSupport { MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long])) // TODO support replace scheduler when we drop Java 8 support val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory])) - builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L) + builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero) factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] } catch { case NonFatal(e) => @@ -93,7 +94,6 @@ private[dispatch] object VirtualThreadSupport { } val nameMethod = ofVirtualClass.getDeclaredMethod("name", classOf[String], classOf[Long]) val factoryMethod = builderClass.getDeclaredMethod("factory") - val zero = java.lang.Long.valueOf(0L) builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero) factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] } catch {