#1656 - Adding support for configuringthreads created by remoting as daemons

This commit is contained in:
Viktor Klang 2012-01-18 18:10:56 +01:00
parent 82d009fab5
commit 7a3cbdf86f
4 changed files with 38 additions and 27 deletions

View file

@ -59,6 +59,9 @@ akka {
# Reuse inbound connections for outbound messages # Reuse inbound connections for outbound messages
use-passive-connections = on use-passive-connections = on
# Whether any Threds created by the remoting should be daemons or not
daemonic = on
# accrual failure detection config # accrual failure detection config
failure-detector { failure-detector {

View file

@ -16,13 +16,14 @@ class RemoteSettings(val config: Config, val systemName: String) {
import config._ import config._
val RemoteTransport = getString("akka.remote.transport") val RemoteTransport = getString("akka.remote.transport")
val Daemonic = getBoolean("akka.remote.daemonic")
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
val ShouldCompressData = config.getBoolean("akka.remote.use-compression") val ShouldCompressData = getBoolean("akka.remote.use-compression")
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) val InitalDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) val GossipFrequency = Duration(getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
val BackoffTimeout = Duration(config.getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) val BackoffTimeout = Duration(getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS)
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module // TODO cluster config will go into akka-cluster/reference.conf when we enable that module
val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
@ -33,56 +34,56 @@ class RemoteSettings(val config: Config, val systemName: String) {
val clientSettings = new RemoteClientSettings val clientSettings = new RemoteClientSettings
class RemoteClientSettings { class RemoteClientSettings {
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match {
case "" None case "" None
case cookie Some(cookie) case cookie Some(cookie)
} }
val ReconnectionTimeWindow = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
val ReconnectDelay = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
val MessageFrameSize = config.getBytes("akka.remote.client.message-frame-size").toInt val MessageFrameSize = getBytes("akka.remote.client.message-frame-size").toInt
} }
class RemoteServerSettings { class RemoteServerSettings {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
val MessageFrameSize = config.getBytes("akka.remote.server.message-frame-size").toInt val MessageFrameSize = getBytes("akka.remote.server.message-frame-size").toInt
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match {
case "" None case "" None
case cookie Some(cookie) case cookie Some(cookie)
} }
val RequireCookie = { val RequireCookie = {
val requireCookie = config.getBoolean("akka.remote.server.require-cookie") val requireCookie = getBoolean("akka.remote.server.require-cookie")
if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie requireCookie
} }
val UsePassiveConnections = config.getBoolean("akka.remote.use-passive-connections") val UsePassiveConnections = getBoolean("akka.remote.use-passive-connections")
val UntrustedMode = config.getBoolean("akka.remote.server.untrusted-mode") val UntrustedMode = getBoolean("akka.remote.server.untrusted-mode")
val Hostname = config.getString("akka.remote.server.hostname") match { val Hostname = getString("akka.remote.server.hostname") match {
case "" InetAddress.getLocalHost.getHostAddress case "" InetAddress.getLocalHost.getHostAddress
case value value case value value
} }
val Port = config.getInt("akka.remote.server.port") val Port = getInt("akka.remote.server.port")
val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) val ConnectionTimeout = Duration(getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS)
val Backlog = config.getInt("akka.remote.server.backlog") val Backlog = getInt("akka.remote.server.backlog")
val ExecutionPoolKeepAlive = Duration(config.getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS) val ExecutionPoolKeepAlive = Duration(getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS)
val ExecutionPoolSize = config.getInt("akka.remote.server.execution-pool-size") match { val ExecutionPoolSize = getInt("akka.remote.server.execution-pool-size") match {
case sz if sz < 1 throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") case sz if sz < 1 throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1")
case sz sz case sz sz
} }
val MaxChannelMemorySize = config.getBytes("akka.remote.server.max-channel-memory-size") match { val MaxChannelMemorySize = getBytes("akka.remote.server.max-channel-memory-size") match {
case sz if sz < 0 throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes") case sz if sz < 0 throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes")
case sz sz case sz sz
} }
val MaxTotalMemorySize = config.getBytes("akka.remote.server.max-total-memory-size") match { val MaxTotalMemorySize = getBytes("akka.remote.server.max-total-memory-size") match {
case sz if sz < 0 throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes") case sz if sz < 0 throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes")
case sz sz case sz sz
} }

View file

@ -26,6 +26,7 @@ import akka.actor.ActorSystemImpl
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
import java.util.concurrent._ import java.util.concurrent._
import locks.ReentrantReadWriteLock import locks.ReentrantReadWriteLock
import akka.dispatch.MonitorableThreadFactory
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null) def this(msg: String) = this(msg, null)
@ -177,7 +178,9 @@ class ActiveRemoteClient private[akka] (
runSwitch switchOn { runSwitch switchOn {
executionHandler = new ExecutionHandler(remoteSupport.executor) executionHandler = new ExecutionHandler(remoteSupport.executor)
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(remoteSupport.threadFactory),
Executors.newCachedThreadPool(remoteSupport.threadFactory)))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this))
bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true) bootstrap.setOption("keepAlive", true)
@ -366,14 +369,15 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
val serverSettings = remote.remoteSettings.serverSettings val serverSettings = remote.remoteSettings.serverSettings
val clientSettings = remote.remoteSettings.clientSettings val clientSettings = remote.remoteSettings.clientSettings
val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic)
val timer: HashedWheelTimer = new HashedWheelTimer val timer: HashedWheelTimer = new HashedWheelTimer
val executor = new OrderedMemoryAwareThreadPoolExecutor( val executor = new OrderedMemoryAwareThreadPoolExecutor(
serverSettings.ExecutionPoolSize, serverSettings.ExecutionPoolSize,
serverSettings.MaxChannelMemorySize, serverSettings.MaxChannelMemorySize,
serverSettings.MaxTotalMemorySize, serverSettings.MaxTotalMemorySize,
serverSettings.ExecutionPoolKeepAlive.length, serverSettings.ExecutionPoolKeepAlive.length,
serverSettings.ExecutionPoolKeepAlive.unit) serverSettings.ExecutionPoolKeepAlive.unit,
threadFactory)
private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock private val clientsLock = new ReentrantReadWriteLock
@ -527,7 +531,9 @@ class NettyRemoteServer(
val name = "NettyRemoteServer@" + address val name = "NettyRemoteServer@" + address
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool) private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(remoteSupport.threadFactory),
Executors.newCachedThreadPool(remoteSupport.threadFactory))
private val bootstrap = new ServerBootstrap(factory) private val bootstrap = new ServerBootstrap(factory)

View file

@ -16,6 +16,7 @@ class RemoteConfigSpec extends AkkaSpec("") {
getString("akka.remote.secure-cookie") must equal("") getString("akka.remote.secure-cookie") must equal("")
getBoolean("akka.remote.use-passive-connections") must equal(true) getBoolean("akka.remote.use-passive-connections") must equal(true)
getMilliseconds("akka.remote.backoff-timeout") must equal(0) getMilliseconds("akka.remote.backoff-timeout") must equal(0)
getBoolean("akka.remote.daemonic") must equal(true)
// getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
//akka.remote.server //akka.remote.server