From 570f19f6acba8d1b0e8de2a887ded2fb400318d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 27 Jun 2013 15:30:41 +0200 Subject: [PATCH 1/2] Error level remoting events are logged by default #3475 --- .../akka/remote/RemotingLifecycleEvent.scala | 2 +- .../test/scala/akka/remote/RemotingSpec.scala | 26 ++++++++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index 42c788e5d7..aec4fb2e27 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -84,6 +84,6 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) { def notifyListeners(message: RemotingLifecycleEvent): Unit = { system.eventStream.publish(message) - if (logEvents) log.log(message.logLevel, "{}", message) + if (logEvents || message.logLevel == Logging.ErrorLevel) log.log(message.logLevel, "{}", message) } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index afc49a7904..7c8ad93396 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -81,7 +81,7 @@ object RemotingSpec { transport = "akka.remote.Remoting" retry-gate-closed-for = 1 s - log-remote-lifecycle-events = on + log-remote-lifecycle-events = off enabled-transports = [ "akka.remote.test", @@ -474,9 +474,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "drop unserializable messages" in { object Unserializable - verifySend(Unserializable) { - expectMsgPF(1.second) { - case AssociationErrorEvent(_: NotSerializableException, _, _, _) ⇒ () + EventFilter.error(pattern = ".*No configured serialization.*", occurrences = 1).intercept { + verifySend(Unserializable) { + expectMsgPF(1.second) { + case AssociationErrorEvent(_: NotSerializableException, _, _, _) ⇒ () + } } } } @@ -491,18 +493,22 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "drop sent messages over payload size" in { val oversized = byteStringOfSize(maxPayloadBytes + 1) - verifySend(oversized) { - expectMsgPF(1.second) { - case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload sent") ⇒ () + EventFilter.error(pattern = ".*Discarding oversized payload sent.*", occurrences = 1).intercept { + verifySend(oversized) { + expectMsgPF(1.second) { + case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload sent") ⇒ () + } } } } "drop received messages over payload size" in { // Receiver should reply with a message of size maxPayload + 1, which will be dropped and an error logged - verifySend(maxPayloadBytes + 1) { - expectMsgPF(1.second) { - case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload received") ⇒ () + EventFilter.error(pattern = ".*Discarding oversized payload received.*", occurrences = 1).intercept { + verifySend(maxPayloadBytes + 1) { + expectMsgPF(1.second) { + case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload received") ⇒ () + } } } } From d30c5bcef7b09baf88b75a7de5e59a5469241f37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 1 Jul 2013 12:51:15 +0200 Subject: [PATCH 2/2] Logging of transient errors in remoting attempt 2. --- .../src/main/scala/akka/remote/Endpoint.scala | 13 ++++++------ .../akka/remote/RemotingLifecycleEvent.scala | 2 +- .../test/scala/akka/remote/RemotingSpec.scala | 20 +++++++------------ 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 8b366352e1..6aef90b5d6 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -433,8 +433,8 @@ private[remote] class EndpointWriter( throw reason } - private def publishAndStay(reason: Throwable): State = { - publishError(reason) + private def logAndStay(reason: Throwable): State = { + log.error(reason, "Transient association error (association remains live)") stay() } @@ -513,7 +513,7 @@ private[remote] class EndpointWriter( remoteMetrics.logPayloadBytes(msg, pduSize) if (pduSize > transport.maximumPayloadBytes) { - publishAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes.")) + logAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes.")) } else if (h.write(pdu)) { stay() } else { @@ -524,7 +524,7 @@ private[remote] class EndpointWriter( throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.") } } catch { - case e: NotSerializableException ⇒ publishAndStay(e) + case e: NotSerializableException ⇒ logAndStay(e) case e: EndpointException ⇒ publishAndThrow(e) case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e)) } @@ -707,8 +707,9 @@ private[remote] class EndpointReader( } case InboundPayload(oversized) ⇒ - publishError(new OversizedPayloadException(s"Discarding oversized payload received: " + - s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes.")) + log.error(new OversizedPayloadException(s"Discarding oversized payload received: " + + s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."), + "Transient error while reading from association (association remains live)") case StopReading(writer) ⇒ saveState() diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index aec4fb2e27..42c788e5d7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -84,6 +84,6 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) { def notifyListeners(message: RemotingLifecycleEvent): Unit = { system.eventStream.publish(message) - if (logEvents || message.logLevel == Logging.ErrorLevel) log.log(message.logLevel, "{}", message) + if (logEvents) log.log(message.logLevel, "{}", message) } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 7c8ad93396..6c88c95738 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -81,7 +81,7 @@ object RemotingSpec { transport = "akka.remote.Remoting" retry-gate-closed-for = 1 s - log-remote-lifecycle-events = off + log-remote-lifecycle-events = on enabled-transports = [ "akka.remote.test", @@ -474,11 +474,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "drop unserializable messages" in { object Unserializable - EventFilter.error(pattern = ".*No configured serialization.*", occurrences = 1).intercept { + EventFilter[NotSerializableException](pattern = ".*No configured serialization.*", occurrences = 1).intercept { verifySend(Unserializable) { - expectMsgPF(1.second) { - case AssociationErrorEvent(_: NotSerializableException, _, _, _) ⇒ () - } + expectNoMsg(1.second) // No AssocitionErrorEvent should be published } } } @@ -493,22 +491,18 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "drop sent messages over payload size" in { val oversized = byteStringOfSize(maxPayloadBytes + 1) - EventFilter.error(pattern = ".*Discarding oversized payload sent.*", occurrences = 1).intercept { + EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload sent.*", occurrences = 1).intercept { verifySend(oversized) { - expectMsgPF(1.second) { - case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload sent") ⇒ () - } + expectNoMsg(1.second) // No AssocitionErrorEvent should be published } } } "drop received messages over payload size" in { // Receiver should reply with a message of size maxPayload + 1, which will be dropped and an error logged - EventFilter.error(pattern = ".*Discarding oversized payload received.*", occurrences = 1).intercept { + EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload received.*", occurrences = 1).intercept { verifySend(maxPayloadBytes + 1) { - expectMsgPF(1.second) { - case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload received") ⇒ () - } + expectNoMsg(1.second) // No AssocitionErrorEvent should be published } } }