diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index cac020bf70..0157dbfe52 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -59,6 +59,9 @@ akka { # Reuse inbound connections for outbound messages use-passive-connections = on + # Whether any Threds created by the remoting should be daemons or not + daemonic = on + # accrual failure detection config failure-detector { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index ad0356c009..eae0741844 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -16,13 +16,14 @@ class RemoteSettings(val config: Config, val systemName: String) { import config._ val RemoteTransport = getString("akka.remote.transport") + val Daemonic = getBoolean("akka.remote.daemonic") val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") - val ShouldCompressData = config.getBoolean("akka.remote.use-compression") - val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) - val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) - val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) - val BackoffTimeout = Duration(config.getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) + val ShouldCompressData = getBoolean("akka.remote.use-compression") + val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) + val InitalDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) + val GossipFrequency = Duration(getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) + val BackoffTimeout = Duration(getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) // TODO cluster config will go into akka-cluster/reference.conf when we enable that module val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { @@ -33,56 +34,56 @@ class RemoteSettings(val config: Config, val systemName: String) { val clientSettings = new RemoteClientSettings class RemoteClientSettings { - val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { + val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match { case "" ⇒ None case cookie ⇒ Some(cookie) } - val ReconnectionTimeWindow = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) - val ReadTimeout = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) - val ReconnectDelay = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) - val MessageFrameSize = config.getBytes("akka.remote.client.message-frame-size").toInt + val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) + val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) + val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) + val MessageFrameSize = getBytes("akka.remote.client.message-frame-size").toInt } class RemoteServerSettings { import scala.collection.JavaConverters._ - val MessageFrameSize = config.getBytes("akka.remote.server.message-frame-size").toInt - val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { + val MessageFrameSize = getBytes("akka.remote.server.message-frame-size").toInt + val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match { case "" ⇒ None case cookie ⇒ Some(cookie) } val RequireCookie = { - val requireCookie = config.getBoolean("akka.remote.server.require-cookie") + val requireCookie = getBoolean("akka.remote.server.require-cookie") if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") requireCookie } - val UsePassiveConnections = config.getBoolean("akka.remote.use-passive-connections") + val UsePassiveConnections = getBoolean("akka.remote.use-passive-connections") - val UntrustedMode = config.getBoolean("akka.remote.server.untrusted-mode") - val Hostname = config.getString("akka.remote.server.hostname") match { + val UntrustedMode = getBoolean("akka.remote.server.untrusted-mode") + val Hostname = getString("akka.remote.server.hostname") match { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value } - val Port = config.getInt("akka.remote.server.port") - val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) + val Port = getInt("akka.remote.server.port") + val ConnectionTimeout = Duration(getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) - val Backlog = config.getInt("akka.remote.server.backlog") + val Backlog = getInt("akka.remote.server.backlog") - val ExecutionPoolKeepAlive = Duration(config.getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS) + val ExecutionPoolKeepAlive = Duration(getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS) - val ExecutionPoolSize = config.getInt("akka.remote.server.execution-pool-size") match { + val ExecutionPoolSize = getInt("akka.remote.server.execution-pool-size") match { case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") case sz ⇒ sz } - val MaxChannelMemorySize = config.getBytes("akka.remote.server.max-channel-memory-size") match { + val MaxChannelMemorySize = getBytes("akka.remote.server.max-channel-memory-size") match { case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes") case sz ⇒ sz } - val MaxTotalMemorySize = config.getBytes("akka.remote.server.max-total-memory-size") match { + val MaxTotalMemorySize = getBytes("akka.remote.server.max-total-memory-size") match { case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes") case sz ⇒ sz } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b9252bd9fc..ea240858d9 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -26,6 +26,7 @@ import akka.actor.ActorSystemImpl import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } import java.util.concurrent._ import locks.ReentrantReadWriteLock +import akka.dispatch.MonitorableThreadFactory class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null) @@ -177,7 +178,9 @@ class ActiveRemoteClient private[akka] ( runSwitch switchOn { executionHandler = new ExecutionHandler(remoteSupport.executor) - bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) + bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(remoteSupport.threadFactory), + Executors.newCachedThreadPool(remoteSupport.threadFactory))) bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -366,14 +369,15 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre val serverSettings = remote.remoteSettings.serverSettings val clientSettings = remote.remoteSettings.clientSettings - + val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic) val timer: HashedWheelTimer = new HashedWheelTimer val executor = new OrderedMemoryAwareThreadPoolExecutor( serverSettings.ExecutionPoolSize, serverSettings.MaxChannelMemorySize, serverSettings.MaxTotalMemorySize, serverSettings.ExecutionPoolKeepAlive.length, - serverSettings.ExecutionPoolKeepAlive.unit) + serverSettings.ExecutionPoolKeepAlive.unit, + threadFactory) private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock @@ -527,7 +531,9 @@ class NettyRemoteServer( val name = "NettyRemoteServer@" + address - private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool) + private val factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(remoteSupport.threadFactory), + Executors.newCachedThreadPool(remoteSupport.threadFactory)) private val bootstrap = new ServerBootstrap(factory) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 03a343f3b1..49502b99ea 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -16,6 +16,7 @@ class RemoteConfigSpec extends AkkaSpec("") { getString("akka.remote.secure-cookie") must equal("") getBoolean("akka.remote.use-passive-connections") must equal(true) getMilliseconds("akka.remote.backoff-timeout") must equal(0) + getBoolean("akka.remote.daemonic") must equal(true) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) //akka.remote.server