From 614caa298914b31130313f226154eb3bb8518985 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 2 Oct 2012 09:31:23 +0200 Subject: [PATCH] Include dispatcher id in thread name of pinned dispatcher, see #2585 * Additional tests * Moved id append from createExecutorServiceFactory in ThreadPoolExecutorConfigurator to ThreadPoolConfig --- .../akka/actor/dispatch/DispatchersSpec.scala | 29 +++++++++++++++++-- .../akka/dispatch/AbstractDispatcher.scala | 11 ++----- .../akka/dispatch/ThreadPoolBuilder.scala | 11 +++++-- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index 96c24f3068..5abcdc7a0d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -25,6 +25,13 @@ object DispatchersSpec { thread-pool-dispatcher { executor = thread-pool-executor } + my-pinned-dispatcher { + executor = thread-pool-executor + type = PinnedDispatcher + } + balancing-dispatcher { + type = BalancingDispatcher + } } """ @@ -110,7 +117,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "include system name and dispatcher id in thread names for fork-join-executor" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.mydispatcher")) ! "what's the name?" val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)".r - expectMsgPF(5 seconds) { + expectMsgPF(remaining) { case Expected(x) ⇒ } } @@ -118,7 +125,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "include system name and dispatcher id in thread names for thread-pool-executor" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.thread-pool-dispatcher")) ! "what's the name?" val Expected = "(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)".r - expectMsgPF(5 seconds) { + expectMsgPF(remaining) { case Expected(x) ⇒ } } @@ -126,7 +133,23 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend "include system name and dispatcher id in thread names for default-dispatcher" in { system.actorOf(Props[ThreadNameEcho]) ! "what's the name?" val Expected = "(DispatchersSpec-akka.actor.default-dispatcher-[1-9][0-9]*)".r - expectMsgPF(5 seconds) { + expectMsgPF(remaining) { + case Expected(x) ⇒ + } + } + + "include system name and dispatcher id in thread names for pinned dispatcher" in { + system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher")) ! "what's the name?" + val Expected = "(DispatchersSpec-myapp.my-pinned-dispatcher-[1-9][0-9]*)".r + expectMsgPF(remaining) { + case Expected(x) ⇒ + } + } + + "include system name and dispatcher id in thread names for balancing dispatcher" in { + system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.balancing-dispatcher")) ! "what's the name?" + val Expected = "(DispatchersSpec-myapp.balancing-dispatcher-[1-9][0-9]*)".r + expectMsgPF(remaining) { case Expected(x) ⇒ } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index eeff39f2e6..23fa51bb76 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -472,15 +472,8 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr })(queueFactory ⇒ _.setQueueFactory(queueFactory))) } - def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val tf = threadFactory match { - case m: MonitorableThreadFactory ⇒ - // add the dispatcher id to the thread names - m.copy(m.name + "-" + id) - case other ⇒ other - } - threadPoolConfig.createExecutorServiceFactory(id, tf) - } + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = + threadPoolConfig.createExecutorServiceFactory(id, threadFactory) } object ForkJoinExecutorConfigurator { diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 2d38128abe..67b0aa33a5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -88,8 +88,15 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def service } } - final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = - new ThreadPoolExecutorServiceFactory(threadFactory) + final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val tf = threadFactory match { + case m: MonitorableThreadFactory ⇒ + // add the dispatcher id to the thread names + m.copy(m.name + "-" + id) + case other ⇒ other + } + new ThreadPoolExecutorServiceFactory(tf) + } } object ThreadPoolConfigBuilder {