Closing ticket #979

This commit is contained in:
Viktor Klang 2011-06-30 17:03:26 +02:00
parent 6d38a41a89
commit 64717cea65

View file

@ -576,18 +576,25 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
}
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
import RemoteServerSettings._
val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host, port)
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
private val bootstrap = new ServerBootstrap(factory)
private val executor = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
EXECUTION_POOL_SIZE,
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit))
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, serverModule)
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, serverModule)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("backlog", RemoteServerSettings.BACKLOG)
bootstrap.setOption("child.tcpNoDelay", true)
@ -611,6 +618,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources()
executor.releaseExternalResources()
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: Exception
@ -740,6 +748,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val executor: ExecutionHandler,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServerSettings._
@ -753,16 +762,9 @@ class RemoteServerPipelineFactory(
case "zlib" (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ (Nil, Nil)
}
val execution = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
EXECUTION_POOL_SIZE,
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit))
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: authenticator ::: remoteServer :: Nil
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
}