From 6db3e59ce1e8f034083994f5e5549e2822633a50 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 19 Jan 2012 15:06:58 +0100 Subject: [PATCH] Restructuring ActiveRemoteClient, moving to a shared NioClientChannelFactory, making the timer optionally a daemon, switching to channel groups --- .../remote/netty/NettyRemoteSupport.scala | 89 +++++++++++-------- 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index ea240858d9..db58b6b23d 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -9,8 +9,7 @@ import akka.remote._ import RemoteProtocol._ import akka.util._ import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory +import org.jboss.netty.channel.socket.nio.{ NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } @@ -121,6 +120,7 @@ class PassiveRemoteClient(val currentChannel: Channel, class ActiveRemoteClient private[akka] ( remoteSupport: NettyRemoteSupport, remoteAddress: RemoteNettyAddress, + localAddress: RemoteSystemAddress[ParsedTransportAddress], val loader: Option[ClassLoader] = None) extends RemoteClient(remoteSupport, remoteAddress) { @@ -132,7 +132,11 @@ class ActiveRemoteClient private[akka] ( @volatile private var bootstrap: ClientBootstrap = _ @volatile - private[remote] var connection: ChannelFuture = _ + private var connection: ChannelFuture = _ + @volatile + private[remote] var openChannels: DefaultChannelGroup = _ + @volatile + private var executionHandler: ExecutionHandler = _ @volatile private var reconnectionTimeWindowStart = 0L @@ -141,10 +145,6 @@ class ActiveRemoteClient private[akka] ( def currentChannel = connection.getChannel - private val senderRemoteAddress = remoteSupport.remote.remoteAddress - @volatile - private var executionHandler: ExecutionHandler = _ - /** * Connect to remote server. */ @@ -154,9 +154,9 @@ class ActiveRemoteClient private[akka] ( val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get) handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(senderRemoteAddress.system) - .setHostname(senderRemoteAddress.transport.host) - .setPort(senderRemoteAddress.transport.port) + .setSystem(localAddress.system) + .setHostname(localAddress.transport.host) + .setPort(localAddress.transport.port) .build) connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build)) } @@ -164,7 +164,7 @@ class ActiveRemoteClient private[akka] ( def attemptReconnect(): Boolean = { log.debug("Remote client reconnecting to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -176,11 +176,11 @@ class ActiveRemoteClient private[akka] ( } runSwitch switchOn { + openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) + executionHandler = new ExecutionHandler(remoteSupport.executor) - bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(remoteSupport.threadFactory), - Executors.newCachedThreadPool(remoteSupport.threadFactory))) + bootstrap = new ClientBootstrap(remoteSupport.clientChannelFactory) bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -188,7 +188,8 @@ class ActiveRemoteClient private[akka] ( log.debug("Starting remote client connection to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. + + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -202,6 +203,7 @@ class ActiveRemoteClient private[akka] ( case true ⇒ true case false if reconnectIfAlreadyConnected ⇒ connection.getChannel.close() + openChannels.remove(connection.getChannel) log.debug("Remote client reconnecting to [{}]", remoteAddress) attemptReconnect() @@ -219,13 +221,11 @@ class ActiveRemoteClient private[akka] ( if ((connection ne null) && (connection.getChannel ne null)) connection.getChannel.close() } finally { - connection = null - executionHandler = null - //Do not do this: executionHandler.releaseExternalResources(), since it's shutting down the shared threadpool try { - bootstrap.releaseExternalResources() + if (openChannels ne null) openChannels.close.awaitUninterruptibly() } finally { - bootstrap = null + connection = null + executionHandler = null } } @@ -324,7 +324,10 @@ class ActiveRemoteClientHandler( if (client.isWithinReconnectionTimeWindow) { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = - if (client.isRunning) client.connect(reconnectIfAlreadyConnected = true) + if (client.isRunning) { + client.openChannels.remove(event.getChannel) + client.connect(reconnectIfAlreadyConnected = true) + } }, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS) } else runOnceNow { client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread @@ -369,8 +372,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre val serverSettings = remote.remoteSettings.serverSettings val clientSettings = remote.remoteSettings.clientSettings + val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic) - val timer: HashedWheelTimer = new HashedWheelTimer + val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory) + val executor = new OrderedMemoryAwareThreadPoolExecutor( serverSettings.ExecutionPoolSize, serverSettings.MaxChannelMemorySize, @@ -379,6 +384,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre serverSettings.ExecutionPoolKeepAlive.unit, threadFactory) + val clientChannelFactory = new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(threadFactory), + Executors.newCachedThreadPool(threadFactory)) + private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock @@ -411,7 +420,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(this, recipientAddress, loader) + val client = new ActiveRemoteClient(this, recipientAddress, remote.remoteAddress, loader) client.connect() remoteClients += recipientAddress -> client client @@ -479,26 +488,20 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre /** * Server section */ - private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) + @volatile + private var currentServer: NettyRemoteServer = _ - def name = currentServer.get match { - case Some(server) ⇒ server.name - case None ⇒ remote.remoteAddress.toString + def name = currentServer match { + case null ⇒ remote.remoteAddress.toString + case server ⇒ server.name } private val _isRunning = new Switch(false) def isRunning = _isRunning.isOn - def start(loader: Option[ClassLoader] = None): Unit = { - _isRunning switchOn { - try { - currentServer.set(Some(new NettyRemoteServer(this, loader, address))) - } catch { - case e: Exception ⇒ notifyListeners(RemoteServerError(e, this)) - } - } - } + def start(loader: Option[ClassLoader] = None): Unit = + _isRunning switchOn { currentServer = new NettyRemoteServer(this, loader, address) } /** * Common section @@ -512,9 +515,19 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } finally { clientsLock.writeLock().unlock() try { - currentServer.getAndSet(None) foreach { _.shutdown() } + val s = currentServer + currentServer = null + s.shutdown() } finally { - try { timer.stop() } finally { executor.shutdown() } + try { + timer.stop() + } finally { + try { + clientChannelFactory.releaseExternalResources() + } finally { + executor.shutdown() + } + } } } }