From 03abd197fcb8204879a11980d37766cae5ad4804 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 23 Jan 2015 16:12:38 +0100 Subject: [PATCH 1/2] =str #15349 Sanitize error logging --- .../src/test/scala/akka/stream/testkit/StreamTestKit.scala | 1 + .../src/test/scala/akka/stream/testkit/TestUtils.scala | 1 + .../src/main/scala/akka/stream/impl/ActorProcessor.scala | 3 ++- akka-stream/src/main/scala/akka/stream/impl/FanIn.scala | 3 ++- akka-stream/src/main/scala/akka/stream/impl/FanOut.scala | 3 ++- .../src/main/scala/akka/stream/impl/FanoutProcessor.scala | 3 ++- .../main/scala/akka/stream/impl/io/TcpConnectionStream.scala | 5 ++++- .../scala/akka/stream/impl/io/TcpListenStreamActor.scala | 4 +++- .../src/main/scala/akka/stream/ssl/SslTlsCipher.scala | 3 ++- 9 files changed, 19 insertions(+), 7 deletions(-) diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 9ad1daca25..d87712abc6 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -3,6 +3,7 @@ */ package akka.stream.testkit +import scala.language.existentials import akka.actor.ActorSystem import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } import akka.testkit.TestProbe diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala index 878701ff6e..85b3b5d4f2 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala @@ -10,6 +10,7 @@ import java.net.InetSocketAddress import java.net.SocketAddress object TestUtils { // FIXME: remove once going back to project dependencies + import scala.language.reflectiveCalls // Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object type GeneralSocket = { def bind(sa: SocketAddress): Unit diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 109382723e..d6e4fa56d2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -250,7 +250,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin protected def onError(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 33dad6bfc7..7b02e0ecbd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -213,7 +213,8 @@ private[akka] abstract class FanIn(val settings: MaterializerSettings, val input override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) inputBunch.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index a8dd5cb71b..e7831d2020 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -244,7 +244,8 @@ private[akka] abstract class FanOut(val settings: MaterializerSettings, val outp override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) primaryInputs.cancel() outputBunch.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 5df3d7b9c9..0967d9cb0e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -105,7 +105,8 @@ private[akka] class FanoutProcessorImpl( } override def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + log.debug("fail {} due to: {}", self, e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) // Stopping will happen after flush diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 10307a6e7c..27ef598909 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -15,6 +15,7 @@ import akka.stream.StreamTcpException import org.reactivestreams.Processor import akka.actor.Stash import akka.stream.impl._ +import akka.actor.ActorLogging /** * INTERNAL API @@ -36,7 +37,8 @@ private[akka] object TcpStreamActor { /** * INTERNAL API */ -private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash { +private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash + with ActorLogging { import TcpStreamActor._ @@ -183,6 +185,7 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) writePump.nextPhase(writePump.running) def fail(e: Throwable): Unit = { + log.debug("fail {} due to: {}", self, e.getMessage) tcpInputs.cancel() tcpOutputs.cancel(e) primaryInputs.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 3b7303bc36..d88420943e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -19,6 +19,7 @@ import akka.util.ByteString import org.reactivestreams.Subscriber import akka.stream.ConnectionException import akka.stream.BindFailedException +import akka.actor.ActorLogging /** * INTERNAL API @@ -39,7 +40,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], bindCmd: Tcp.Bind, settings: MaterializerSettings) extends Actor - with Pump with Stash { + with Pump with Stash with ActorLogging { import context.system object primaryOutputs extends SimpleOutputs(self, pump = this) { @@ -160,6 +161,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket } def fail(e: Throwable): Unit = { + log.debug("fail {} due to: {}", self, e.getMessage) incomingConnections.cancel() primaryOutputs.cancel(e) } diff --git a/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala index 69565d6c19..067dd749a1 100644 --- a/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala +++ b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala @@ -360,7 +360,8 @@ class SslTlsCipherActor(val requester: ActorRef, val sessionNegotioation: SslTls override def receive = inputSubstreamManagement orElse outputSubstreamManagement protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + if (tracing) log.debug("fail {} due to: {}", self, e.getMessage) failInputs(e) failOutputs(e) context.stop(self) From aa5af8e8ada1f6b5d10cd51201ff7d575fc9b53e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 27 Jan 2015 13:36:13 +0100 Subject: [PATCH 2/2] +str #15349 Add debug logging setting --- akka-stream/src/main/resources/reference.conf | 3 +++ .../main/scala/akka/stream/FlowMaterializer.scala | 9 +++++++-- .../scala/akka/stream/impl/ActorProcessor.scala | 3 ++- .../src/main/scala/akka/stream/impl/FanIn.scala | 3 ++- .../src/main/scala/akka/stream/impl/FanOut.scala | 3 ++- .../scala/akka/stream/impl/FanoutProcessor.scala | 3 ++- .../stream/impl/fusing/ActorInterpreter.scala | 15 +++++++++++---- .../akka/stream/impl/io/TcpConnectionStream.scala | 3 ++- .../stream/impl/io/TcpListenStreamActor.scala | 3 ++- 9 files changed, 33 insertions(+), 12 deletions(-) diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 848ea8fb7e..70907f2ea9 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -38,6 +38,9 @@ akka { # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors for IO operations. file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher} + + # Enable additional troubleshooting logging at DEBUG log level + debug-logging = off } } diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index dafe5a8427..6b6ca80ac5 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -183,7 +183,8 @@ object MaterializerSettings { config.getInt("max-input-buffer-size"), config.getString("dispatcher"), StreamSubscriptionTimeoutSettings(config), - config.getString("file-io-dispatcher")) + config.getString("file-io-dispatcher"), + config.getBoolean("debug-logging")) /** * Java API @@ -221,7 +222,8 @@ final case class MaterializerSettings( maxInputBufferSize: Int, dispatcher: String, subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, - fileIODispatcher: String) { // FIXME Why does this exist?! + fileIODispatcher: String, // FIXME Why does this exist?! + debugLogging: Boolean) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") @@ -235,6 +237,9 @@ final case class MaterializerSettings( def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher) + def withDebugLogging(enable: Boolean): MaterializerSettings = + copy(debugLogging = enable) + private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 // FIXME this considers 0 a power of 2 } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index d6e4fa56d2..31f6739a6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -251,7 +251,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin protected def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 7b02e0ecbd..1294ea2128 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -214,7 +214,8 @@ private[akka] abstract class FanIn(val settings: MaterializerSettings, val input protected def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) inputBunch.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index e7831d2020..7143f3f3a7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -245,7 +245,8 @@ private[akka] abstract class FanOut(val settings: MaterializerSettings, val outp protected def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() outputBunch.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 0967d9cb0e..ccff0f8bd6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -106,7 +106,8 @@ private[akka] class FanoutProcessorImpl( override def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) // Stopping will happen after flush diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index d28dd1cf2d..30327ffd90 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -14,11 +14,15 @@ import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } import scala.util.control.NonFatal import akka.actor.Props +import akka.actor.ActorLogging +import akka.event.LoggingAdapter /** * INTERNAL API */ -private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundaryStage { +private[akka] class BatchingActorInputBoundary(val size: Int) + extends BoundaryStage { + require(size > 0, "buffer size cannot be zero") require((size & (size - 1)) == 0, "buffer size must be a power of two") @@ -143,7 +147,8 @@ private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundarySt /** * INTERNAL API */ -private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryStage { +private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boolean, log: LoggingAdapter) + extends BoundaryStage { private var exposedPublisher: ActorPublisher[Any] = _ @@ -172,6 +177,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundarySta def fail(e: Throwable): Unit = { if (!downstreamCompleted) { downstreamCompleted = true + if (debugLogging) + log.debug("fail due to: {}", e.getMessage) if (subscriber ne null) subscriber.onError(e) if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) } @@ -256,10 +263,10 @@ private[akka] object ActorInterpreter { * INTERNAL API */ private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Stage[_, _]]) - extends Actor { + extends Actor with ActorLogging { private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize) - private val downstream = new ActorOutputBoundary(self) + private val downstream = new ActorOutputBoundary(self, settings.debugLogging, log) private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream) interpreter.init() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 27ef598909..2a131f232f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -185,7 +185,8 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) writePump.nextPhase(writePump.running) def fail(e: Throwable): Unit = { - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) tcpInputs.cancel() tcpOutputs.cancel(e) primaryInputs.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index d88420943e..86d49677e2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -161,7 +161,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket } def fail(e: Throwable): Unit = { - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) incomingConnections.cancel() primaryOutputs.cancel(e) }