diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index ef276f5a83..2bca934cdb 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -102,19 +102,31 @@ private[remote] class EndpointException(msg: String, cause: Throwable) extends A def this(msg: String) = this(msg, null) } +/** + * INTERNAL API + */ +private[remote] trait AssociationProblem + +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[remote] case class ShutDownAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) + extends EndpointException("Shut down address: " + remoteAddress, cause) with AssociationProblem + /** * INTERNAL API */ @SerialVersionUID(1L) private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) - extends EndpointException("Invalid address: " + remoteAddress, cause) + extends EndpointException("Invalid address: " + remoteAddress, cause) with AssociationProblem /** * INTERNAL API */ @SerialVersionUID(1L) private[remote] case class HopelessAssociation(localAddress: Address, remoteAddress: Address, uid: Option[Int], cause: Throwable) - extends EndpointException("Catastrophic association error.") + extends EndpointException("Catastrophic association error.") with AssociationProblem /** * INTERNAL API @@ -128,13 +140,6 @@ private[remote] class EndpointDisassociatedException(msg: String) extends Endpoi @SerialVersionUID(1L) private[remote] class EndpointAssociationException(msg: String, cause: Throwable) extends EndpointException(msg, cause) -/** - * INTERNAL API - */ -@SerialVersionUID(1L) -private[remote] class QuarantinedUidException(uid: Int, remoteAddress: Address) - extends EndpointException(s"Refused association to [$remoteAddress] because its UID [$uid] is quarantined.") - /** * INTERNAL API */ @@ -178,7 +183,7 @@ private[remote] class ReliableDeliverySupervisor( def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) { - case e @ (_: InvalidAssociation | _: HopelessAssociation | _: QuarantinedUidException) ⇒ Escalate + case e @ (_: AssociationProblem) ⇒ Escalate case NonFatal(e) ⇒ uidConfirmed = false // Need confirmation of UID again if (retryGateEnabled) { @@ -790,7 +795,7 @@ private[remote] class EndpointReader( case AssociationHandle.Unknown ⇒ context.stop(self) case AssociationHandle.Shutdown ⇒ - throw InvalidAssociation( + throw ShutDownAssociation( localAddress, remoteAddress, InvalidAssociationException("The remote system terminated the association because it is shutting down.")) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index e4924ec761..deae9e63d0 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -332,6 +332,10 @@ private[remote] object EndpointManager { case _ ⇒ false } + /** + * Marking an endpoint as failed means that we will not try to connect to the remote system within + * the gated period but it is ok for the remote system to try to connect to us. + */ def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit = if (isWritable(endpoint)) { addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease) @@ -392,7 +396,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends OneForOneStrategy(loggingEnabled = false) { case InvalidAssociation(localAddress, remoteAddress, _) ⇒ log.warning("Tried to associate with unreachable remote address [{}]. " + - "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) + "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.", + remoteAddress, settings.UnknownAddressGateClosedFor.toMillis) + endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) + context.system.eventStream.publish(AddressTerminated(remoteAddress)) + Stop + + case ShutDownAssociation(localAddress, remoteAddress, _) ⇒ + log.debug("Remote system with address [{}] has shut down. " + + "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.", + remoteAddress, settings.UnknownAddressGateClosedFor.toMillis) endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop @@ -418,9 +431,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop - case _: QuarantinedUidException ⇒ - Stop - case NonFatal(e) ⇒ // logging e match { diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 66534ef5d6..91de2f5cfc 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -205,7 +205,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } "send warning message for wrong address" in { - filterEvents(EventFilter.warning(pattern = "Address is now quarantined", occurrences = 1)) { + filterEvents(EventFilter.warning(pattern = "Address is now gated for ", occurrences = 1)) { system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping" } }