diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index e7eb7d6b95..b81640afe4 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -576,18 +576,25 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with } class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { - + import RemoteServerSettings._ val name = "NettyRemoteServer@" + host + ":" + port val address = new InetSocketAddress(host, port) private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool) private val bootstrap = new ServerBootstrap(factory) + private val executor = new ExecutionHandler( + new OrderedMemoryAwareThreadPoolExecutor( + EXECUTION_POOL_SIZE, + MAX_CHANNEL_MEMORY_SIZE, + MAX_TOTAL_MEMORY_SIZE, + EXECUTION_POOL_KEEPALIVE.length, + EXECUTION_POOL_KEEPALIVE.unit)) // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, serverModule) + val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, serverModule) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("backlog", RemoteServerSettings.BACKLOG) bootstrap.setOption("child.tcpNoDelay", true) @@ -611,6 +618,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, openChannels.disconnect openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources() + executor.releaseExternalResources() serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { case e: Exception ⇒ @@ -740,6 +748,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, + val executor: ExecutionHandler, val loader: Option[ClassLoader], val server: NettyRemoteServerModule) extends ChannelPipelineFactory { import RemoteServerSettings._ @@ -753,16 +762,9 @@ class RemoteServerPipelineFactory( case "zlib" ⇒ (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) case _ ⇒ (Nil, Nil) } - val execution = new ExecutionHandler( - new OrderedMemoryAwareThreadPoolExecutor( - EXECUTION_POOL_SIZE, - MAX_CHANNEL_MEMORY_SIZE, - MAX_TOTAL_MEMORY_SIZE, - EXECUTION_POOL_KEEPALIVE.length, - EXECUTION_POOL_KEEPALIVE.unit)) val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) - val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: authenticator ::: remoteServer :: Nil + val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } }