diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index a74f8a8452..02d94460c1 100644 --- a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -145,3 +145,7 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.Un ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchEquals") ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchAny") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.japi.JAPI") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.dispatch.ThreadPoolConfig") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.createExecutorServiceFactory") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory") + diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index 63cd39db62..21c9e5a2f2 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -23,7 +23,7 @@ import scala.util.control.NonFatal import org.apache.pekko import pekko.actor._ -import pekko.annotation.InternalStableApi +import pekko.annotation.{ InternalApi, InternalStableApi } import pekko.dispatch.affinity.AffinityPoolConfigurator import pekko.dispatch.sysmsg._ import pekko.event.EventStream @@ -464,10 +464,49 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis } } -class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) - extends ExecutorServiceConfigurator(config, prerequisites) { +/** + * INTERNAL API + */ +@InternalApi +trait ThreadPoolExecutorServiceFactoryProvider extends ExecutorServiceFactoryProvider { + def threadPoolConfig: ThreadPoolConfig + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + class ThreadPoolExecutorServiceFactory(threadFactory: ThreadFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = { + val config = threadPoolConfig + val service: ThreadPoolExecutor = new ThreadPoolExecutor( + config.corePoolSize, + config.maxPoolSize, + config.threadTimeout.length, + config.threadTimeout.unit, + config.queueFactory(), + threadFactory, + config.rejectionPolicy) with LoadMetrics { + def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize + } + service.allowCoreThreadTimeOut(config.allowCorePoolTimeout) + service + } + } - val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val tf = threadFactory match { + case m: MonitorableThreadFactory => + // add the dispatcher id to the thread names + m.withName(m.name + "-" + id) + case other => other + } + new ThreadPoolExecutorServiceFactory(tf) + } + createExecutorServiceFactory(id, threadFactory) + } +} + +class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ExecutorServiceConfigurator(config, prerequisites) + with ThreadPoolExecutorServiceFactoryProvider { + + override val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config protected def createThreadPoolConfigBuilder( config: Config, @@ -505,9 +544,6 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr else builder.setFixedPoolSize(config.getInt("fixed-pool-size")) } - - def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = - threadPoolConfig.createExecutorServiceFactory(id, threadFactory) } class DefaultExecutorServiceConfigurator( diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/PinnedDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/PinnedDispatcher.scala index fd8b9b95f7..2e619f8a7e 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/PinnedDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/PinnedDispatcher.scala @@ -36,7 +36,9 @@ class PinnedDispatcher( _id, Int.MaxValue, Duration.Zero, - _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1), + new ThreadPoolExecutorServiceFactoryProvider() { + override def threadPoolConfig: ThreadPoolConfig = _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1) + }, _shutdownTimeout) { @volatile diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala index b10cc46724..eff0acaf8b 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala @@ -13,6 +13,8 @@ package org.apache.pekko.dispatch +import org.apache.pekko.annotation.InternalApi + import java.util.Collection import java.util.concurrent.{ ArrayBlockingQueue, @@ -30,7 +32,6 @@ import java.util.concurrent.{ TimeUnit } import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } - import scala.concurrent.{ BlockContext, CanAwait } import scala.concurrent.duration.Duration @@ -76,16 +77,18 @@ trait ExecutorServiceFactoryProvider { } /** - * A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher + * INTERNAL API + * + * Configuration object for ThreadPoolExecutor */ +@InternalApi final case class ThreadPoolConfig( allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(), - rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) - extends ExecutorServiceFactoryProvider { + rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) { // Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra // context information on the config @noinline @@ -98,32 +101,6 @@ final case class ThreadPoolConfig( rejectionPolicy: RejectedExecutionHandler = rejectionPolicy ): ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy) - - class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = { - val service: ThreadPoolExecutor = new ThreadPoolExecutor( - corePoolSize, - maxPoolSize, - threadTimeout.length, - threadTimeout.unit, - queueFactory(), - threadFactory, - rejectionPolicy) with LoadMetrics { - def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize - } - service.allowCoreThreadTimeOut(allowCorePoolTimeout) - service - } - } - def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val tf = threadFactory match { - case m: MonitorableThreadFactory => - // add the dispatcher id to the thread names - m.withName(m.name + "-" + id) - case other => other - } - new ThreadPoolExecutorServiceFactory(tf) - } } /**