diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala index c38bf8395a..cb52129007 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala @@ -16,6 +16,7 @@ object RemoteClientSettings { val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) + val REAP_FUTURES_DELAY = Duration(config.getInt("akka.remote.client.reap-futures-delay", 5), TIME_UNIT) val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 80ac2f08a5..4fbfad511c 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -284,6 +284,19 @@ class ActiveRemoteClient private[akka] ( log.slf4j.debug("Remote client connection failed", connection.getCause) false } else { + timer.newTimeout(new TimerTask() { + def run(timeout: Timeout) = { + if(isRunning) { + log.slf4j.debug("Reaping expired futures awaiting completion from [{}]", remoteAddress) + val i = futures.entrySet.iterator + while(i.hasNext) { + val e = i.next + if (e.getValue.isExpired) + futures.remove(e.getKey) + } + } + } + }, RemoteClientSettings.REAP_FUTURES_DELAY.length, RemoteClientSettings.REAP_FUTURES_DELAY.unit) notifyListeners(RemoteClientStarted(module, remoteAddress)) true } @@ -440,8 +453,10 @@ class ActiveRemoteClientHandler( if (client.isWithinReconnectionTimeWindow) { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = { - client.openChannels.remove(event.getChannel) - client.connect(reconnectIfAlreadyConnected = true) + if (client.isRunning) { + client.openChannels.remove(event.getChannel) + client.connect(reconnectIfAlreadyConnected = true) + } } }, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) } else spawn { client.shutdown } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 4b0397571d..dda07c8030 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -135,6 +135,7 @@ akka { reconnect-delay = 5 read-timeout = 10 message-frame-size = 1048576 + reap-futures-delay = 5 reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for } }