diff --git a/akka-docs/rst/java/code/docs/stream/GraphStageLoggingDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphStageLoggingDocTest.java new file mode 100644 index 0000000000..82b5a2e6ad --- /dev/null +++ b/akka-docs/rst/java/code/docs/stream/GraphStageLoggingDocTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package docs.stream; + +import akka.actor.ActorSystem; +import akka.stream.Attributes; +import akka.stream.Materializer; +import akka.stream.Outlet; +import akka.stream.SourceShape; +import akka.stream.stage.*; +import docs.AbstractJavaTest; +import org.junit.Test; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collector; + +public class GraphStageLoggingDocTest extends AbstractJavaTest { + static ActorSystem system; + static Materializer mat; + + @Test + public void compileOnlyTestClass() throws Exception { } + + //#stage-with-logging + public class RandomLettersSource extends GraphStage> { + public final Outlet out = Outlet.create("RandomLettersSource.in"); + + private final SourceShape shape = SourceShape.of(out); + + @Override + public SourceShape shape() { + return shape; + } + + @Override + public GraphStageLogic createLogic(Attributes inheritedAttributes) { + return new GraphStageLogicWithLogging(shape()) { + + { + setHandler(out, new AbstractOutHandler() { + @Override + public void onPull() throws Exception { + final String s = nextChar();// ASCII lower case letters + + // `log` is obtained from materializer automatically (via StageLogging) + log().debug("Randomly generated: [{}]", s); + + push(out, s); + } + + private String nextChar() { + final char i = (char) ThreadLocalRandom.current().nextInt('a', 'z' + 1); + return String.valueOf(i); + } + + }); + } + }; + } + } + //#stage-with-logging + +} diff --git a/akka-docs/rst/java/stream/stream-customize.rst b/akka-docs/rst/java/stream/stream-customize.rst index ffcc8d7fde..6d60f0cb01 100644 --- a/akka-docs/rst/java/stream/stream-customize.rst +++ b/akka-docs/rst/java/stream/stream-customize.rst @@ -296,6 +296,29 @@ constructor and usually done in ``preStart``). In this case the stage **must** b or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used with care. +Logging inside GraphStages +-------------------------- + +Logging debug or other important information in your stages is often a very good idea, especially when developing +more advances stages which may need to be debugged at some point. + +You can extend the ``akka.stream.stage.GraphStageWithLogging`` or ``akka.strea.stage.TimerGraphStageWithLogging`` classes +instead of the usual ``GraphStage`` to enable you to easily obtain a ``LoggingAdapter`` inside your stage as long as +the ``Materializer`` you're using is able to provide you with a logger. + +.. note:: + Please note that you can always simply use a logging library directly inside a Stage. + Make sure to use an asynchronous appender however, to not accidentally block the stage when writing to files etc. + See :ref:`slf4j-directly-java` for more details on setting up async appenders in SLF4J. + +The stage then gets access to the ``log`` field which it can safely use from any ``GraphStage`` callbacks: + +.. includecode:: ../code/docs/stream/GraphStageLoggingDocTest.java#stage-with-logging + +.. note:: + **SPI Note:** If you're implementing a Materializer, you can add this ability to your materializer by implementing + ``MaterializerLoggingProvider`` in your ``Materializer``. + Using timers ------------ diff --git a/akka-docs/rst/scala/code/docs/stream/GraphStageLoggingDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/GraphStageLoggingDocSpec.scala new file mode 100644 index 0000000000..c495517337 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/stream/GraphStageLoggingDocSpec.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package docs.stream + +import java.util.concurrent.ThreadLocalRandom + +import akka.stream._ +import akka.stream.scaladsl._ +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, StageLogging } +import akka.testkit.{ AkkaSpec, EventFilter } + +class GraphStageLoggingDocSpec extends AkkaSpec("akka.loglevel = DEBUG") { + + implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher + + //#stage-with-logging + final class RandomLettersSource extends GraphStage[SourceShape[String]] { + val out = Outlet[String]("RandomLettersSource.out") + override val shape: SourceShape[String] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) with StageLogging { + setHandler(out, new OutHandler { + override def onPull(): Unit = { + val c = nextChar() // ASCII lower case letters + + // `log` is obtained from materializer automatically (via StageLogging) + log.debug("Randomly generated: [{}]", c) + + push(out, c.toString) + } + }) + } + + def nextChar(): Char = + ThreadLocalRandom.current().nextInt('a', 'z'.toInt + 1).toChar + } + //#stage-with-logging + + "demonstrate logging in custom graphstage" in { + val n = 10 + EventFilter.debug(start = "Randomly generated", occurrences = n).intercept { + Source.fromGraph(new RandomLettersSource) + .take(n) + .runWith(Sink.ignore) + .futureValue + } + } + +} + diff --git a/akka-docs/rst/scala/stream/stream-customize.rst b/akka-docs/rst/scala/stream/stream-customize.rst index dd686cf539..ff894026a8 100644 --- a/akka-docs/rst/scala/stream/stream-customize.rst +++ b/akka-docs/rst/scala/stream/stream-customize.rst @@ -300,6 +300,29 @@ constructor and usually done in ``preStart``). In this case the stage **must** b or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used with care. +Logging inside GraphStages +-------------------------- + +Logging debug or other important information in your stages is often a very good idea, especially when developing +more advances stages which may need to be debugged at some point. + +The helper trait ``akka.stream.stage.StageLogging`` is provided to enable you to easily obtain a ``LoggingAdapter`` +inside of a ``GraphStage`` as long as the ``Materializer`` you're using is able to provide you with a logger. +In that sense, it serves a very similar purpose as ``ActorLogging`` does for Actors. + +.. note:: + Please note that you can always simply use a logging library directly inside a Stage. + Make sure to use an asynchronous appender however, to not accidentally block the stage when writing to files etc. + See :ref:`slf4j-directly-scala` for more details on setting up async appenders in SLF4J. + +The stage then gets access to the ``log`` field which it can safely use from any ``GraphStage`` callbacks: + +.. includecode:: ../code/docs/stream/GraphStageLoggingDocSpec.scala#stage-with-logging + +.. note:: + **SPI Note:** If you're implementing a Materializer, you can add this ability to your materializer by implementing + ``MaterializerLoggingProvider`` in your ``Materializer``. + Using timers ------------ diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 8ca2a0eae6..7f676796cf 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -10,17 +10,15 @@ import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRe import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.stream.stage._ import akka.util.{ ByteString, OptionVal } import akka.actor.EmptyLocalActorRef import akka.remote.artery.compress.InboundCompressions -import akka.stream.stage.TimerGraphStageLogic import java.util.concurrent.TimeUnit import scala.concurrent.Future import akka.remote.artery.compress.CompressionTable import akka.Done -import akka.stream.stage.GraphStageWithMaterializedValue import scala.concurrent.Promise import akka.event.Logging diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 3c8a2b38cc..2e25f2c3c4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -4,6 +4,7 @@ package akka.remote.artery import java.util.ArrayDeque + import scala.concurrent.Future import scala.concurrent.Promise import akka.Done @@ -11,11 +12,7 @@ import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet import akka.stream.Outlet -import akka.stream.stage.CallbackWrapper -import akka.stream.stage.GraphStageLogic -import akka.stream.stage.GraphStageWithMaterializedValue -import akka.stream.stage.InHandler -import akka.stream.stage.OutHandler +import akka.stream.stage._ import akka.remote.UniqueAddress import akka.util.OptionVal import akka.event.Logging diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 1c3abacbbc..f13eef6a72 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -12,13 +12,10 @@ import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet import akka.stream.Outlet -import akka.stream.stage.GraphStage -import akka.stream.stage.GraphStageLogic -import akka.stream.stage.InHandler -import akka.stream.stage.OutHandler -import akka.stream.stage.TimerGraphStageLogic +import akka.stream.stage._ import akka.util.OptionVal import akka.Done + import scala.concurrent.Future import akka.actor.Address diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala index edbbc289e5..27f0abd0a8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala @@ -7,10 +7,7 @@ import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet import akka.stream.Outlet -import akka.stream.stage.GraphStage -import akka.stream.stage.GraphStageLogic -import akka.stream.stage.InHandler -import akka.stream.stage.OutHandler +import akka.stream.stage._ import akka.remote.UniqueAddress import akka.util.OptionVal import akka.event.Logging diff --git a/akka-remote/src/main/scala/akka/remote/artery/StageLogging.scala b/akka-remote/src/main/scala/akka/remote/artery/StageLogging.scala deleted file mode 100644 index 8f9e768299..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/StageLogging.scala +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.remote.artery - -import akka.stream.stage.GraphStageLogic -import akka.event.LoggingAdapter -import akka.stream.ActorMaterializer -import akka.event.NoLogging - -// TODO this can be removed when https://github.com/akka/akka/issues/18793 has been implemented -/** - * INTERNAL API - */ -private[akka] trait StageLogging { self: GraphStageLogic ⇒ - - private var _log: LoggingAdapter = _ - - protected def logSource: Class[_] = this.getClass - - def log: LoggingAdapter = { - // only used in StageLogic, i.e. thread safe - if (_log eq null) { - materializer match { - case a: ActorMaterializer ⇒ - _log = akka.event.Logging(a.system, logSource) - case _ ⇒ - _log = NoLogging - } - } - _log - } - -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala index 2b7ca5e27c..d2d190bc13 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala @@ -7,18 +7,13 @@ import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.concurrent.duration._ - import akka.actor.Address import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet import akka.stream.Outlet -import akka.stream.stage.GraphStage -import akka.stream.stage.GraphStageLogic -import akka.stream.stage.InHandler -import akka.stream.stage.OutHandler -import akka.stream.stage.TimerGraphStageLogic +import akka.stream.stage._ import akka.util.OptionVal import akka.event.Logging diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 9f285be347..f00f97f886 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -167,7 +167,7 @@ private[akka] object ActorMaterializerHelper { * steps are split up into asynchronous regions is implementation * dependent. */ -abstract class ActorMaterializer extends Materializer { +abstract class ActorMaterializer extends Materializer with MaterializerLoggingProvider { def settings: ActorMaterializerSettings diff --git a/akka-stream/src/main/scala/akka/stream/MaterializerLoggingProvider.scala b/akka-stream/src/main/scala/akka/stream/MaterializerLoggingProvider.scala new file mode 100644 index 0000000000..cf922e7d55 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/MaterializerLoggingProvider.scala @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.stream + +import akka.event.LoggingAdapter + +/** + * SPI intended only to be extended by custom [[Materializer]] implementations, + * that also want to provide stages they materialize with specialized [[akka.event.LoggingAdapter]] instances. + */ +trait MaterializerLoggingProvider { this: Materializer ⇒ + + def makeLogger(logSource: Class[_]): LoggingAdapter + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 1f9b53e632..28e0191c26 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -240,6 +240,9 @@ private[akka] case class ActorMaterializerImpl( session.materialize().asInstanceOf[Mat] } + override def makeLogger(logSource: Class[_]): LoggingAdapter = + Logging(system, logSource) + override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId case other ⇒ other diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 269f38a41c..cf6868a645 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -1231,6 +1231,12 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap } +/** Java API: [[GraphStageLogic]] with [[StageLogging]]. */ +abstract class GraphStageLogicWithLogging(_shape: Shape) extends GraphStageLogic(_shape) with StageLogging + +/** Java API: [[TimerGraphStageLogic]] with [[StageLogging]]. */ +abstract class TimerGraphStageLogicWithLogging(_shape: Shape) extends TimerGraphStageLogic(_shape) with StageLogging + /** * Collection of callbacks for an input port of a [[GraphStage]] */ diff --git a/akka-stream/src/main/scala/akka/stream/stage/StageLogging.scala b/akka-stream/src/main/scala/akka/stream/stage/StageLogging.scala new file mode 100644 index 0000000000..58a98500b1 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/stage/StageLogging.scala @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.stream.stage + +import akka.event.{ LoggingAdapter, NoLogging } +import akka.stream.{ ActorMaterializer, MaterializerLoggingProvider } + +/** + * Simple way to obtain a [[LoggingAdapter]] when used together with an [[ActorMaterializer]]. + * If used with a different materializer [[NoLogging]] will be returned. + * + * Make sure to only access `log` from GraphStage callbacks (such as `pull`, `push` or the async-callback). + * + * Note, abiding to [[akka.stream.ActorAttributes.logLevels]] has to be done manually, + * the logger itself is configured based on the logSource provided to it. Also, the `log` + * itself would not know if you're calling it from a "on element" context or not, which is why + * these decisions have to be handled by the stage itself. + */ +trait StageLogging { self: GraphStageLogic ⇒ + private[this] var _log: LoggingAdapter = _ + + /** Override to customise reported log source */ + protected def logSource: Class[_] = this.getClass + + def log: LoggingAdapter = { + // only used in StageLogic, i.e. thread safe + if (_log eq null) { + materializer match { + case p: MaterializerLoggingProvider ⇒ + _log = p.makeLogger(logSource) + case _ ⇒ + _log = NoLogging + } + } + _log + } + +}