chore: disable batch if isVirtualized (#2046)

This commit is contained in:
He-Pin(kerr) 2025-08-23 16:59:39 +08:00 committed by GitHub
parent c346c14660
commit b3d2b3fbf1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 18 additions and 16 deletions

View file

@ -411,6 +411,7 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
override def isVirtualized: Boolean = true
override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
import VirtualThreadSupport._

View file

@ -64,6 +64,7 @@ class Dispatcher(
new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, threadFactory))
protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate
private val isVirtualized = executorServiceFactoryProvider.isVirtualized
/**
* INTERNAL API
@ -71,7 +72,12 @@ class Dispatcher(
protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
registerForExecution(mbox, hasMessageHint = true, hasSystemMessageHint = false)
}
final override def batchable(runnable: Runnable): Boolean = {
// If this is a virtualized, we don't batch, otherwise, too much threadLocals.
if (isVirtualized) false else super.batchable(runnable)
}
/**

View file

@ -73,6 +73,7 @@ object ForkJoinExecutorConfigurator {
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
final override val isVirtualized: Boolean = config.getBoolean("virtualize") && JavaVersion.majorVersion >= 21
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct
@ -86,30 +87,23 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean,
val maxPoolSize: Int,
val virtualize: Boolean)
val maxPoolSize: Int)
extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean,
maxPoolSize: Int,
virtualize: Boolean) =
this(null, threadFactory, parallelism, asyncMode, maxPoolSize, virtualize)
maxPoolSize: Int) =
this(null, threadFactory, parallelism, asyncMode, maxPoolSize)
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false)
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean,
maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false)
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService = {
val tf = if (virtualize && JavaVersion.majorVersion >= 21) {
val tf = if (isVirtualized) {
threadFactory match {
// we need to use the thread factory to create carrier thread
case m: MonitorableThreadFactory => new MonitorableCarrierThreadFactory(m.name)
@ -119,7 +113,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode)
if (virtualize && JavaVersion.majorVersion >= 21) {
if (isVirtualized) {
// when virtualized, we need enhanced thread factory
val factory: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
@ -173,7 +167,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode,
config.getInt("maximum-pool-size"),
config.getBoolean("virtualize"))
config.getInt("maximum-pool-size")
)
}
}

View file

@ -72,6 +72,7 @@ trait ExecutorServiceFactory {
*/
trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory
def isVirtualized: Boolean = false // can be overridden by implementations
}
/**