#2778 - make thread names unique per system
The MonitorableThreadFactory.copy method does not take into account the counter:AtomicLong field, which then always starts out at zero for every new dispatcher; the PinnedDispatcher creates one dispatcher per actor, hence resulting in identical thread names if used by multiple actors. Solution: add the counter to the case class’ arguments
This commit is contained in:
parent
cd0fa5aee2
commit
6450831e01
5 changed files with 9 additions and 7 deletions
|
|
@ -613,7 +613,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
|||
protected def createScheduler(): Scheduler =
|
||||
new DefaultScheduler(
|
||||
new HashedWheelTimer(log,
|
||||
threadFactory.copy(threadFactory.name + "-scheduler"),
|
||||
threadFactory.withName(threadFactory.name + "-scheduler"),
|
||||
settings.SchedulerTickDuration,
|
||||
settings.SchedulerTicksPerWheel),
|
||||
log)
|
||||
|
|
|
|||
|
|
@ -527,7 +527,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
|
|||
val tf = threadFactory match {
|
||||
case m: MonitorableThreadFactory ⇒
|
||||
// add the dispatcher id to the thread names
|
||||
m.copy(m.name + "-" + id)
|
||||
m.withName(m.name + "-" + id)
|
||||
case other ⇒ other
|
||||
}
|
||||
new ForkJoinExecutorServiceFactory(
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class PinnedDispatcher(
|
|||
_id: String,
|
||||
_mailboxType: MailboxType,
|
||||
_shutdownTimeout: FiniteDuration,
|
||||
_threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig())
|
||||
_threadPoolConfig: ThreadPoolConfig)
|
||||
extends Dispatcher(_prerequisites,
|
||||
_id,
|
||||
Int.MaxValue,
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
|
|||
val tf = threadFactory match {
|
||||
case m: MonitorableThreadFactory ⇒
|
||||
// add the dispatcher id to the thread names
|
||||
m.copy(m.name + "-" + id)
|
||||
m.withName(m.name + "-" + id)
|
||||
case other ⇒ other
|
||||
}
|
||||
new ThreadPoolExecutorServiceFactory(tf)
|
||||
|
|
@ -183,9 +183,9 @@ object MonitorableThreadFactory {
|
|||
case class MonitorableThreadFactory(name: String,
|
||||
daemonic: Boolean,
|
||||
contextClassLoader: Option[ClassLoader],
|
||||
exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing)
|
||||
exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing,
|
||||
protected val counter: AtomicLong = new AtomicLong)
|
||||
extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
|
||||
protected val counter = new AtomicLong
|
||||
|
||||
def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
|
||||
val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool))
|
||||
|
|
@ -196,6 +196,8 @@ case class MonitorableThreadFactory(name: String,
|
|||
|
||||
def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + "-" + counter.incrementAndGet()))
|
||||
|
||||
def withName(newName: String): MonitorableThreadFactory = copy(newName)
|
||||
|
||||
protected def wire[T <: Thread](t: T): T = {
|
||||
t.setUncaughtExceptionHandler(exceptionHandler)
|
||||
t.setDaemon(daemonic)
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
|||
new DefaultScheduler(
|
||||
new HashedWheelTimer(log,
|
||||
system.threadFactory match {
|
||||
case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler")
|
||||
case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler")
|
||||
case tf ⇒ tf
|
||||
},
|
||||
SchedulerTickDuration,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue