From b3d2b3fbf1756b9dc342ec8fe9c5f7d1477eed58 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sat, 23 Aug 2025 16:59:39 +0800 Subject: [PATCH] chore: disable batch if isVirtualized (#2046) --- .../pekko/dispatch/AbstractDispatcher.scala | 1 + .../apache/pekko/dispatch/Dispatcher.scala | 8 ++++++- .../ForkJoinExecutorConfigurator.scala | 24 +++++++------------ .../pekko/dispatch/ThreadPoolBuilder.scala | 1 + 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index 5a0e98f1d1..274c58c53f 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -411,6 +411,7 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { + override def isVirtualized: Boolean = true override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { import VirtualThreadSupport._ diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala index 01fe66f0a2..53e09b96f0 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala @@ -64,6 +64,7 @@ class Dispatcher( new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, threadFactory)) protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate + private val isVirtualized = executorServiceFactoryProvider.isVirtualized /** * INTERNAL API @@ -71,7 +72,12 @@ class Dispatcher( protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) - registerForExecution(mbox, true, false) + registerForExecution(mbox, hasMessageHint = true, hasSystemMessageHint = false) + } + + final override def batchable(runnable: Runnable): Boolean = { + // If this is a virtualized, we don't batch, otherwise, too much threadLocals. + if (isVirtualized) false else super.batchable(runnable) } /** diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 36d9820ae4..66d1e36918 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -73,6 +73,7 @@ object ForkJoinExecutorConfigurator { class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { import ForkJoinExecutorConfigurator._ + final override val isVirtualized: Boolean = config.getBoolean("virtualize") && JavaVersion.majorVersion >= 21 def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match { case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct @@ -86,30 +87,23 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int, val asyncMode: Boolean, - val maxPoolSize: Int, - val virtualize: Boolean) + val maxPoolSize: Int) extends ExecutorServiceFactory { def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int, asyncMode: Boolean, - maxPoolSize: Int, - virtualize: Boolean) = - this(null, threadFactory, parallelism, asyncMode, maxPoolSize, virtualize) + maxPoolSize: Int) = + this(null, threadFactory, parallelism, asyncMode, maxPoolSize) def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int, - asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false) - - def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, - parallelism: Int, - asyncMode: Boolean, - maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false) + asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap) def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true) def createExecutorService: ExecutorService = { - val tf = if (virtualize && JavaVersion.majorVersion >= 21) { + val tf = if (isVirtualized) { threadFactory match { // we need to use the thread factory to create carrier thread case m: MonitorableThreadFactory => new MonitorableCarrierThreadFactory(m.name) @@ -119,7 +113,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode) - if (virtualize && JavaVersion.majorVersion >= 21) { + if (isVirtualized) { // when virtualized, we need enhanced thread factory val factory: ThreadFactory = threadFactory match { case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => @@ -173,7 +167,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer config.getDouble("parallelism-factor"), config.getInt("parallelism-max")), asyncMode, - config.getInt("maximum-pool-size"), - config.getBoolean("virtualize")) + config.getInt("maximum-pool-size") + ) } } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala index cf5a856ac2..b10cc46724 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala @@ -72,6 +72,7 @@ trait ExecutorServiceFactory { */ trait ExecutorServiceFactoryProvider { def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory + def isVirtualized: Boolean = false // can be overridden by implementations } /**