From 1f988889c21984efad2ad396523f05c2fcc4e924 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 8 Feb 2012 11:53:55 +0100 Subject: [PATCH] Proper thread names for fork-join-executor. See #1805 --- .../akka/actor/dispatch/DispatchersSpec.scala | 41 ++++++++++++++++++- .../akka/dispatch/AbstractDispatcher.scala | 20 ++++++--- .../main/scala/akka/dispatch/Dispatcher.scala | 7 +--- .../akka/dispatch/ThreadPoolBuilder.scala | 13 ++++-- .../event/slf4j/Slf4jEventHandlerSpec.scala | 6 ++- 5 files changed, 68 insertions(+), 19 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index 5d6dd65529..0556a1762e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -7,8 +7,12 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit } import scala.reflect.{ Manifest } import akka.dispatch._ import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender import scala.collection.JavaConverters._ import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.Props +import akka.util.duration._ object DispatchersSpec { val config = """ @@ -16,13 +20,22 @@ object DispatchersSpec { mydispatcher { throughput = 17 } + thread-pool-dispatcher { + executor = thread-pool-executor + } } """ + + class ThreadNameEcho extends Actor { + def receive = { + case _ ⇒ sender ! Thread.currentThread.getName + } + } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) { - +class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSender { + import DispatchersSpec._ val df = system.dispatchers import df._ @@ -92,6 +105,30 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) { d1 must be === d2 } + "include system name and dispatcher id in thread names for fork-join-executor" in { + system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.mydispatcher")) ! "what's the name?" + val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*-worker-[1-9][0-9]*)".r + expectMsgPF(5 seconds) { + case Expected(x) ⇒ + } + } + + "include system name and dispatcher id in thread names for thread-pool-executor" in { + system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.thread-pool-dispatcher")) ! "what's the name?" + val Expected = "(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)".r + expectMsgPF(5 seconds) { + case Expected(x) ⇒ + } + } + + "include system name and dispatcher id in thread names for default-dispatcher" in { + system.actorOf(Props[ThreadNameEcho]) ! "what's the name?" + val Expected = "(DispatchersSpec-akka.actor.default-dispatcher-[1-9][0-9]*-worker-[1-9][0-9]*)".r + expectMsgPF(5 seconds) { + case Expected(x) ⇒ + } + } + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index e32631108c..cdafb37dc0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -420,8 +420,13 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr })(queueFactory ⇒ _.setQueueFactory(queueFactory))) } - def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = - threadPoolConfig.createExecutorServiceFactory(name, threadFactory) + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val tf = threadFactory match { + case m: MonitorableThreadFactory ⇒ m.copy(m.name + "-" + id) + case other ⇒ other + } + threadPoolConfig.createExecutorServiceFactory(id, tf) + } } object ForkJoinExecutorConfigurator { @@ -460,7 +465,7 @@ object ForkJoinExecutorConfigurator { class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { import ForkJoinExecutorConfigurator._ - def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match { + def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match { case correct: ForkJoinPool.ForkJoinWorkerThreadFactory ⇒ correct case x ⇒ throw new IllegalStateException("The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!") } @@ -469,11 +474,16 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer val parallelism: Int) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing) } - final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = + final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val tf = threadFactory match { + case m: MonitorableThreadFactory ⇒ m.copy(m.name + "-" + id) + case other ⇒ other + } new ForkJoinExecutorServiceFactory( - validate(threadFactory), + validate(tf), ThreadPoolConfig.scaledPoolSize( config.getInt("parallelism-min"), config.getDouble("parallelism-factor"), config.getInt("parallelism-max"))) + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 906c160dce..a735ea367e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -33,12 +33,7 @@ class Dispatcher( extends MessageDispatcher(_prerequisites) { protected[akka] val executorServiceFactory: ExecutorServiceFactory = - executorServiceFactoryProvider.createExecutorServiceFactory( - id, - prerequisites.threadFactory match { - case m: MonitorableThreadFactory ⇒ m.copy(m.name + "-" + id) - case other ⇒ other - }) + executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 5be5f1b0e1..4b95ab2931 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -66,7 +66,7 @@ trait ExecutorServiceFactory { * Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired */ trait ExecutorServiceFactoryProvider { - def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory } /** @@ -93,7 +93,7 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def service } } - final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = + final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = new ThreadPoolExecutorServiceFactory(threadFactory) } @@ -170,9 +170,14 @@ case class MonitorableThreadFactory(name: String, extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { protected val counter = new AtomicLong - def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)) + def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { + val t = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)) + // Name of the threads for the ForkJoinPool are not customizable. Change it here. + if (t.getName.startsWith("ForkJoinPool-")) t.setName(name + "-" + t.getName.substring("ForkJoinPool-".length)) + t + } - def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + counter.incrementAndGet())) + def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + "-" + counter.incrementAndGet())) protected def wire[T <: Thread](t: T): T = { t.setUncaughtExceptionHandler(exceptionHandler) diff --git a/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jEventHandlerSpec.scala b/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jEventHandlerSpec.scala index 17af919fcc..f66a06c6ab 100644 --- a/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jEventHandlerSpec.scala +++ b/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jEventHandlerSpec.scala @@ -59,6 +59,8 @@ class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with output.reset() } + val sourceThreadRegex = "sourceThread=\\[Slf4jEventHandlerSpec-akka.actor.default-dispatcher-[1-9][0-9]*-worker-[1-9][0-9]*\\]" + "Slf4jEventHandler" must { "log error with stackTrace" in { @@ -69,7 +71,7 @@ class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]") s must include("level=[ERROR]") s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]") - s must include regex ("sourceThread=\\[ForkJoinPool-[1-9][0-9]*-worker-[1-9][0-9]*\\]") + s must include regex (sourceThreadRegex) s must include("msg=[Simulated error]") s must include("java.lang.RuntimeException: Simulated error") s must include("at akka.event.slf4j.Slf4jEventHandlerSpec") @@ -83,7 +85,7 @@ class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]") s must include("level=[INFO]") s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]") - s must include regex ("sourceThread=\\[ForkJoinPool-[1-9][0-9]*-worker-[1-9][0-9]*\\]") + s must include regex (sourceThreadRegex) s must include("msg=[test x=3 y=17]") }