+str #15349 Add debug logging setting

This commit is contained in:
Patrik Nordwall 2015-01-27 13:36:13 +01:00
parent 03abd197fc
commit aa5af8e8ad
9 changed files with 33 additions and 12 deletions

View file

@ -38,6 +38,9 @@ akka {
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# to be used by FlowMaterialiser when creating Actors for IO operations. # to be used by FlowMaterialiser when creating Actors for IO operations.
file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher} file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher}
# Enable additional troubleshooting logging at DEBUG log level
debug-logging = off
} }
} }

View file

@ -183,7 +183,8 @@ object MaterializerSettings {
config.getInt("max-input-buffer-size"), config.getInt("max-input-buffer-size"),
config.getString("dispatcher"), config.getString("dispatcher"),
StreamSubscriptionTimeoutSettings(config), StreamSubscriptionTimeoutSettings(config),
config.getString("file-io-dispatcher")) config.getString("file-io-dispatcher"),
config.getBoolean("debug-logging"))
/** /**
* Java API * Java API
@ -221,7 +222,8 @@ final case class MaterializerSettings(
maxInputBufferSize: Int, maxInputBufferSize: Int,
dispatcher: String, dispatcher: String,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
fileIODispatcher: String) { // FIXME Why does this exist?! fileIODispatcher: String, // FIXME Why does this exist?!
debugLogging: Boolean) {
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
@ -235,6 +237,9 @@ final case class MaterializerSettings(
def withDispatcher(dispatcher: String): MaterializerSettings = def withDispatcher(dispatcher: String): MaterializerSettings =
copy(dispatcher = dispatcher) copy(dispatcher = dispatcher)
def withDebugLogging(enable: Boolean): MaterializerSettings =
copy(debugLogging = enable)
private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 // FIXME this considers 0 a power of 2 private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 // FIXME this considers 0 a power of 2
} }

View file

@ -251,7 +251,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
protected def fail(e: Throwable): Unit = { protected def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor // FIXME: escalate to supervisor
log.debug("fail {} due to: {}", self, e.getMessage) if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
primaryInputs.cancel() primaryInputs.cancel()
primaryOutputs.cancel(e) primaryOutputs.cancel(e)
context.stop(self) context.stop(self)

View file

@ -214,7 +214,8 @@ private[akka] abstract class FanIn(val settings: MaterializerSettings, val input
protected def fail(e: Throwable): Unit = { protected def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor // FIXME: escalate to supervisor
log.debug("fail {} due to: {}", self, e.getMessage) if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
inputBunch.cancel() inputBunch.cancel()
primaryOutputs.cancel(e) primaryOutputs.cancel(e)
context.stop(self) context.stop(self)

View file

@ -245,7 +245,8 @@ private[akka] abstract class FanOut(val settings: MaterializerSettings, val outp
protected def fail(e: Throwable): Unit = { protected def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor // FIXME: escalate to supervisor
log.debug("fail {} due to: {}", self, e.getMessage) if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
primaryInputs.cancel() primaryInputs.cancel()
outputBunch.cancel(e) outputBunch.cancel(e)
context.stop(self) context.stop(self)

View file

@ -106,7 +106,8 @@ private[akka] class FanoutProcessorImpl(
override def fail(e: Throwable): Unit = { override def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor // FIXME: escalate to supervisor
log.debug("fail {} due to: {}", self, e.getMessage) if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
primaryInputs.cancel() primaryInputs.cancel()
primaryOutputs.cancel(e) primaryOutputs.cancel(e)
// Stopping will happen after flush // Stopping will happen after flush

View file

@ -14,11 +14,15 @@ import akka.stream.stage._
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.Props import akka.actor.Props
import akka.actor.ActorLogging
import akka.event.LoggingAdapter
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundaryStage { private[akka] class BatchingActorInputBoundary(val size: Int)
extends BoundaryStage {
require(size > 0, "buffer size cannot be zero") require(size > 0, "buffer size cannot be zero")
require((size & (size - 1)) == 0, "buffer size must be a power of two") require((size & (size - 1)) == 0, "buffer size must be a power of two")
@ -143,7 +147,8 @@ private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundarySt
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryStage { private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boolean, log: LoggingAdapter)
extends BoundaryStage {
private var exposedPublisher: ActorPublisher[Any] = _ private var exposedPublisher: ActorPublisher[Any] = _
@ -172,6 +177,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundarySta
def fail(e: Throwable): Unit = { def fail(e: Throwable): Unit = {
if (!downstreamCompleted) { if (!downstreamCompleted) {
downstreamCompleted = true downstreamCompleted = true
if (debugLogging)
log.debug("fail due to: {}", e.getMessage)
if (subscriber ne null) subscriber.onError(e) if (subscriber ne null) subscriber.onError(e)
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
} }
@ -256,10 +263,10 @@ private[akka] object ActorInterpreter {
* INTERNAL API * INTERNAL API
*/ */
private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Stage[_, _]]) private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Stage[_, _]])
extends Actor { extends Actor with ActorLogging {
private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize) private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize)
private val downstream = new ActorOutputBoundary(self) private val downstream = new ActorOutputBoundary(self, settings.debugLogging, log)
private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream) private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream)
interpreter.init() interpreter.init()

View file

@ -185,7 +185,8 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings)
writePump.nextPhase(writePump.running) writePump.nextPhase(writePump.running)
def fail(e: Throwable): Unit = { def fail(e: Throwable): Unit = {
log.debug("fail {} due to: {}", self, e.getMessage) if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
tcpInputs.cancel() tcpInputs.cancel()
tcpOutputs.cancel(e) tcpOutputs.cancel(e)
primaryInputs.cancel() primaryInputs.cancel()

View file

@ -161,7 +161,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
} }
def fail(e: Throwable): Unit = { def fail(e: Throwable): Unit = {
log.debug("fail {} due to: {}", self, e.getMessage) if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
incomingConnections.cancel() incomingConnections.cancel()
primaryOutputs.cancel(e) primaryOutputs.cancel(e)
} }