diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index f7faa527e4..518be776cc 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -20,6 +20,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS } import scala.concurrent.{ ExecutionContext, Promise, Future } import scala.util.Random import scala.util.control.NonFatal +import akka.dispatch.ThreadPoolConfig object NettyTransportSettings { sealed trait Mode @@ -77,6 +78,16 @@ class NettyTransportSettings(config: Config) { val SslSettings: Option[SslSettings] = if (EnableSsl) Some(new SslSettings(config.getConfig("ssl"))) else None + val ServerSocketWorkerPoolSize: Int = computeWPS(config.getConfig("server-socket-worker-pool")) + + val ClientSocketWorkerPoolSize: Int = computeWPS(config.getConfig("client-socket-worker-pool")) + + private def computeWPS(config: Config): Int = + ThreadPoolConfig.scaledPoolSize( + config.getInt("pool-size-min"), + config.getDouble("pool-size-factor"), + config.getInt("pool-size-max")) + } trait HasTransport { @@ -162,19 +173,16 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s val channels = new DefaultDisposableChannelGroup("netty-transport-" + Random.nextString(20)) - private def executor: Executor = UseDispatcherForIo match { - case Some(dispatcherName) ⇒ system.dispatchers.lookup(dispatcherName) - case None ⇒ Executors.newCachedThreadPool() // FIXME: apply patch from #2659 when available - } + private def executor: Executor = UseDispatcherForIo.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool() private val clientChannelFactory: ChannelFactory = TransportMode match { - case Tcp ⇒ new NioClientSocketChannelFactory(executor, executor) - case Udp ⇒ new NioDatagramChannelFactory(executor) + case Tcp ⇒ new NioClientSocketChannelFactory(executor, executor, ClientSocketWorkerPoolSize) + case Udp ⇒ new NioDatagramChannelFactory(executor, ClientSocketWorkerPoolSize) } private val serverChannelFactory: ChannelFactory = TransportMode match { - case Tcp ⇒ new NioServerSocketChannelFactory(executor, executor) - case Udp ⇒ new NioDatagramChannelFactory(executor) + case Tcp ⇒ new NioServerSocketChannelFactory(executor, executor, ServerSocketWorkerPoolSize) + case Udp ⇒ new NioDatagramChannelFactory(executor, ServerSocketWorkerPoolSize) } private def newPipeline: DefaultChannelPipeline = { diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index caa7fe3293..3bf8850339 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -45,6 +45,18 @@ object RemotingSpec { backlog = 4096 hostname = localhost enable-ssl = false + + server-socket-worker-pool { + pool-size-min = 2 + pool-size-factor = 1.0 + pool-size-max = 8 + } + + client-socket-worker-pool { + pool-size-min = 2 + pool-size-factor = 1.0 + pool-size-max = 8 + } } common-ssl-settings {