Connection retries to shut-down node, see #3326
* In EC2 the connection timeout is around 1 minute. A few messages were sent after quarantining, and these caused endless restarts and connection attempts at 1-minute intervals.
* This change makes sure that the endpoint is stopped after the first failed connection attempt.
* Changed the default settings for the netty connection-timeout and the matching retry window to allow for 3 restarts.
This commit is contained in:
parent
d0ed7385b2
commit
3d1c0a7325
4 changed files with 16 additions and 7 deletions
|
|
@ -233,8 +233,8 @@ akka {
|
||||||
# connections. The settings below together control the maximum number of
|
# connections. The settings below together control the maximum number of
|
||||||
# reattempts in a given time window. The number of reattempts during
|
# reattempts in a given time window. The number of reattempts during
|
||||||
# a window of "retry-window" will be maximum "maximum-retries-in-window".
|
# a window of "retry-window" will be maximum "maximum-retries-in-window".
|
||||||
retry-window = 3 s
|
retry-window = 60 s
|
||||||
maximum-retries-in-window = 5
|
maximum-retries-in-window = 3
|
||||||
|
|
||||||
# The length of time to gate an address whose name lookup has failed.
|
# The length of time to gate an address whose name lookup has failed.
|
||||||
# No connection attempts will be made to an address while it remains
|
# No connection attempts will be made to an address while it remains
|
||||||
|
|
@ -324,7 +324,7 @@ akka {
|
||||||
|
|
||||||
# Sets the connectTimeoutMillis of all outbound connections,
|
# Sets the connectTimeoutMillis of all outbound connections,
|
||||||
# i.e. how long a connect may take until it is timed out
|
# i.e. how long a connect may take until it is timed out
|
||||||
connection-timeout = 120s
|
connection-timeout = 15 s
|
||||||
|
|
||||||
# If set to "<id.of.dispatcher>" then the specified dispatcher
|
# If set to "<id.of.dispatcher>" then the specified dispatcher
|
||||||
# will be used to accept inbound connections, and perform IO. If "" then
|
# will be used to accept inbound connections, and perform IO. If "" then
|
||||||
|
|
|
||||||
|
|
@ -454,7 +454,13 @@ private[remote] class EndpointWriter(
|
||||||
case Event(Status.Failure(e: InvalidAssociationException), _) ⇒
|
case Event(Status.Failure(e: InvalidAssociationException), _) ⇒
|
||||||
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
||||||
case Event(Status.Failure(e), _) ⇒
|
case Event(Status.Failure(e), _) ⇒
|
||||||
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
|
refuseUid match {
|
||||||
|
case Some(uid) ⇒
|
||||||
|
// don't try again when endpoint is quarantined
|
||||||
|
publishAndThrow(new QuarantinedUidException(uid, remoteAddress))
|
||||||
|
case None ⇒
|
||||||
|
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
|
||||||
|
}
|
||||||
case Event(inboundHandle: AkkaProtocolHandle, _) ⇒
|
case Event(inboundHandle: AkkaProtocolHandle, _) ⇒
|
||||||
refuseUid match {
|
refuseUid match {
|
||||||
case Some(uid) if inboundHandle.handshakeInfo.uid == uid ⇒
|
case Some(uid) if inboundHandle.handshakeInfo.uid == uid ⇒
|
||||||
|
|
|
||||||
|
|
@ -374,7 +374,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
else None
|
else None
|
||||||
|
|
||||||
override val supervisorStrategy =
|
override val supervisorStrategy =
|
||||||
OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
|
OneForOneStrategy(loggingEnabled = false) {
|
||||||
case InvalidAssociation(localAddress, remoteAddress, _) ⇒
|
case InvalidAssociation(localAddress, remoteAddress, _) ⇒
|
||||||
log.error("Tried to associate with invalid remote address [{}]. " +
|
log.error("Tried to associate with invalid remote address [{}]. " +
|
||||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||||
|
|
@ -467,6 +467,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
case Some(endpoint) ⇒ context.stop(endpoint)
|
case Some(endpoint) ⇒ context.stop(endpoint)
|
||||||
case _ ⇒ // nothing to stop
|
case _ ⇒ // nothing to stop
|
||||||
}
|
}
|
||||||
|
log.info("Address [{}] is now quarantined, all messages to this address will be delivered to dead letters.",
|
||||||
|
address)
|
||||||
endpoints.markAsQuarantined(address, uid, Deadline.now + d)
|
endpoints.markAsQuarantined(address, uid, Deadline.now + d)
|
||||||
case _ ⇒ // Ignore
|
case _ ⇒ // Ignore
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,8 @@ class RemoteConfigSpec extends AkkaSpec(
|
||||||
RetryGateClosedFor must be(Duration.Zero)
|
RetryGateClosedFor must be(Duration.Zero)
|
||||||
UnknownAddressGateClosedFor must be(1 minute)
|
UnknownAddressGateClosedFor must be(1 minute)
|
||||||
UsePassiveConnections must be(true)
|
UsePassiveConnections must be(true)
|
||||||
MaximumRetriesInWindow must be(5)
|
MaximumRetriesInWindow must be(3)
|
||||||
RetryWindow must be(3 seconds)
|
RetryWindow must be(60 seconds)
|
||||||
BackoffPeriod must be(10 millis)
|
BackoffPeriod must be(10 millis)
|
||||||
SysMsgAckTimeout must be(0.3 seconds)
|
SysMsgAckTimeout must be(0.3 seconds)
|
||||||
SysResendTimeout must be(1 seconds)
|
SysResendTimeout must be(1 seconds)
|
||||||
|
|
@ -81,6 +81,7 @@ class RemoteConfigSpec extends AkkaSpec(
|
||||||
val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp")
|
val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp")
|
||||||
|
|
||||||
c.getBytes("maximum-frame-size") must be(128000)
|
c.getBytes("maximum-frame-size") must be(128000)
|
||||||
|
c.getMilliseconds("connection-timeout") must be(15000)
|
||||||
}
|
}
|
||||||
|
|
||||||
"contain correct socket worker pool configuration values in reference.conf" in {
|
"contain correct socket worker pool configuration values in reference.conf" in {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue