#1703 & #1693 - moving daemonicity to one place, and in doing so creating a thread factory in ActorSystem

This commit is contained in:
Viktor Klang 2012-01-20 12:30:19 +01:00
parent 03bc15feb1
commit 8dfe619140
14 changed files with 57 additions and 48 deletions

View file

@ -23,6 +23,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
getString("akka.version") must equal("2.0-SNAPSHOT") getString("akka.version") must equal("2.0-SNAPSHOT")
settings.ConfigVersion must equal("2.0-SNAPSHOT") settings.ConfigVersion must equal("2.0-SNAPSHOT")
getBoolean("akka.daemonic") must equal(false)
getString("akka.actor.default-dispatcher.type") must equal("Dispatcher") getString("akka.actor.default-dispatcher.type") must equal("Dispatcher")
getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000) getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000)
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(3.0) getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(3.0)
@ -45,9 +47,6 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
getMilliseconds("akka.scheduler.tickDuration") must equal(100) getMilliseconds("akka.scheduler.tickDuration") must equal(100)
settings.SchedulerTickDuration must equal(100 millis) settings.SchedulerTickDuration must equal(100 millis)
getBoolean("akka.scheduler.daemonic") must equal(true)
settings.SchedulerDaemonicity must equal(true)
} }
} }
} }

View file

@ -31,10 +31,7 @@ object RoutingSpec {
""" """
class TestActor extends Actor { class TestActor extends Actor {
def receive = { def receive = { case _ }
case _
println("Hello")
}
} }
class Echo extends Actor { class Echo extends Actor {

View file

@ -33,6 +33,9 @@ akka {
# See the Akka Documentation for more info about Extensions # See the Akka Documentation for more info about Extensions
extensions = [] extensions = []
# Toggles whether the threads created by this ActorSystem should be daemons or not
daemonic = off
actor { actor {
provider = "akka.actor.LocalActorRefProvider" provider = "akka.actor.LocalActorRefProvider"
@ -155,9 +158,6 @@ akka {
# parameters # parameters
type = "Dispatcher" type = "Dispatcher"
# Toggles whether the threads created by this dispatcher should be daemons or not
daemonic = off
# Keep alive time for threads # Keep alive time for threads
keep-alive-time = 60s keep-alive-time = 60s
@ -271,6 +271,5 @@ akka {
# For more information see: http://www.jboss.org/netty/ # For more information see: http://www.jboss.org/netty/
tickDuration = 100ms tickDuration = 100ms
ticksPerWheel = 512 ticksPerWheel = 512
daemonic = on
} }
} }

View file

@ -96,7 +96,7 @@ object ActorSystem {
final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
final val SchedulerDaemonicity = getBoolean("akka.scheduler.daemonic") final val Daemonicity = getBoolean("akka.daemonic")
if (ConfigVersion != Version) if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
@ -275,6 +275,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
import ActorSystem._ import ActorSystem._
final val settings = new Settings(applicationConfig, name) final val settings = new Settings(applicationConfig, name)
final val threadFactory = new MonitorableThreadFactory(name, settings.Daemonicity)
def logConfiguration(): Unit = log.info(settings.toString) def logConfiguration(): Unit = log.info(settings.toString)
@ -361,7 +362,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
} }
} }
val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(threadFactory, eventStream, deadLetterMailbox, scheduler))
val dispatcher = dispatchers.defaultGlobalDispatcher val dispatcher = dispatchers.defaultGlobalDispatcher
def terminationFuture: Future[Unit] = provider.terminationFuture def terminationFuture: Future[Unit] = provider.terminationFuture
@ -410,7 +411,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
*/ */
protected def createScheduler(): Scheduler = { protected def createScheduler(): Scheduler = {
val hwt = new HashedWheelTimer(log, val hwt = new HashedWheelTimer(log,
new MonitorableThreadFactory("DefaultScheduler", settings.SchedulerDaemonicity), threadFactory.copy(threadFactory.name + "-scheduler"),
settings.SchedulerTickDuration, settings.SchedulerTickDuration,
settings.SchedulerTicksPerWheel) settings.SchedulerTicksPerWheel)
// note that dispatcher is by-name parameter in DefaultScheduler constructor, // note that dispatcher is by-name parameter in DefaultScheduler constructor,

View file

@ -342,7 +342,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
//Apply the following options to the config if they are present in the config //Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(daemonic = config getBoolean "daemonic")) ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig())
.setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS)) .setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS))
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout") .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
.setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max") .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")

View file

