+str #18793 StageLogging that allows logger access in stages (#21696)

* +str #18793 StageLogging that allows logger access in stages
Also, non ActorMaterializers can opt-into providing a logger here.

* +str #18794 add javadsl for StageLogging

* fix missing test method on compile only class
This commit is contained in:
Konrad Malawski 2016-10-28 16:05:56 +02:00 committed by GitHub
parent b775db0be3
commit 0127d4f424
15 changed files with 238 additions and 58 deletions

View file

@ -0,0 +1,65 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.stream;
import akka.actor.ActorSystem;
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.Outlet;
import akka.stream.SourceShape;
import akka.stream.stage.*;
import docs.AbstractJavaTest;
import org.junit.Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collector;
public class GraphStageLoggingDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@Test
public void compileOnlyTestClass() throws Exception { }
//#stage-with-logging
public class RandomLettersSource extends GraphStage<SourceShape<String>> {
public final Outlet<String> out = Outlet.create("RandomLettersSource.in");
private final SourceShape<String> shape = SourceShape.of(out);
@Override
public SourceShape<String> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogicWithLogging(shape()) {
{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
final String s = nextChar();// ASCII lower case letters
// `log` is obtained from materializer automatically (via StageLogging)
log().debug("Randomly generated: [{}]", s);
push(out, s);
}
private String nextChar() {
final char i = (char) ThreadLocalRandom.current().nextInt('a', 'z' + 1);
return String.valueOf(i);
}
});
}
};
}
}
//#stage-with-logging
}

View file

@ -296,6 +296,29 @@ constructor and usually done in ``preStart``). In this case the stage **must** b
or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used
with care. with care.
Logging inside GraphStages
--------------------------
Logging debug or other important information in your stages is often a very good idea, especially when developing
more advances stages which may need to be debugged at some point.
You can extend the ``akka.stream.stage.GraphStageWithLogging`` or ``akka.strea.stage.TimerGraphStageWithLogging`` classes
instead of the usual ``GraphStage`` to enable you to easily obtain a ``LoggingAdapter`` inside your stage as long as
the ``Materializer`` you're using is able to provide you with a logger.
.. note::
Please note that you can always simply use a logging library directly inside a Stage.
Make sure to use an asynchronous appender however, to not accidentally block the stage when writing to files etc.
See :ref:`slf4j-directly-java` for more details on setting up async appenders in SLF4J.
The stage then gets access to the ``log`` field which it can safely use from any ``GraphStage`` callbacks:
.. includecode:: ../code/docs/stream/GraphStageLoggingDocTest.java#stage-with-logging
.. note::
**SPI Note:** If you're implementing a Materializer, you can add this ability to your materializer by implementing
``MaterializerLoggingProvider`` in your ``Materializer``.
Using timers Using timers
------------ ------------

View file

@ -0,0 +1,53 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.stream
import java.util.concurrent.ThreadLocalRandom
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, StageLogging }
import akka.testkit.{ AkkaSpec, EventFilter }
class GraphStageLoggingDocSpec extends AkkaSpec("akka.loglevel = DEBUG") {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
//#stage-with-logging
final class RandomLettersSource extends GraphStage[SourceShape[String]] {
val out = Outlet[String]("RandomLettersSource.out")
override val shape: SourceShape[String] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with StageLogging {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
val c = nextChar() // ASCII lower case letters
// `log` is obtained from materializer automatically (via StageLogging)
log.debug("Randomly generated: [{}]", c)
push(out, c.toString)
}
})
}
def nextChar(): Char =
ThreadLocalRandom.current().nextInt('a', 'z'.toInt + 1).toChar
}
//#stage-with-logging
"demonstrate logging in custom graphstage" in {
val n = 10
EventFilter.debug(start = "Randomly generated", occurrences = n).intercept {
Source.fromGraph(new RandomLettersSource)
.take(n)
.runWith(Sink.ignore)
.futureValue
}
}
}

