diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 6a8245e8b8..e20de6b9d0 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -181,41 +181,43 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade extends Logging with ListenerManagement { val name = "RemoteClient@" + hostname + "::" + port - @volatile private[remote] var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] private val supervisors = new ConcurrentHashMap[String, ActorRef] - private val channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool, - Executors.newCachedThreadPool) - - private val bootstrap = new ClientBootstrap(channelFactory) - private val timer = new HashedWheelTimer private val remoteAddress = new InetSocketAddress(hostname, port) - private[remote] var connection: ChannelFuture = _ - private[remote] val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName); + @volatile private[remote] var isRunning = false + @volatile private var bootstrap: ClientBootstrap = _ + @volatile private var reconnectionTimeWindowStart = 0L + @volatile private[remote] var connection: ChannelFuture = _ + @volatile private[remote] var openChannels: DefaultChannelGroup = _ + @volatile private var timer: HashedWheelTimer = _ private val reconnectionTimeWindow = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis - @volatile private var reconnectionTimeWindowStart = 0L - - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("keepAlive", true) def connect = synchronized { if (!isRunning) { + openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName) + timer = new HashedWheelTimer + bootstrap = new ClientBootstrap( + new NioClientSocketChannelFactory( + Executors.newCachedThreadPool,Executors.newCachedThreadPool + ) + ) + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("keepAlive", true) connection = bootstrap.connect(remoteAddress) log.info("Starting remote client connection to [%s:%s]", hostname, port) // Wait until the connection attempt succeeds or fails. val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - foreachListener(l => l ! RemoteClientError(connection.getCause, this)) + foreachListener(_ ! RemoteClientError(connection.getCause, this)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } - foreachListener(l => l ! RemoteClientStarted(this)) + foreachListener(_ ! RemoteClientStarted(this)) isRunning = true } } @@ -224,10 +226,14 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade log.info("Shutting down %s", name) if (isRunning) { isRunning = false - foreachListener(l => l ! RemoteClientShutdown(this)) + foreachListener(_ ! RemoteClientShutdown(this)) timer.stop + timer = null openChannels.close.awaitUninterruptibly + openChannels = null bootstrap.releaseExternalResources + bootstrap = null + connection = null log.info("%s has been shut down", name) } } @@ -257,7 +263,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade } } else { val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - foreachListener(l => l ! RemoteClientError(exception, this)) + foreachListener(_ ! RemoteClientError(exception, this)) throw exception } @@ -373,12 +379,12 @@ class RemoteClientHandler( futures.remove(reply.getId) } else { val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) - client.foreachListener(l => l ! RemoteClientError(exception, client)) + client.foreachListener(_ ! RemoteClientError(exception, client)) throw exception } } catch { case e: Exception => - client.foreachListener(l => l ! RemoteClientError(e, client)) + client.foreachListener(_ ! RemoteClientError(e, client)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -393,8 +399,7 @@ class RemoteClientHandler( client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.foreachListener(l => - l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client)) + client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -404,7 +409,7 @@ class RemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { - client.foreachListener(l => l ! RemoteClientConnected(client)) + client.foreachListener(_ ! RemoteClientConnected(client)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } @@ -421,12 +426,12 @@ class RemoteClientHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.foreachListener(l => l ! RemoteClientDisconnected(client)) + client.foreachListener(_ ! RemoteClientDisconnected(client)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.foreachListener(l => l ! RemoteClientError(event.getCause, client)) + client.foreachListener(_ ! RemoteClientError(event.getCause, client)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close }