@ -32,7 +32,14 @@ class Dispatcher(
val shutdownTimeout: Duration) val shutdownTimeout: Duration)
extends MessageDispatcher(_prerequisites) { extends MessageDispatcher(_prerequisites) {
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(id) protected[akka] val executorServiceFactory: ExecutorServiceFactory =
executorServiceFactoryProvider.createExecutorServiceFactory(
id,
prerequisites.threadFactory match {
case m: MonitorableThreadFactory m.copy(m.name + "-" + id)
case other other
})
protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate {
lazy val executor = executorServiceFactory.createExecutorService lazy val executor = executorServiceFactory.createExecutorService
}) })

View file

@ -4,8 +4,6 @@
package akka.dispatch package akka.dispatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.ConcurrentHashMap
import akka.actor.newUuid import akka.actor.newUuid
import akka.util.{ Duration, ReflectiveAccess } import akka.util.{ Duration, ReflectiveAccess }
import akka.actor.ActorSystem import akka.actor.ActorSystem
@ -17,14 +15,17 @@ import com.typesafe.config.ConfigFactory
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.event.Logging.Warning import akka.event.Logging.Warning
import akka.actor.Props import akka.actor.Props
import java.util.concurrent.{ ThreadFactory, TimeUnit, ConcurrentHashMap }
trait DispatcherPrerequisites { trait DispatcherPrerequisites {
def threadFactory: ThreadFactory
def eventStream: EventStream def eventStream: EventStream
def deadLetterMailbox: Mailbox def deadLetterMailbox: Mailbox
def scheduler: Scheduler def scheduler: Scheduler
} }
case class DefaultDispatcherPrerequisites( case class DefaultDispatcherPrerequisites(
val threadFactory: ThreadFactory,
val eventStream: EventStream, val eventStream: EventStream,
val deadLetterMailbox: Mailbox, val deadLetterMailbox: Mailbox,
val scheduler: Scheduler) extends DispatcherPrerequisites val scheduler: Scheduler) extends DispatcherPrerequisites

View file

