diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 3209c2d6ec..3a22653e75 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -3,6 +3,7 @@ */ package akka.stream.testkit +import scala.language.existentials import akka.actor.ActorSystem import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } import akka.testkit.TestProbe diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala index 878701ff6e..85b3b5d4f2 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala @@ -10,6 +10,7 @@ import java.net.InetSocketAddress import java.net.SocketAddress object TestUtils { // FIXME: remove once going back to project dependencies + import scala.language.reflectiveCalls // Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object type GeneralSocket = { def bind(sa: SocketAddress): Unit diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 848ea8fb7e..70907f2ea9 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -38,6 +38,9 @@ akka { # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors for IO operations. file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher} + + # Enable additional troubleshooting logging at DEBUG log level + debug-logging = off } } diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index dafe5a8427..6b6ca80ac5 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -183,7 +183,8 @@ object MaterializerSettings { config.getInt("max-input-buffer-size"), config.getString("dispatcher"), StreamSubscriptionTimeoutSettings(config), - config.getString("file-io-dispatcher")) + config.getString("file-io-dispatcher"), + config.getBoolean("debug-logging")) /** * Java API @@ -221,7 +222,8 @@ final case class MaterializerSettings( maxInputBufferSize: Int, dispatcher: String, subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, - fileIODispatcher: String) { // FIXME Why does this exist?! + fileIODispatcher: String, // FIXME Why does this exist?! + debugLogging: Boolean) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") @@ -235,6 +237,9 @@ final case class MaterializerSettings( def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher) + def withDebugLogging(enable: Boolean): MaterializerSettings = + copy(debugLogging = enable) + private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 // FIXME this considers 0 a power of 2 } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 109382723e..31f6739a6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -250,7 +250,9 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin protected def onError(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index a99c515ec3..72863adb81 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -214,7 +214,9 @@ private[akka] abstract class FanIn(val settings: MaterializerSettings, val input override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) inputBunch.cancel() primaryOutputs.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index 29e85c9fce..5dc77ecdc9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -244,7 +244,9 @@ private[akka] abstract class FanOut(val settings: MaterializerSettings, val outp override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() outputBunch.cancel(e) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 5df3d7b9c9..ccff0f8bd6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -105,7 +105,9 @@ private[akka] class FanoutProcessorImpl( } override def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() primaryOutputs.cancel(e) // Stopping will happen after flush diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index d28dd1cf2d..30327ffd90 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -14,11 +14,15 @@ import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } import scala.util.control.NonFatal import akka.actor.Props +import akka.actor.ActorLogging +import akka.event.LoggingAdapter /** * INTERNAL API */ -private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundaryStage { +private[akka] class BatchingActorInputBoundary(val size: Int) + extends BoundaryStage { + require(size > 0, "buffer size cannot be zero") require((size & (size - 1)) == 0, "buffer size must be a power of two") @@ -143,7 +147,8 @@ private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundarySt /** * INTERNAL API */ -private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryStage { +private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boolean, log: LoggingAdapter) + extends BoundaryStage { private var exposedPublisher: ActorPublisher[Any] = _ @@ -172,6 +177,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundarySta def fail(e: Throwable): Unit = { if (!downstreamCompleted) { downstreamCompleted = true + if (debugLogging) + log.debug("fail due to: {}", e.getMessage) if (subscriber ne null) subscriber.onError(e) if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) } @@ -256,10 +263,10 @@ private[akka] object ActorInterpreter { * INTERNAL API */ private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Stage[_, _]]) - extends Actor { + extends Actor with ActorLogging { private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize) - private val downstream = new ActorOutputBoundary(self) + private val downstream = new ActorOutputBoundary(self, settings.debugLogging, log) private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream) interpreter.init() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 10307a6e7c..2a131f232f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -15,6 +15,7 @@ import akka.stream.StreamTcpException import org.reactivestreams.Processor import akka.actor.Stash import akka.stream.impl._ +import akka.actor.ActorLogging /** * INTERNAL API @@ -36,7 +37,8 @@ private[akka] object TcpStreamActor { /** * INTERNAL API */ -private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash { +private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash + with ActorLogging { import TcpStreamActor._ @@ -183,6 +185,8 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) writePump.nextPhase(writePump.running) def fail(e: Throwable): Unit = { + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) tcpInputs.cancel() tcpOutputs.cancel(e) primaryInputs.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 3b7303bc36..86d49677e2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -19,6 +19,7 @@ import akka.util.ByteString import org.reactivestreams.Subscriber import akka.stream.ConnectionException import akka.stream.BindFailedException +import akka.actor.ActorLogging /** * INTERNAL API @@ -39,7 +40,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], bindCmd: Tcp.Bind, settings: MaterializerSettings) extends Actor - with Pump with Stash { + with Pump with Stash with ActorLogging { import context.system object primaryOutputs extends SimpleOutputs(self, pump = this) { @@ -160,6 +161,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket } def fail(e: Throwable): Unit = { + if (settings.debugLogging) + log.debug("fail due to: {}", e.getMessage) incomingConnections.cancel() primaryOutputs.cancel(e) } diff --git a/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala index 69565d6c19..067dd749a1 100644 --- a/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala +++ b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala @@ -360,7 +360,8 @@ class SslTlsCipherActor(val requester: ActorRef, val sessionNegotioation: SslTls override def receive = inputSubstreamManagement orElse outputSubstreamManagement protected def fail(e: Throwable): Unit = { - log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + // FIXME: escalate to supervisor + if (tracing) log.debug("fail {} due to: {}", self, e.getMessage) failInputs(e) failOutputs(e) context.stop(self)