diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index dafb703e41..98c6c99695 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -132,8 +132,6 @@ class ActiveRemoteClient private[akka] ( private var bootstrap: ClientBootstrap = _ @volatile private[remote] var connection: ChannelFuture = _ - @volatile - private[remote] var openChannels: DefaultChannelGroup = _ @volatile private var reconnectionTimeWindowStart = 0L @@ -165,7 +163,7 @@ class ActiveRemoteClient private[akka] ( def attemptReconnect(): Boolean = { log.debug("Remote client reconnecting to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. + connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -177,8 +175,6 @@ class ActiveRemoteClient private[akka] ( } runSwitch switchOn { - openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) - executionHandler = new ExecutionHandler(remoteSupport.executor) bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) @@ -189,9 +185,7 @@ class ActiveRemoteClient private[akka] ( log.debug("Starting remote client connection to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - - val channel = connection.awaitUninterruptibly.getChannel - openChannels.add(channel) + connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -204,7 +198,6 @@ class ActiveRemoteClient private[akka] ( } match { case true ⇒ true case false if reconnectIfAlreadyConnected ⇒ - openChannels.remove(connection.getChannel) connection.getChannel.close() log.debug("Remote client reconnecting to [{}]", remoteAddress) @@ -219,13 +212,12 @@ class ActiveRemoteClient private[akka] ( log.debug("Shutting down remote client [{}]", name) notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress)) - openChannels.close.awaitUninterruptibly - openChannels = null + connection.getChannel.close() + connection = null executionHandler.releaseExternalResources() executionHandler = null bootstrap.releaseExternalResources() bootstrap = null - connection = null log.debug("[{}] has been shut down", name) } @@ -321,12 +313,8 @@ class ActiveRemoteClientHandler( override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn { if (client.isWithinReconnectionTimeWindow) { timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = { - if (client.isRunning) { - client.openChannels.remove(event.getChannel) - client.connect(reconnectIfAlreadyConnected = true) - } - } + def run(timeout: Timeout) = + if (client.isRunning) client.connect(reconnectIfAlreadyConnected = true) }, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS) } else runOnceNow { client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread