Removing the ChannelGroup from the ActiveRemoteClient since it's only going to have one channel at a time anyway

This commit is contained in:
Viktor Klang 2012-01-09 14:13:52 +01:00
parent 07b27ba3de
commit 602a036194

View file

@ -132,8 +132,6 @@ class ActiveRemoteClient private[akka] (
private var bootstrap: ClientBootstrap = _
@volatile
private[remote] var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var reconnectionTimeWindowStart = 0L
@ -165,7 +163,7 @@ class ActiveRemoteClient private[akka] (
def attemptReconnect(): Boolean = {
log.debug("Remote client reconnecting to [{}]", remoteAddress)
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
@ -177,8 +175,6 @@ class ActiveRemoteClient private[akka] (
}
runSwitch switchOn {
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
executionHandler = new ExecutionHandler(remoteSupport.executor)
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
@ -189,9 +185,7 @@ class ActiveRemoteClient private[akka] (
log.debug("Starting remote client connection to [{}]", remoteAddress)
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
@ -204,7 +198,6 @@ class ActiveRemoteClient private[akka] (
} match {
case true true
case false if reconnectIfAlreadyConnected
openChannels.remove(connection.getChannel)
connection.getChannel.close()
log.debug("Remote client reconnecting to [{}]", remoteAddress)
@ -219,13 +212,12 @@ class ActiveRemoteClient private[akka] (
log.debug("Shutting down remote client [{}]", name)
notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress))
openChannels.close.awaitUninterruptibly
openChannels = null
connection.getChannel.close()
connection = null
executionHandler.releaseExternalResources()
executionHandler = null
bootstrap.releaseExternalResources()
bootstrap = null
connection = null
log.debug("[{}] has been shut down", name)
}
@ -321,12 +313,8 @@ class ActiveRemoteClientHandler(
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn {
if (client.isWithinReconnectionTimeWindow) {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
if (client.isRunning) {
client.openChannels.remove(event.getChannel)
client.connect(reconnectIfAlreadyConnected = true)
}
}
def run(timeout: Timeout) =
if (client.isRunning) client.connect(reconnectIfAlreadyConnected = true)
}, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS)
} else runOnceNow {
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread