diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 848ea8fb7e..70907f2ea9 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -38,6 +38,9 @@ akka { # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors for IO operations. file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher} + + # Enable additional troubleshooting logging at DEBUG log level + debug-logging = off } } diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index dafe5a8427..6b6ca80ac5 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -183,7 +183,8 @@ object MaterializerSettings { config.getInt("max-input-buffer-size"), config.getString("dispatcher"), StreamSubscriptionTimeoutSettings(config), - config.getString("file-io-dispatcher")) + config.getString("file-io-dispatcher"), + config.getBoolean("debug-logging")) /** * Java API @@ -221,7 +222,8 @@ final case class MaterializerSettings( maxInputBufferSize: Int, dispatcher: String, subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, - fileIODispatcher: String) { // FIXME Why does this exist?! + fileIODispatcher: String, // FIXME Why does this exist?! + debugLogging: Boolean) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") @@ -235,6 +237,9 @@ final case class MaterializerSettings( def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher) + def withDebugLogging(enable: Boolean): MaterializerSettings = + copy(debugLogging = enable) + private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 // FIXME this considers 0 a power of 2 } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index d6e4fa56d2..31f6739a6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -251,7 +251,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin protected def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 7b02e0ecbd..1294ea2128 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -214,7 +214,8 @@ private[akka] abstract class FanIn(val settings: MaterializerSettings, val input protected def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) inputBunch.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index e7831d2020..7143f3f3a7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -245,7 +245,8 @@ private[akka] abstract class FanOut(val settings: MaterializerSettings, val outp protected def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() outputBunch.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 0967d9cb0e..ccff0f8bd6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -106,7 +106,8 @@ private[akka] class FanoutProcessorImpl( override def fail(e: Throwable): Unit = { // FIXME: escalate to supervisor - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) // Stopping will happen after flush diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index d28dd1cf2d..30327ffd90 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -14,11 +14,15 @@ import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } import scala.util.control.NonFatal import akka.actor.Props +import akka.actor.ActorLogging +import akka.event.LoggingAdapter /** * INTERNAL API */ -private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundaryStage { +private[akka] class BatchingActorInputBoundary(val size: Int) + extends BoundaryStage { + require(size > 0, "buffer size cannot be zero") require((size & (size - 1)) == 0, "buffer size must be a power of two") @@ -143,7 +147,8 @@ private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundarySt /** * INTERNAL API */ -private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryStage { +private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boolean, log: LoggingAdapter) + extends BoundaryStage { private var exposedPublisher: ActorPublisher[Any] = _ @@ -172,6 +177,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundarySta def fail(e: Throwable): Unit = { if (!downstreamCompleted) { downstreamCompleted = true + if (debugLogging) + log.debug("fail due to: {}", e.getMessage) if (subscriber ne null) subscriber.onError(e) if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) } @@ -256,10 +263,10 @@ private[akka] object ActorInterpreter { * INTERNAL API */ private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Stage[_, _]]) - extends Actor { + extends Actor with ActorLogging { private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize) - private val downstream = new ActorOutputBoundary(self) + private val downstream = new ActorOutputBoundary(self, settings.debugLogging, log) private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream) interpreter.init() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 27ef598909..2a131f232f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -185,7 +185,8 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) writePump.nextPhase(writePump.running) def fail(e: Throwable): Unit = { - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) tcpInputs.cancel() tcpOutputs.cancel(e) primaryInputs.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index d88420943e..86d49677e2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -161,7 +161,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket } def fail(e: Throwable): Unit = { - log.debug("fail {} due to: {}", self, e.getMessage) + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) incomingConnections.cancel() primaryOutputs.cancel(e) }