View file

@ -300,6 +300,29 @@ constructor and usually done in ``preStart``). In this case the stage **must** b
or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used
with care. with care.
Logging inside GraphStages
--------------------------
Logging debug or other important information in your stages is often a very good idea, especially when developing
more advances stages which may need to be debugged at some point.
The helper trait ``akka.stream.stage.StageLogging`` is provided to enable you to easily obtain a ``LoggingAdapter``
inside of a ``GraphStage`` as long as the ``Materializer`` you're using is able to provide you with a logger.
In that sense, it serves a very similar purpose as ``ActorLogging`` does for Actors.
.. note::
Please note that you can always simply use a logging library directly inside a Stage.
Make sure to use an asynchronous appender however, to not accidentally block the stage when writing to files etc.
See :ref:`slf4j-directly-scala` for more details on setting up async appenders in SLF4J.
The stage then gets access to the ``log`` field which it can safely use from any ``GraphStage`` callbacks:
.. includecode:: ../code/docs/stream/GraphStageLoggingDocSpec.scala#stage-with-logging
.. note::
**SPI Note:** If you're implementing a Materializer, you can add this ability to your materializer by implementing
``MaterializerLoggingProvider`` in your ``Materializer``.
Using timers Using timers
------------ ------------

View file

@ -10,17 +10,15 @@ import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRe
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension } import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._ import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.stage._
import akka.util.{ ByteString, OptionVal } import akka.util.{ ByteString, OptionVal }
import akka.actor.EmptyLocalActorRef import akka.actor.EmptyLocalActorRef
import akka.remote.artery.compress.InboundCompressions import akka.remote.artery.compress.InboundCompressions
import akka.stream.stage.TimerGraphStageLogic
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import scala.concurrent.Future import scala.concurrent.Future
import akka.remote.artery.compress.CompressionTable import akka.remote.artery.compress.CompressionTable
import akka.Done import akka.Done
import akka.stream.stage.GraphStageWithMaterializedValue
import scala.concurrent.Promise import scala.concurrent.Promise
import akka.event.Logging import akka.event.Logging

View file

@ -4,6 +4,7 @@
package akka.remote.artery package akka.remote.artery
import java.util.ArrayDeque import java.util.ArrayDeque
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import akka.Done import akka.Done
@ -11,11 +12,7 @@ import akka.stream.Attributes
import akka.stream.FlowShape import akka.stream.FlowShape
import akka.stream.Inlet import akka.stream.Inlet
import akka.stream.Outlet import akka.stream.Outlet
import akka.stream.stage.CallbackWrapper import akka.stream.stage._
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.util.OptionVal import akka.util.OptionVal
import akka.event.Logging import akka.event.Logging

View file

@ -12,13 +12,10 @@ import akka.stream.Attributes
import akka.stream.FlowShape import akka.stream.FlowShape
import akka.stream.Inlet import akka.stream.Inlet
import akka.stream.Outlet import akka.stream.Outlet
import akka.stream.stage.GraphStage import akka.stream.stage._
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import akka.util.OptionVal import akka.util.OptionVal
import akka.Done import akka.Done
import scala.concurrent.Future import scala.concurrent.Future
import akka.actor.Address import akka.actor.Address

View file

@ -7,10 +7,7 @@ import akka.stream.Attributes
import akka.stream.FlowShape import akka.stream.FlowShape
import akka.stream.Inlet import akka.stream.Inlet
import akka.stream.Outlet import akka.stream.Outlet
import akka.stream.stage.GraphStage import akka.stream.stage._
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.util.OptionVal import akka.util.OptionVal
import akka.event.Logging import akka.event.Logging

View file

