Merge pull request #1424 from akka/wip-3326-connection-retries-patriknw

Connection retries to shutdown node, see #3326
This commit is contained in:
Patrik Nordwall 2013-05-14 11:19:09 -07:00
commit 05ba6df5ac
4 changed files with 16 additions and 7 deletions

View file

@ -233,8 +233,8 @@ akka {
# connections. The settings below together control the maximum number of
# reattempts in a given time window. The number of reattempts during
# a window of "retry-window" will be maximum "maximum-retries-in-window".
retry-window = 3 s
maximum-retries-in-window = 5
retry-window = 60 s
maximum-retries-in-window = 3
# The length of time to gate an address whose name lookup has failed.
# No connection attempts will be made to an address while it remains
@ -324,7 +324,7 @@ akka {
# Sets the connectTimeoutMillis of all outbound connections,
# i.e. how long a connect may take until it is timed out
connection-timeout = 120s
connection-timeout = 15 s
# If set to "<id.of.dispatcher>" then the specified dispatcher
# will be used to accept inbound connections, and perform IO. If "" then

View file

@ -454,7 +454,13 @@ private[remote] class EndpointWriter(
case Event(Status.Failure(e: InvalidAssociationException), _)
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
case Event(Status.Failure(e), _)
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
refuseUid match {
case Some(uid)
// don't try again when endpoint is quarantined
publishAndThrow(new QuarantinedUidException(uid, remoteAddress))
case None
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
}
case Event(inboundHandle: AkkaProtocolHandle, _)
refuseUid match {
case Some(uid) if inboundHandle.handshakeInfo.uid == uid

View file

@ -374,7 +374,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
else None
override val supervisorStrategy =
OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
OneForOneStrategy(loggingEnabled = false) {
case InvalidAssociation(localAddress, remoteAddress, _)
log.error("Tried to associate with invalid remote address [{}]. " +
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
@ -467,6 +467,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Some(endpoint) context.stop(endpoint)
case _ // nothing to stop
}
log.info("Address [{}] is now quarantined, all messages to this address will be delivered to dead letters.",
address)
endpoints.markAsQuarantined(address, uid, Deadline.now + d)
case _ // Ignore
}

View file

@ -34,8 +34,8 @@ class RemoteConfigSpec extends AkkaSpec(
RetryGateClosedFor must be(Duration.Zero)
UnknownAddressGateClosedFor must be(1 minute)
UsePassiveConnections must be(true)
MaximumRetriesInWindow must be(5)
RetryWindow must be(3 seconds)
MaximumRetriesInWindow must be(3)
RetryWindow must be(60 seconds)
BackoffPeriod must be(10 millis)
SysMsgAckTimeout must be(0.3 seconds)
SysResendTimeout must be(1 seconds)
@ -81,6 +81,7 @@ class RemoteConfigSpec extends AkkaSpec(
val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp")
c.getBytes("maximum-frame-size") must be(128000)
c.getMilliseconds("connection-timeout") must be(15000)
}
"contain correct socket worker pool configuration values in reference.conf" in {