chore: Remove ExecutorServiceFactoryProvider from ThreadPoolConfig (#2175)
This commit is contained in:
parent
8d9450b19e
commit
7b96bb8385
4 changed files with 57 additions and 38 deletions
|
|
@ -145,3 +145,7 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.Un
|
||||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchEquals")
|
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchEquals")
|
||||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchAny")
|
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchAny")
|
||||||
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.japi.JAPI")
|
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.japi.JAPI")
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.dispatch.ThreadPoolConfig")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.createExecutorServiceFactory")
|
||||||
|
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ import scala.util.control.NonFatal
|
||||||
|
|
||||||
import org.apache.pekko
|
import org.apache.pekko
|
||||||
import pekko.actor._
|
import pekko.actor._
|
||||||
import pekko.annotation.InternalStableApi
|
import pekko.annotation.{ InternalApi, InternalStableApi }
|
||||||
import pekko.dispatch.affinity.AffinityPoolConfigurator
|
import pekko.dispatch.affinity.AffinityPoolConfigurator
|
||||||
import pekko.dispatch.sysmsg._
|
import pekko.dispatch.sysmsg._
|
||||||
import pekko.event.EventStream
|
import pekko.event.EventStream
|
||||||
|
|
@ -464,10 +464,49 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
|
/**
|
||||||
extends ExecutorServiceConfigurator(config, prerequisites) {
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
trait ThreadPoolExecutorServiceFactoryProvider extends ExecutorServiceFactoryProvider {
|
||||||
|
def threadPoolConfig: ThreadPoolConfig
|
||||||
|
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
|
||||||
|
class ThreadPoolExecutorServiceFactory(threadFactory: ThreadFactory) extends ExecutorServiceFactory {
|
||||||
|
def createExecutorService: ExecutorService = {
|
||||||
|
val config = threadPoolConfig
|
||||||
|
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
|
||||||
|
config.corePoolSize,
|
||||||
|
config.maxPoolSize,
|
||||||
|
config.threadTimeout.length,
|
||||||
|
config.threadTimeout.unit,
|
||||||
|
config.queueFactory(),
|
||||||
|
threadFactory,
|
||||||
|
config.rejectionPolicy) with LoadMetrics {
|
||||||
|
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
|
||||||
|
}
|
||||||
|
service.allowCoreThreadTimeOut(config.allowCorePoolTimeout)
|
||||||
|
service
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
|
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
|
||||||
|
val tf = threadFactory match {
|
||||||
|
case m: MonitorableThreadFactory =>
|
||||||
|
// add the dispatcher id to the thread names
|
||||||
|
m.withName(m.name + "-" + id)
|
||||||
|
case other => other
|
||||||
|
}
|
||||||
|
new ThreadPoolExecutorServiceFactory(tf)
|
||||||
|
}
|
||||||
|
createExecutorServiceFactory(id, threadFactory)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
|
||||||
|
extends ExecutorServiceConfigurator(config, prerequisites)
|
||||||
|
with ThreadPoolExecutorServiceFactoryProvider {
|
||||||
|
|
||||||
|
override val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
|
||||||
|
|
||||||
protected def createThreadPoolConfigBuilder(
|
protected def createThreadPoolConfigBuilder(
|
||||||
config: Config,
|
config: Config,
|
||||||
|
|
@ -505,9 +544,6 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
|
||||||
else
|
else
|
||||||
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))
|
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))
|
||||||
}
|
}
|
||||||
|
|
||||||
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
|
|
||||||
threadPoolConfig.createExecutorServiceFactory(id, threadFactory)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class DefaultExecutorServiceConfigurator(
|
class DefaultExecutorServiceConfigurator(
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,9 @@ class PinnedDispatcher(
|
||||||
_id,
|
_id,
|
||||||
Int.MaxValue,
|
Int.MaxValue,
|
||||||
Duration.Zero,
|
Duration.Zero,
|
||||||
_threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1),
|
new ThreadPoolExecutorServiceFactoryProvider() {
|
||||||
|
override def threadPoolConfig: ThreadPoolConfig = _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1)
|
||||||
|
},
|
||||||
_shutdownTimeout) {
|
_shutdownTimeout) {
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,8 @@
|
||||||
|
|
||||||
package org.apache.pekko.dispatch
|
package org.apache.pekko.dispatch
|
||||||
|
|
||||||
|
import org.apache.pekko.annotation.InternalApi
|
||||||
|
|
||||||
import java.util.Collection
|
import java.util.Collection
|
||||||
import java.util.concurrent.{
|
import java.util.concurrent.{
|
||||||
ArrayBlockingQueue,
|
ArrayBlockingQueue,
|
||||||
|
|
@ -30,7 +32,6 @@ import java.util.concurrent.{
|
||||||
TimeUnit
|
TimeUnit
|
||||||
}
|
}
|
||||||
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
|
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
|
||||||
|
|
||||||
import scala.concurrent.{ BlockContext, CanAwait }
|
import scala.concurrent.{ BlockContext, CanAwait }
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
|
|
||||||
|
|
@ -76,16 +77,18 @@ trait ExecutorServiceFactoryProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher
|
* INTERNAL API
|
||||||
|
*
|
||||||
|
* Configuration object for ThreadPoolExecutor
|
||||||
*/
|
*/
|
||||||
|
@InternalApi
|
||||||
final case class ThreadPoolConfig(
|
final case class ThreadPoolConfig(
|
||||||
allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
|
allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
|
||||||
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
|
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
|
||||||
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
|
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
|
||||||
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
|
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
|
||||||
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
|
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
|
||||||
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy)
|
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) {
|
||||||
extends ExecutorServiceFactoryProvider {
|
|
||||||
// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
|
// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
|
||||||
// context information on the config
|
// context information on the config
|
||||||
@noinline
|
@noinline
|
||||||
|
|
@ -98,32 +101,6 @@ final case class ThreadPoolConfig(
|
||||||
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy
|
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy
|
||||||
): ThreadPoolConfig =
|
): ThreadPoolConfig =
|
||||||
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy)
|
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy)
|
||||||
|
|
||||||
class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
|
|
||||||
def createExecutorService: ExecutorService = {
|
|
||||||
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
|
|
||||||
corePoolSize,
|
|
||||||
maxPoolSize,
|
|
||||||
threadTimeout.length,
|
|
||||||
threadTimeout.unit,
|
|
||||||
queueFactory(),
|
|
||||||
threadFactory,
|
|
||||||
rejectionPolicy) with LoadMetrics {
|
|
||||||
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
|
|
||||||
}
|
|
||||||
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
|
|
||||||
service
|
|
||||||
}
|
|
||||||
}
|
|
||||||
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
|
|
||||||
val tf = threadFactory match {
|
|
||||||
case m: MonitorableThreadFactory =>
|
|
||||||
// add the dispatcher id to the thread names
|
|
||||||
m.withName(m.name + "-" + id)
|
|
||||||
case other => other
|
|
||||||
}
|
|
||||||
new ThreadPoolExecutorServiceFactory(tf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue