From 6450831e01fb9ce8ffd74c8afec1e54eeb5ac6b3 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 6 Dec 2012 22:50:40 +0100 Subject: [PATCH] #2778 - make thread names unique per system MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The MonitorableThreadFactory.copy method does not take into account the counter:AtomicLong field, which then always starts out at zero for every new dispatcher; the PinnedDispatcher creates one dispatcher per actor, hence resulting in identical thread names if used by multiple actors. Solution: add the counter to the case class’ arguments --- akka-actor/src/main/scala/akka/actor/ActorSystem.scala | 2 +- .../src/main/scala/akka/dispatch/AbstractDispatcher.scala | 2 +- .../src/main/scala/akka/dispatch/PinnedDispatcher.scala | 2 +- .../src/main/scala/akka/dispatch/ThreadPoolBuilder.scala | 8 +++++--- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 2 +- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 8bada6e0ba..45025f1887 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -613,7 +613,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, protected def createScheduler(): Scheduler = new DefaultScheduler( new HashedWheelTimer(log, - threadFactory.copy(threadFactory.name + "-scheduler"), + threadFactory.withName(threadFactory.name + "-scheduler"), settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel), log) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8f13e5fa11..59f79a1b8e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -527,7 +527,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer val tf = threadFactory match { case m: MonitorableThreadFactory ⇒ // add the dispatcher id to the thread names - m.copy(m.name + "-" + id) + m.withName(m.name + "-" + id) case other ⇒ other } new ForkJoinExecutorServiceFactory( diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index 52d5587597..eb5b2686c3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -20,7 +20,7 @@ class PinnedDispatcher( _id: String, _mailboxType: MailboxType, _shutdownTimeout: FiniteDuration, - _threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig()) + _threadPoolConfig: ThreadPoolConfig) extends Dispatcher(_prerequisites, _id, Int.MaxValue, diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 9d06a7b74c..f93b112a92 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -92,7 +92,7 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def val tf = threadFactory match { case m: MonitorableThreadFactory ⇒ // add the dispatcher id to the thread names - m.copy(m.name + "-" + id) + m.withName(m.name + "-" + id) case other ⇒ other } new ThreadPoolExecutorServiceFactory(tf) @@ -183,9 +183,9 @@ object MonitorableThreadFactory { case class MonitorableThreadFactory(name: String, daemonic: Boolean, contextClassLoader: Option[ClassLoader], - exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing) + exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing, + protected val counter: AtomicLong = new AtomicLong) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { - protected val counter = new AtomicLong def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool)) @@ -196,6 +196,8 @@ case class MonitorableThreadFactory(name: String, def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + "-" + counter.incrementAndGet())) + def withName(newName: String): MonitorableThreadFactory = copy(newName) + protected def wire[T <: Thread](t: T): T = { t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index e362b4ac34..60c48d7023 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -95,7 +95,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { new DefaultScheduler( new HashedWheelTimer(log, system.threadFactory match { - case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler") + case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler") case tf ⇒ tf }, SchedulerTickDuration,