Fixing ticket #652 - Reaping expired futures
This commit is contained in:
parent
bc423fcc76
commit
83f9c4982f
3 changed files with 19 additions and 2 deletions
|
|
@ -16,6 +16,7 @@ object RemoteClientSettings {
|
|||
val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
|
||||
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
|
||||
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
|
||||
val REAP_FUTURES_DELAY = Duration(config.getInt("akka.remote.client.reap-futures-delay", 5), TIME_UNIT)
|
||||
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -284,6 +284,19 @@ class ActiveRemoteClient private[akka] (
|
|||
log.slf4j.debug("Remote client connection failed", connection.getCause)
|
||||
false
|
||||
} else {
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = {
|
||||
if(isRunning) {
|
||||
log.slf4j.debug("Reaping expired futures awaiting completion from [{}]", remoteAddress)
|
||||
val i = futures.entrySet.iterator
|
||||
while(i.hasNext) {
|
||||
val e = i.next
|
||||
if (e.getValue.isExpired)
|
||||
futures.remove(e.getKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
}, RemoteClientSettings.REAP_FUTURES_DELAY.length, RemoteClientSettings.REAP_FUTURES_DELAY.unit)
|
||||
notifyListeners(RemoteClientStarted(module, remoteAddress))
|
||||
true
|
||||
}
|
||||
|
|
@ -440,8 +453,10 @@ class ActiveRemoteClientHandler(
|
|||
if (client.isWithinReconnectionTimeWindow) {
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = {
|
||||
client.openChannels.remove(event.getChannel)
|
||||
client.connect(reconnectIfAlreadyConnected = true)
|
||||
if (client.isRunning) {
|
||||
client.openChannels.remove(event.getChannel)
|
||||
client.connect(reconnectIfAlreadyConnected = true)
|
||||
}
|
||||
}
|
||||
}, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
|
||||
} else spawn { client.shutdown }
|
||||
|
|
|
|||
|
|
@ -135,6 +135,7 @@ akka {
|
|||
reconnect-delay = 5
|
||||
read-timeout = 10
|
||||
message-frame-size = 1048576
|
||||
reap-futures-delay = 5
|
||||
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue