Proper thread names for fork-join-executor. See #1805

This commit is contained in:
Patrik Nordwall 2012-02-08 11:53:55 +01:00
parent b25d25e6f7
commit 1f988889c2
5 changed files with 68 additions and 19 deletions

View file

@ -7,8 +7,12 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit }
import scala.reflect.{ Manifest }
import akka.dispatch._
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import scala.collection.JavaConverters._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.Props
import akka.util.duration._
object DispatchersSpec {
val config = """
@ -16,13 +20,22 @@ object DispatchersSpec {
mydispatcher {
throughput = 17
}
thread-pool-dispatcher {
executor = thread-pool-executor
}
}
"""
class ThreadNameEcho extends Actor {
def receive = {
case _ sender ! Thread.currentThread.getName
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) {
class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSender {
import DispatchersSpec._
val df = system.dispatchers
import df._
@ -92,6 +105,30 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) {
d1 must be === d2
}
"include system name and dispatcher id in thread names for fork-join-executor" in {
system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.mydispatcher")) ! "what's the name?"
val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*-worker-[1-9][0-9]*)".r
expectMsgPF(5 seconds) {
case Expected(x)
}
}
"include system name and dispatcher id in thread names for thread-pool-executor" in {
system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.thread-pool-dispatcher")) ! "what's the name?"
val Expected = "(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)".r
expectMsgPF(5 seconds) {
case Expected(x)
}
}
"include system name and dispatcher id in thread names for default-dispatcher" in {
system.actorOf(Props[ThreadNameEcho]) ! "what's the name?"
val Expected = "(DispatchersSpec-akka.actor.default-dispatcher-[1-9][0-9]*-worker-[1-9][0-9]*)".r
expectMsgPF(5 seconds) {
case Expected(x)
}
}
}
}

View file

@ -420,8 +420,13 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
})(queueFactory _.setQueueFactory(queueFactory)))
}
def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
threadPoolConfig.createExecutorServiceFactory(name, threadFactory)
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match {
case m: MonitorableThreadFactory m.copy(m.name + "-" + id)
case other other
}
threadPoolConfig.createExecutorServiceFactory(id, tf)
}
}
object ForkJoinExecutorConfigurator {
@ -460,7 +465,7 @@ object ForkJoinExecutorConfigurator {
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match {
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory correct
case x throw new IllegalStateException("The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!")
}
@ -469,11 +474,16 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
val parallelism: Int) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
}
final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match {
case m: MonitorableThreadFactory m.copy(m.name + "-" + id)
case other other
}
new ForkJoinExecutorServiceFactory(
validate(threadFactory),
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")))
}
}

View file

@ -33,12 +33,7 @@ class Dispatcher(
extends MessageDispatcher(_prerequisites) {
protected[akka] val executorServiceFactory: ExecutorServiceFactory =
executorServiceFactoryProvider.createExecutorServiceFactory(
id,
prerequisites.threadFactory match {
case m: MonitorableThreadFactory m.copy(m.name + "-" + id)
case other other
})
executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory)
protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate {
lazy val executor = executorServiceFactory.createExecutorService

View file

@ -66,7 +66,7 @@ trait ExecutorServiceFactory {
* Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired
*/
trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory
}
/**
@ -93,7 +93,7 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
service
}
}
final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
new ThreadPoolExecutorServiceFactory(threadFactory)
}
@ -170,9 +170,14 @@ case class MonitorableThreadFactory(name: String,
extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
protected val counter = new AtomicLong
def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
val t = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
// Name of the threads for the ForkJoinPool are not customizable. Change it here.
if (t.getName.startsWith("ForkJoinPool-")) t.setName(name + "-" + t.getName.substring("ForkJoinPool-".length))
t
}
def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + counter.incrementAndGet()))
def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + "-" + counter.incrementAndGet()))
protected def wire[T <: Thread](t: T): T = {
t.setUncaughtExceptionHandler(exceptionHandler)

View file

@ -59,6 +59,8 @@ class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with
output.reset()
}
val sourceThreadRegex = "sourceThread=\\[Slf4jEventHandlerSpec-akka.actor.default-dispatcher-[1-9][0-9]*-worker-[1-9][0-9]*\\]"
"Slf4jEventHandler" must {
"log error with stackTrace" in {
@ -69,7 +71,7 @@ class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with
s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]")
s must include("level=[ERROR]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]")
s must include regex ("sourceThread=\\[ForkJoinPool-[1-9][0-9]*-worker-[1-9][0-9]*\\]")
s must include regex (sourceThreadRegex)
s must include("msg=[Simulated error]")
s must include("java.lang.RuntimeException: Simulated error")
s must include("at akka.event.slf4j.Slf4jEventHandlerSpec")
@ -83,7 +85,7 @@ class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with
s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]")
s must include("level=[INFO]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]")
s must include regex ("sourceThread=\\[ForkJoinPool-[1-9][0-9]*-worker-[1-9][0-9]*\\]")
s must include regex (sourceThreadRegex)
s must include("msg=[test x=3 y=17]")
}