From a67fa18f8d1b7f55c2f2345dcb25dca98805cb1f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 7 Mar 2013 18:08:07 +0100 Subject: [PATCH] Reduce unwanted logging from remoting, see #2826 * Handle logging in EndpointManager supervisorStrategy * Added some more exception types to be able to differentiate failures --- .../akka/cluster/MultiNodeClusterSpec.scala | 3 ++ .../src/main/scala/akka/remote/Endpoint.scala | 19 ++++++-- .../src/main/scala/akka/remote/Remoting.scala | 44 ++++++++++++------- .../test/scala/akka/remote/RemotingSpec.scala | 7 +-- 4 files changed, 49 insertions(+), 24 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 0f3fae3c7c..f5d07b8724 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -81,6 +81,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro Seq(".*received dead letter from.*ClientDisconnected", ".*received dead letter from.*deadLetters.*PoisonPill", + ".*received dead letter from.*Disassociated", + ".*received dead letter from.*DisassociateUnderlying", + ".*received dead letter from.*HandleListenerRegistered", ".*installing context org.jboss.netty.channel.DefaultChannelPipeline.*") foreach { s ⇒ sys.eventStream.publish(Mute(EventFilter.warning(pattern = s))) } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index b5759409c9..fe3f96f6ca 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -124,9 +124,22 @@ private[remote] class EndpointException(msg: String, cause: Throwable) extends A /** * INTERNAL API */ +@SerialVersionUID(1L) private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) extends EndpointException("Invalid address: " + remoteAddress, cause) +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[remote] class EndpointDisassociatedException(msg: String) extends EndpointException(msg) + +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[remote] class EndpointAssociationException(msg: String, cause: Throwable) extends EndpointException(msg, cause) + /** * INTERNAL API */ @@ -184,11 +197,9 @@ private[remote] class EndpointWriter( stash() stay() case Event(Status.Failure(e: InvalidAssociationException), _) ⇒ - log.error("Tried to associate with invalid remote address [{}]. " + - "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) case Event(Status.Failure(e), _) ⇒ - publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e)) + publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e)) case Event(inboundHandle: AssociationHandle, _) ⇒ // Assert handle == None? handle = Some(inboundHandle) @@ -246,7 +257,7 @@ private[remote] class EndpointWriter( } whenUnhandled { - case Event(Terminated(r), _) if Some(r) == reader ⇒ publishAndThrow(new EndpointException("Disassociated")) + case Event(Terminated(r), _) if r == reader.orNull ⇒ publishAndThrow(new EndpointDisassociatedException("Disassociated")) case Event(TakeOver(newHandle), _) ⇒ // Shutdown old reader handle foreach { _.disassociate() } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1be99f6f86..1fe6fc21a7 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -352,24 +352,34 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Some(context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)) else None - override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow) { - case InvalidAssociation(localAddress, remoteAddress, e) ⇒ - endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) - Stop - - case NonFatal(e) ⇒ - // Retrying immediately if the retry gate is disabled, and it is an endpoint used for writing. - if (!retryGateEnabled && endpoints.isWritable(sender)) { - // This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue - // to the restarted endpoint -- thus no messages are lost - Restart - } else { - // This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure, - // keeps throwing away messages until the retry gate becomes open (time specified in RetryGateClosedFor) - endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor) + override val supervisorStrategy = + OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) { + case InvalidAssociation(localAddress, remoteAddress, _) ⇒ + log.error("Tried to associate with invalid remote address [{}]. " + + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) + endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) Stop - } - } + + case NonFatal(e) ⇒ + + // logging + e match { + case _: EndpointDisassociatedException | _: EndpointAssociationException ⇒ // no logging + case _ ⇒ log.error(e, e.getMessage) + } + + // Retrying immediately if the retry gate is disabled, and it is an endpoint used for writing. + if (!retryGateEnabled && endpoints.isWritable(sender)) { + // This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue + // to the restarted endpoint -- thus no messages are lost + Restart + } else { + // This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure, + // keeps throwing away messages until the retry gate becomes open (time specified in RetryGateClosedFor) + endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor) + Stop + } + } def receive = { case Listen(addressesPromise) ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index e1f22f1379..7ae8c33df1 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -137,9 +137,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } "send error message for wrong address" in { - filterEvents(EventFilter[EndpointException](occurrences = 6), EventFilter.error(start = "Association", occurrences = 6)) { - system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping" - } + filterEvents(EventFilter.error(start = "Association", occurrences = 6), + EventFilter.warning(pattern = ".*dead letter.*echo.*", occurrences = 1)) { + system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping" + } } "support ask" in {