@ -1,34 +0,0 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.stream.stage.GraphStageLogic
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import akka.event.NoLogging
// TODO this can be removed when https://github.com/akka/akka/issues/18793 has been implemented
/**
* INTERNAL API
*/
private[akka] trait StageLogging { self: GraphStageLogic
private var _log: LoggingAdapter = _
protected def logSource: Class[_] = this.getClass
def log: LoggingAdapter = {
// only used in StageLogic, i.e. thread safe
if (_log eq null) {
materializer match {
case a: ActorMaterializer
_log = akka.event.Logging(a.system, logSource)
case _
_log = NoLogging
}
}
_log
}
}

View file

@ -7,18 +7,13 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.Address import akka.actor.Address
import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.FlowShape import akka.stream.FlowShape
import akka.stream.Inlet import akka.stream.Inlet
import akka.stream.Outlet import akka.stream.Outlet
import akka.stream.stage.GraphStage import akka.stream.stage._
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import akka.util.OptionVal import akka.util.OptionVal
import akka.event.Logging import akka.event.Logging

View file

@ -167,7 +167,7 @@ private[akka] object ActorMaterializerHelper {
* steps are split up into asynchronous regions is implementation * steps are split up into asynchronous regions is implementation
* dependent. * dependent.
*/ */
abstract class ActorMaterializer extends Materializer { abstract class ActorMaterializer extends Materializer with MaterializerLoggingProvider {
def settings: ActorMaterializerSettings def settings: ActorMaterializerSettings

View file

@ -0,0 +1,17 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream
import akka.event.LoggingAdapter
/**
* SPI intended only to be extended by custom [[Materializer]] implementations,
* that also want to provide stages they materialize with specialized [[akka.event.LoggingAdapter]] instances.
*/
trait MaterializerLoggingProvider { this: Materializer
def makeLogger(logSource: Class[_]): LoggingAdapter
}

View file

@ -240,6 +240,9 @@ private[akka] case class ActorMaterializerImpl(
session.materialize().asInstanceOf[Mat] session.materialize().asInstanceOf[Mat]
} }
override def makeLogger(logSource: Class[_]): LoggingAdapter =
Logging(system, logSource)
override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match {
case Deploy.NoDispatcherGiven Dispatchers.DefaultDispatcherId case Deploy.NoDispatcherGiven Dispatchers.DefaultDispatcherId
case other other case other other

View file

@ -1231,6 +1231,12 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
} }
/** Java API: [[GraphStageLogic]] with [[StageLogging]]. */
abstract class GraphStageLogicWithLogging(_shape: Shape) extends GraphStageLogic(_shape) with StageLogging
/** Java API: [[TimerGraphStageLogic]] with [[StageLogging]]. */
abstract class TimerGraphStageLogicWithLogging(_shape: Shape) extends TimerGraphStageLogic(_shape) with StageLogging
/** /**
* Collection of callbacks for an input port of a [[GraphStage]] * Collection of callbacks for an input port of a [[GraphStage]]
*/ */

View file

@ -0,0 +1,40 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.stage
import akka.event.{ LoggingAdapter, NoLogging }
import akka.stream.{ ActorMaterializer, MaterializerLoggingProvider }
/**
* Simple way to obtain a [[LoggingAdapter]] when used together with an [[ActorMaterializer]].
* If used with a different materializer [[NoLogging]] will be returned.
*
* Make sure to only access `log` from GraphStage callbacks (such as `pull`, `push` or the async-callback).
*
* Note, abiding to [[akka.stream.ActorAttributes.logLevels]] has to be done manually,
* the logger itself is configured based on the logSource provided to it. Also, the `log`
* itself would not know if you're calling it from a "on element" context or not, which is why
* these decisions have to be handled by the stage itself.
*/
trait StageLogging { self: GraphStageLogic
private[this] var _log: LoggingAdapter = _
/** Override to customise reported log source */
protected def logSource: Class[_] = this.getClass
def log: LoggingAdapter = {
// only used in StageLogic, i.e. thread safe
if (_log eq null) {
materializer match {
case p: MaterializerLoggingProvider
_log = p.makeLogger(logSource)
case _
_log = NoLogging
}
}
_log
}
}