@ -16,6 +16,7 @@ object ThreadPoolConfig {
val defaultCorePoolSize: Int = 16 val defaultCorePoolSize: Int = 16
val defaultMaxPoolSize: Int = 128 val defaultMaxPoolSize: Int = 128
val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS) val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)
val defaultRejectionPolicy: RejectedExecutionHandler = new SaneRejectedExecutionHandler()
def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = { def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = {
import scala.math.{ min, max } import scala.math.{ min, max }
@ -54,7 +55,7 @@ trait ExecutorServiceFactory {
* Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired * Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired
*/ */
trait ExecutorServiceFactoryProvider { trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(name: String): ExecutorServiceFactory def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory
} }
/** /**
@ -65,16 +66,24 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(), queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
daemonic: Boolean = false) rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy)
extends ExecutorServiceFactoryProvider { extends ExecutorServiceFactoryProvider {
class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = { def createExecutorService: ExecutorService = {
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, new SaneRejectedExecutionHandler) val service = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
threadTimeout.length,
threadTimeout.unit,
queueFactory(),
threadFactory,
rejectionPolicy)
service.allowCoreThreadTimeOut(allowCorePoolTimeout) service.allowCoreThreadTimeOut(allowCorePoolTimeout)
service service
} }
} }
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ThreadPoolExecutorServiceFactory(new MonitorableThreadFactory(name, daemonic)) final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
new ThreadPoolExecutorServiceFactory(threadFactory)
} }
trait DispatcherBuilder { trait DispatcherBuilder {
@ -143,16 +152,20 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c)) def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c))
} }
class MonitorableThreadFactory(val name: String, val daemonic: Boolean = false) extends ThreadFactory { object MonitorableThreadFactory {
val doNothing: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable) = () }
}
case class MonitorableThreadFactory(name: String,
daemonic: Boolean,
exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing)
extends ThreadFactory {
protected val counter = new AtomicLong protected val counter = new AtomicLong
protected val doNothing: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable) = {}
}
def newThread(runnable: Runnable) = { def newThread(runnable: Runnable) = {
val t = new Thread(runnable, name + counter.incrementAndGet()) val t = new Thread(runnable, name + counter.incrementAndGet())
t.setUncaughtExceptionHandler(doNothing) t.setUncaughtExceptionHandler(exceptionHandler)
t.setDaemon(daemonic) t.setDaemon(daemonic)
t t
} }

View file

@ -45,7 +45,7 @@ class SquarerImpl(val name: String) extends Squarer {
//#typed-actor-impl-methods //#typed-actor-impl-methods
} }
//#typed-actor-impl //#typed-actor-impl
import java.lang.Integer.{ parseInt println } //Mr funny man avoids printing to stdout AND keeping docs alright
//#typed-actor-supercharge //#typed-actor-supercharge
trait Foo { trait Foo {
def doFoo(times: Int): Unit = println("doFoo(" + times + ")") def doFoo(times: Int): Unit = println("doFoo(" + times + ")")

View file

@ -22,8 +22,6 @@ object DispatcherDocSpec {
my-dispatcher { my-dispatcher {
# Dispatcher is the name of the event-based dispatcher # Dispatcher is the name of the event-based dispatcher
type = Dispatcher type = Dispatcher
# Toggles whether the threads created by this dispatcher should be daemons or not
daemonic = off
# minimum number of threads to cap factor-based core number to # minimum number of threads to cap factor-based core number to
core-pool-size-min = 2 core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor) # No of core threads ... ceil(available processors * factor)

View file

@ -59,9 +59,6 @@ akka {
# Reuse inbound connections for outbound messages # Reuse inbound connections for outbound messages
use-passive-connections = on use-passive-connections = on
# Whether any Threds created by the remoting should be daemons or not
daemonic = on
# accrual failure detection config # accrual failure detection config
failure-detector { failure-detector {
@ -84,13 +81,11 @@ akka {
compute-grid-dispatcher { compute-grid-dispatcher {
# defaults to same settings as default-dispatcher # defaults to same settings as default-dispatcher
name = ComputeGridDispatcher name = ComputeGridDispatcher
daemonic = on
} }
# The dispatcher used for the system actor "network-event-sender" # The dispatcher used for the system actor "network-event-sender"
network-event-sender-dispatcher { network-event-sender-dispatcher {
type = PinnedDispatcher type = PinnedDispatcher
daemonic = on
} }
server { server {

View file

@ -16,7 +16,6 @@ class RemoteSettings(val config: Config, val systemName: String) {
import config._ import config._
val RemoteTransport = getString("akka.remote.transport") val RemoteTransport = getString("akka.remote.transport")
val Daemonic = getBoolean("akka.remote.daemonic")
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
val ShouldCompressData = getBoolean("akka.remote.use-compression") val ShouldCompressData = getBoolean("akka.remote.use-compression")

View file

@ -369,7 +369,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
val serverSettings = remote.remoteSettings.serverSettings val serverSettings = remote.remoteSettings.serverSettings
val clientSettings = remote.remoteSettings.clientSettings val clientSettings = remote.remoteSettings.clientSettings
val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic) val threadFactory = _system.threadFactory.copy(_system.threadFactory.name + "-remote")
val timer: HashedWheelTimer = new HashedWheelTimer val timer: HashedWheelTimer = new HashedWheelTimer
val executor = new OrderedMemoryAwareThreadPoolExecutor( val executor = new OrderedMemoryAwareThreadPoolExecutor(
serverSettings.ExecutionPoolSize, serverSettings.ExecutionPoolSize,
@ -535,23 +535,24 @@ class NettyRemoteServer(
Executors.newCachedThreadPool(remoteSupport.threadFactory), Executors.newCachedThreadPool(remoteSupport.threadFactory),
Executors.newCachedThreadPool(remoteSupport.threadFactory)) Executors.newCachedThreadPool(remoteSupport.threadFactory))
private val bootstrap = new ServerBootstrap(factory)
private val executionHandler = new ExecutionHandler(remoteSupport.executor) private val executionHandler = new ExecutionHandler(remoteSupport.executor)
// group of open channels, used for clean-up // group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport) val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport)
bootstrap.setPipelineFactory(pipelineFactory) private val bootstrap: ServerBootstrap = {
bootstrap.setOption("backlog", Backlog) val b = new ServerBootstrap(factory)
bootstrap.setOption("child.tcpNoDelay", true) b.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.keepAlive", true) b.setOption("backlog", Backlog)
bootstrap.setOption("child.reuseAddress", true) b.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) b.setOption("child.keepAlive", true)
b.setOption("child.reuseAddress", true)
b.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis)
b
}
openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port))) openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port)))
remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport))
def shutdown() { def shutdown() {
try { try {

View file

@ -16,7 +16,6 @@ class RemoteConfigSpec extends AkkaSpec("") {
getString("akka.remote.secure-cookie") must equal("") getString("akka.remote.secure-cookie") must equal("")
getBoolean("akka.remote.use-passive-connections") must equal(true) getBoolean("akka.remote.use-passive-connections") must equal(true)
getMilliseconds("akka.remote.backoff-timeout") must equal(0) getMilliseconds("akka.remote.backoff-timeout") must equal(0)
getBoolean("akka.remote.daemonic") must equal(true)
// getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
//akka.remote.server //akka.remote.server