diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 8984b39812..7bc49e346a 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -242,7 +242,8 @@ akka { # This settings controls how long a system will be quarantined after # catastrophic communication failures that result in the loss of system # messages. Quarantining prevents communication with the remote system - # of a given UID. + # of a given UID. This function can be disabled by setting the value + # to "off". quarantine-systems-for = 60s # This setting defines the maximum number of unacknowledged system messages diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index fd57f31735..3b1beb6395 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -69,10 +69,11 @@ class RemoteSettings(val config: Config) { getInt("akka.remote.system-message-buffer-size") } requiring (_ > 0, "system-message-buffer-size must be > 0") - val QuarantineDuration: FiniteDuration = { - if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Zero - else Duration(getMilliseconds("akka.remote.quarantine-systems-for"), MILLISECONDS) - } requiring (_ >= Duration.Zero, "resend-interval must be > 0 or off") + val QuarantineDuration: Duration = { + if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Undefined + else Duration(getMilliseconds("akka.remote.quarantine-systems-for"), MILLISECONDS) requiring + (_ >= Duration.Zero, "quarantine-systems-for must be > 0 or off") + } val CommandAckTimeout: Timeout = { Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS)) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index d4f919e485..0e34c93e0f 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -381,20 +381,24 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Stop case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒ - if (settings.QuarantineDuration > Duration.Zero) { - log.error("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " + - "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " + - "from this situation.", remoteAddress, uid) - endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) + settings.QuarantineDuration match { + case d: FiniteDuration ⇒ + log.error("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " + + "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " + + "from this situation.", remoteAddress, uid) + endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) + case _ ⇒ // disabled } context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ - if (settings.QuarantineDuration > Duration.Zero) { - log.error("Association to [{}] with unknown UID is irrecoverably failed. " + - "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) - endpoints.markAsFailed(sender, Deadline.now + settings.QuarantineDuration) + settings.QuarantineDuration match { + case d: FiniteDuration ⇒ + log.error("Association to [{}] with unknown UID is irrecoverably failed. " + + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) + endpoints.markAsFailed(sender, Deadline.now + d) + case _ ⇒ } context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop @@ -434,6 +438,22 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends addressesPromise.success(transportsAndAddresses) case ManagementCommand(_) ⇒ sender ! ManagementCommandAck(false) + case Quarantine(address, uid) ⇒ + settings.QuarantineDuration match { + case d: FiniteDuration ⇒ + // Stop writers + endpoints.writableEndpointWithPolicyFor(address) match { + case Some(Pass(endpoint)) ⇒ context.stop(endpoint) + case _ ⇒ // nothing to stop + } + // Stop inbound read-only associations + endpoints.readOnlyEndpointFor(address) match { + case Some(endpoint) ⇒ context.stop(endpoint) + case _ ⇒ // nothing to stop + } + endpoints.markAsQuarantined(address, uid, Deadline.now + d) + case _ ⇒ // Ignore + } case StartupFinished ⇒ context.become(accepting) case ShutdownAndFlush ⇒