diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeLoggingElements.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeLoggingElements.scala index b1d326613a..9ca43de2d9 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeLoggingElements.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeLoggingElements.scala @@ -1,7 +1,8 @@ package docs.stream.cookbook import akka.event.Logging -import akka.stream.scaladsl.{ Sink, Source, Flow } +import akka.stream.OperationAttributes +import akka.stream.scaladsl.{ Sink, Source } import akka.testkit.{ EventFilter, TestProbe } class RecipeLoggingElements extends RecipeSpec { @@ -22,35 +23,23 @@ class RecipeLoggingElements extends RecipeSpec { printProbe.expectMsgAllOf("1", "2", "3") } - "work with PushStage" in { + "use log()" in { val mySource = Source(List("1", "2", "3")) + def analyse(s: String) = s - //#loggingadapter - import akka.stream.stage._ - class LoggingStage[T] extends PushStage[T, T] { - private val log = Logging(system, "loggingName") + //#log-custom + // customise log levels + mySource.log("before-map") + .withAttributes(OperationAttributes.logLevels(onElement = Logging.WarningLevel)) + .map(analyse) - override def onPush(elem: T, ctx: Context[T]): SyncDirective = { - log.debug("Element flowing through: {}", elem) - ctx.push(elem) - } + // or provide custom logging adapter + implicit val adapter = Logging(system, "customLogger") + mySource.log("custom") + //#log-custom - override def onUpstreamFailure(cause: Throwable, - ctx: Context[T]): TerminationDirective = { - log.error(cause, "Upstream failed.") - super.onUpstreamFailure(cause, ctx) - } - - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - log.debug("Upstream finished") - super.onUpstreamFinish(ctx) - } - } - - val loggedSource = mySource.transform(() => new LoggingStage) - //#loggingadapter - - EventFilter.debug(start = "Element flowing").intercept { + val loggedSource = mySource.log("custom") + EventFilter.debug(start = "[custom] Element: ").intercept { loggedSource.runWith(Sink.ignore) } diff --git a/akka-docs-dev/rst/scala/stream-cookbook.rst b/akka-docs-dev/rst/scala/stream-cookbook.rst index 1e6e191f1d..e98d49d75e 100644 --- a/akka-docs-dev/rst/scala/stream-cookbook.rst +++ b/akka-docs-dev/rst/scala/stream-cookbook.rst @@ -39,7 +39,7 @@ handlers, emitting log information through an Akka :class:`LoggingAdapter`. This the elements flowing in the stream, it just emits them unmodified by calling ``ctx.push(elem)`` in its ``onPush`` event handler logic. -.. includecode:: code/docs/stream/cookbook/RecipeLoggingElements.scala#loggingadapter +.. includecode:: code/docs/stream/cookbook/RecipeLoggingElements.scala#log-custom Flattening a stream of sequences -------------------------------- diff --git a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java index e7746c39a1..e26af67b99 100644 --- a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java @@ -5,7 +5,9 @@ package akka.stream; import akka.actor.ActorSystem; +import akka.event.Logging; import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.OperationAttributes; public abstract class StreamTest { final protected ActorSystem system; diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala index b317535e4e..07b2e08580 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl.fusing +import akka.stream.OperationAttributes import akka.stream.testkit.AkkaSpec import akka.stream.stage._ import akka.testkit.TestProbe @@ -61,6 +62,7 @@ trait InterpreterSpecKit extends AkkaSpec { val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream, (op, ctx, event) ⇒ sidechannel.ref ! ActorInterpreter.AsyncInput(op, ctx, event), ActorFlowMaterializer(), + OperationAttributes.none, forkLimit, overflowToHeap) interpreter.init() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala new file mode 100644 index 0000000000..f6d9f0b06f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.event.{ DummyClassForStringSources, Logging } +import akka.stream._ +import akka.stream.OperationAttributes +import akka.stream.OperationAttributes.LogLevels +import akka.stream.testkit.{ AkkaSpec, ScriptedTest } +import akka.testkit.TestProbe + +import scala.util.control.NoStackTrace + +class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest { + + implicit val mat: FlowMaterializer = ActorFlowMaterializer() + + val logProbe = { + val p = TestProbe() + system.eventStream.subscribe(p.ref, classOf[Logging.LogEvent]) + p + } + + "A Log" must { + + val LogSrc = s"akka.stream.Log(akka://${Logging.simpleName(classOf[FlowLogSpec])})" + val LogClazz = classOf[DummyClassForStringSources] + + "on Flow" must { + + "debug each element" in { + val debugging = Flow[Int].log("my-debug") + Source(1 to 2).via(debugging).runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 1")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 2")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished.")) + } + + "allow disabling element logging" in { + val disableElementLogging = OperationAttributes.logLevels( + onElement = LogLevels.Off, + onFinish = Logging.DebugLevel, + onFailure = Logging.DebugLevel) + + val debugging = Flow[Int].log("my-debug") + Source(1 to 2) + .via(debugging) + .withAttributes(disableElementLogging) + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished.")) + } + + } + + "on javadsl.Flow" must { + "debug each element" in { + val log = Logging(system, "com.example.ImportantLogger") + + val debugging: javadsl.Flow[Integer, Integer, Unit] = javadsl.Flow.of(classOf[Integer]) + .log("log-1") + .log("log-2", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i }) + .log("log-3", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i }, log) + .log("log-4", log) + + javadsl.Source.single[Integer](1).via(debugging).runWith(javadsl.Sink.ignore(), mat) + + var counter = 0 + var finishCounter = 0 + import scala.concurrent.duration._ + logProbe.fishForMessage(3.seconds) { + case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" ⇒ + counter += 1 + counter == 4 && finishCounter == 4 + + case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" ⇒ + finishCounter += 1 + counter == 4 && finishCounter == 4 + } + } + } + + "on Source" must { + "debug each element" in { + Source(1 to 2).log("flow-s2").runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 1")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 2")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Upstream finished.")) + } + + "allow extracting value to be logged" in { + case class Complex(a: Int, b: String) + Source.single(Complex(1, "42")).log("flow-s3", _.b).runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Element: 42")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Upstream finished.")) + } + + "log upstream failure" in { + val cause = new TestException + Source.failed(cause).log("flow-4").runWith(Sink.ignore) + logProbe.expectMsg(Logging.Error(cause, LogSrc, LogClazz, "[flow-4] Upstream failed.")) + } + + "allow passing in custom LoggingAdapter" in { + val log = Logging(system, "com.example.ImportantLogger") + + Source.single(42).log("flow-5")(log).runWith(Sink.ignore) + + val src = "com.example.ImportantLogger(akka://FlowLogSpec)" + val clazz = classOf[DummyClassForStringSources] + logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Element: 42")) + logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Upstream finished.")) + } + + "allow configuring log levels via OperationAttributes" in { + val logAttrs = OperationAttributes.logLevels( + onElement = Logging.WarningLevel, + onFinish = Logging.InfoLevel, + onFailure = Logging.DebugLevel) + + Source.single(42) + .log("flow-6") + .withAttributes(OperationAttributes.logLevels( + onElement = Logging.WarningLevel, + onFinish = Logging.InfoLevel, + onFailure = Logging.DebugLevel)) + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Warning(LogSrc, LogClazz, "[flow-6] Element: 42")) + logProbe.expectMsg(Logging.Info(LogSrc, LogClazz, "[flow-6] Upstream finished.")) + + val cause = new TestException + Source.failed(cause) + .log("flow-6e") + .withAttributes(logAttrs) + .runWith(Sink.ignore) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-6e] Upstream failed, cause: FlowLogSpec$TestException: Boom!")) + } + } + + "on javadsl.Source" must { + "debug each element" in { + val log = Logging(system, "com.example.ImportantLogger") + + javadsl.Source.single[Integer](1) + .log("log-1") + .log("log-2", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i }) + .log("log-3", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i }, log) + .log("log-4", log) + .runWith(javadsl.Sink.ignore(), mat) + + var counter = 1 + import scala.concurrent.duration._ + logProbe.fishForMessage(3.seconds) { + case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" ⇒ + counter += 1 + counter == 4 + + case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" ⇒ + false + } + } + } + + } + + final class TestException extends RuntimeException("Boom!") with NoStackTrace + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 3c1aef5e2c..0f1b05744a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -11,8 +11,7 @@ import akka.stream.stage.Stage import scala.collection.immutable import scala.concurrent.duration._ import akka.actor._ -import akka.stream.ActorFlowMaterializerSettings -import akka.stream.ActorFlowMaterializer +import akka.stream.{ OperationAttributes, ActorFlowMaterializerSettings, ActorFlowMaterializer } import akka.stream.impl._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -45,7 +44,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece _settings: ActorFlowMaterializerSettings, _ops: Seq[Stage[_, _]], brokenMessage: Any) - extends ActorInterpreter(_settings, _ops, mat) { + extends ActorInterpreter(_settings, _ops, mat, OperationAttributes.none) { import akka.stream.actor.ActorSubscriberMessage._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index 3fc85000c8..c252842be4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -3,6 +3,9 @@ */ package akka.stream.scaladsl +import akka.actor.PoisonPill +import akka.stream.{ OperationAttributes, OverflowStrategy, ActorFlowMaterializer, ActorFlowMaterializerSettings } +import akka.stream.stage._ import scala.collection.immutable.Seq import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -12,7 +15,9 @@ import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.testkit.{ EventFilter, TestProbe } import com.typesafe.config.ConfigFactory -import akka.stream.stage._ + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { @@ -418,6 +423,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug val upsub = up.expectSubscription() upsub.expectCancellation() } + } } diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 02b60dbc16..0331d9fc99 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -37,7 +37,7 @@ akka { # Enable additional troubleshooting logging at DEBUG log level debug-logging = off - + # Maximum number of elements emitted in batch if downstream signals large demand output-burst-limit = 1000 } diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 87dcf61b6c..5c113d9fb8 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -11,7 +11,7 @@ import akka.stream.impl._ import com.typesafe.config.Config import scala.concurrent.duration._ -import akka.japi.{ function ⇒ japi } +import akka.japi.function object ActorFlowMaterializer { @@ -145,14 +145,16 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings - /** INTERNAL API */ - private[akka] def system: ActorSystem - /** * INTERNAL API: this might become public later */ private[akka] def actorOf(context: MaterializationContext, props: Props): ActorRef + /** + * INTERNAL API + */ + private[akka] def system: ActorSystem + } /** @@ -247,7 +249,7 @@ final case class ActorFlowMaterializerSettings( * overridden for specific flows of the stream operations with * [[akka.stream.OperationAttributes#supervisionStrategy]]. */ - def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = { + def withSupervisionStrategy(decider: function.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = { import Supervision._ copy(supervisionDecider = decider match { case `resumingDecider` ⇒ resumingDecider diff --git a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala index fd7e89e5a0..e86aa034ba 100644 --- a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala @@ -3,13 +3,15 @@ */ package akka.stream +import akka.event.Logging + import scala.collection.immutable -import akka.japi.{ function ⇒ japi } import akka.stream.impl.Stages.StageModule +import akka.japi.function /** - * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] - * materialization. + * Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]] + * or [[akka.stream.scaladsl.FlowGraph]] / [[akka.stream.javadsl.FlowGraph]] materialization. * * Note that more attributes for the [[ActorFlowMaterializer]] are defined in [[ActorOperationAttributes]]. */ @@ -96,6 +98,9 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio /** * INTERNAL API */ + private[akka] def logLevels: Option[LogLevels] = + attributes.collectFirst { case l: LogLevels ⇒ l } + private[akka] def transform(node: StageModule): StageModule = if ((this eq OperationAttributes.none) || (this eq node.attributes)) node else node.withAttributes(attributes = this and node.attributes) @@ -110,6 +115,11 @@ object OperationAttributes { trait Attribute final case class Name(n: String) extends Attribute final case class InputBuffer(initial: Int, max: Int) extends Attribute + final case class LogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) extends Attribute + object LogLevels { + /** Use to disable logging on certain operations when configuring [[OperationAttributes.LogLevels]] */ + final val Off: Logging.LogLevel = Logging.levelFor("off").get + } /** * INTERNAL API @@ -132,6 +142,30 @@ object OperationAttributes { */ def inputBuffer(initial: Int, max: Int): OperationAttributes = OperationAttributes(InputBuffer(initial, max)) + /** + * Java API + * + * Configures `log()` stage log-levels to be used when logging. + * Logging a certain operation can be completely disabled by using [[LogLevels.Off]]. + * + * Passing in null as any of the arguments sets the level to its default value, which is: + * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`. + */ + def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) = + logLevels( + onElement = Option(onElement).getOrElse(Logging.DebugLevel), + onFinish = Option(onFinish).getOrElse(Logging.DebugLevel), + onFailure = Option(onFailure).getOrElse(Logging.ErrorLevel)) + + /** + * Configures `log()` stage log-levels to be used when logging. + * Logging a certain operation can be completely disabled by using [[LogLevels.Off]]. + * + * See [[OperationAttributes.createLogLevels]] for Java API + */ + def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel) = + OperationAttributes(LogLevels(onElement, onFinish, onFailure)) + } /** @@ -157,6 +191,31 @@ object ActorOperationAttributes { /** * Java API: Decides how exceptions from application code are to be handled. */ - def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): OperationAttributes = + def withSupervisionStrategy(decider: function.Function[Throwable, Supervision.Directive]): OperationAttributes = ActorOperationAttributes.supervisionStrategy(decider.apply _) + + /** + * Java API + * + * Configures `log()` stage log-levels to be used when logging. + * Logging a certain operation can be completely disabled by using [[LogLevels.Off]]. + * + * Passing in null as any of the arguments sets the level to its default value, which is: + * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`. + */ + def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) = + logLevels( + onElement = Option(onElement).getOrElse(Logging.DebugLevel), + onFinish = Option(onFinish).getOrElse(Logging.DebugLevel), + onFailure = Option(onFailure).getOrElse(Logging.ErrorLevel)) + + /** + * Configures `log()` stage log-levels to be used when logging. + * Logging a certain operation can be completely disabled by using [[LogLevels.Off]]. + * + * See [[OperationAttributes.createLogLevels]] for Java API + */ + def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel) = + OperationAttributes(LogLevels(onElement, onFinish, onFailure)) + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index c83527908a..9026ffee96 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -53,6 +53,7 @@ private[akka] case class ActorFlowMaterializerImpl( case InputBuffer(initial, max) ⇒ s.withInputBuffer(initial, max) case Dispatcher(dispatcher) ⇒ s.withDispatcher(dispatcher) case SupervisionStrategy(decider) ⇒ s.withSupervisionStrategy(decider) + case l: LogLevels ⇒ s case Name(_) ⇒ s } } @@ -286,30 +287,31 @@ private[akka] object ActorProcessorFactory { // Also, otherwise the attributes will not affect the settings properly! val settings = materializer.effectiveSettings(att) op match { - case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(_identity, settings.supervisionDecider)), materializer), ()) - case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer), ()) - case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer), ()) - case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer), ()) - case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer), ()) - case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer), ()) - case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer), ()) - case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer), ()) - case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer), ()) - case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer), ()) - case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer), ()) - case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer), ()) - case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer), ()) - case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer), ()) - case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer), ()) + case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(_identity, settings.supervisionDecider)), materializer, att), ()) + case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer, att), ()) + case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer, att), ()) + case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer, att), ()) + case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer, att), ()) + case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer, att), ()) + case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer, att), ()) + case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ()) + case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ()) + case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ()) + case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ()) + case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ()) + case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ()) + case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ()) + case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ()) + case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ()) case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case SplitWhen(p, _) ⇒ (SplitWhenProcessorImpl.props(settings, p), ()) case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) - case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer), ()) + case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer, att), ()) case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ()) case MaterializingStageFactory(mkStageAndMat, _) ⇒ val sm = mkStageAndMat() - (ActorInterpreter.props(settings, List(sm._1), materializer), sm._2) + (ActorInterpreter.props(settings, List(sm._1), materializer, att), sm._2) case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 3edb24b4e2..138cfa3522 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl -import akka.event.Logging +import akka.event.{ LoggingAdapter, Logging } import akka.stream.{ OverflowStrategy, TimerTransformer } import akka.stream.OperationAttributes import akka.stream.OperationAttributes._ @@ -132,6 +132,11 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } + final case class Log(name: String, extract: Any ⇒ Any, loggingAdapter: Option[LoggingAdapter], attributes: OperationAttributes = map) extends StageModule { + def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) + override protected def newInstance: StageModule = this.copy() + } + final case class Filter(p: Any ⇒ Boolean, attributes: OperationAttributes = filter) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 9e29b12400..e017d5f1f9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -5,17 +5,16 @@ package akka.stream.impl.fusing import java.util.Arrays import akka.actor.{ Actor, ActorRef } -import akka.event.Logging import akka.stream.ActorFlowMaterializerSettings import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } import akka.stream.impl._ +import akka.stream.OperationAttributes import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } -import scala.util.control.NonFatal import akka.actor.Props import akka.actor.ActorLogging -import akka.event.LoggingAdapter +import akka.event.{ Logging, LoggingAdapter } import akka.actor.DeadLetterSuppression import akka.stream.ActorFlowMaterializer @@ -302,8 +301,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, * INTERNAL API */ private[akka] object ActorInterpreter { - def props(settings: ActorFlowMaterializerSettings, ops: Seq[Stage[_, _]], materializer: ActorFlowMaterializer): Props = - Props(new ActorInterpreter(settings, ops, materializer)) + def props(settings: ActorFlowMaterializerSettings, ops: Seq[Stage[_, _]], materializer: ActorFlowMaterializer, attributes: OperationAttributes = OperationAttributes.none): Props = + Props(new ActorInterpreter(settings, ops, materializer, attributes)) case class AsyncInput(op: AsyncStage[Any, Any, Any], ctx: AsyncContext[Any, Any], event: Any) extends DeadLetterSuppression } @@ -311,7 +310,7 @@ private[akka] object ActorInterpreter { /** * INTERNAL API */ -private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings, val ops: Seq[Stage[_, _]], val materializer: ActorFlowMaterializer) +private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings, val ops: Seq[Stage[_, _]], val materializer: ActorFlowMaterializer, val attributes: OperationAttributes) extends Actor with ActorLogging { import ActorInterpreter._ @@ -321,15 +320,20 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings new OneBoundedInterpreter(upstream +: ops :+ downstream, (op, ctx, event) ⇒ self ! AsyncInput(op, ctx, event), materializer, + attributes, name = context.self.path.toString) + interpreter.init() - def receive: Receive = upstream.subreceive.orElse[Any, Unit](downstream.subreceive).orElse[Any, Unit] { - case AsyncInput(op, ctx, event) ⇒ - ctx.enter() - op.onAsyncInput(event, ctx) - ctx.execute() - } + def receive: Receive = + upstream.subreceive + .orElse[Any, Unit](downstream.subreceive) + .orElse[Any, Unit] { + case AsyncInput(op, ctx, event) ⇒ + ctx.enter() + op.onAsyncInput(event, ctx) + ctx.execute() + } override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = { super.aroundReceive(receive, msg) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index ac5c9e0785..26e25412f3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -3,13 +3,14 @@ */ package akka.stream.impl.fusing -import scala.annotation.{ tailrec, switch } +import akka.stream.{ FlowMaterializer, Supervision } +import akka.stream.impl.ReactiveStreamsCompliance +import akka.stream.OperationAttributes +import akka.stream.stage._ + +import scala.annotation.{ switch, tailrec } import scala.collection.breakOut import scala.util.control.NonFatal -import akka.stream.stage._ -import akka.stream.Supervision -import akka.stream.impl.ReactiveStreamsCompliance -import akka.stream.FlowMaterializer // TODO: // fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions) @@ -124,11 +125,12 @@ private[akka] object OneBoundedInterpreter { private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], onAsyncInput: (AsyncStage[Any, Any, Any], AsyncContext[Any, Any], Any) ⇒ Unit, materializer: FlowMaterializer, + attributes: OperationAttributes = OperationAttributes.none, val forkLimit: Int = 100, val overflowToHeap: Boolean = true, val name: String = "") { - import OneBoundedInterpreter._ import AbstractStage._ + import OneBoundedInterpreter._ type UntypedOp = AbstractStage[Any, Any, Directive, Directive, Context[Any]] require(ops.nonEmpty, "OneBoundedInterpreter cannot be created without at least one Op") @@ -301,7 +303,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], mustHave(DownstreamBall) } removeBits(DownstreamBall | PrecedingWasPull) - pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + finishCurrentOp() // This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution // path. Other forks are not order dependent because they execute on isolated execution domains which cannot // "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream @@ -380,6 +382,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } override def materializer: FlowMaterializer = OneBoundedInterpreter.this.materializer + override def attributes: OperationAttributes = OneBoundedInterpreter.this.attributes } private final val Pushing: State = new State { @@ -430,7 +433,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], private final val Completing: State = new State { override def advance(): Unit = { elementInFlight = null - pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + finishCurrentOp() activeOpIndex += 1 } @@ -463,7 +466,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], private final val Cancelling: State = new State { override def advance(): Unit = { elementInFlight = null - pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + finishCurrentOp() activeOpIndex -= 1 } @@ -485,7 +488,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], private final case class Failing(cause: Throwable) extends State { override def advance(): Unit = { elementInFlight = null - pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + finishCurrentOp() activeOpIndex += 1 } @@ -630,16 +633,22 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], (pipeline(op): Any) match { case b: BoundaryStage ⇒ b.context = new EntryState("boundary", op) + case a: AsyncStage[Any, Any, Any] @unchecked ⇒ a.context = new EntryState("async", op) activeOpIndex = op - a.initAsyncInput(a.context) + a.initAsyncInput(a.context) // TODO remove asyncInput? it's like preStart + case _ ⇒ } op += 1 } } + private def finishCurrentOp(): Unit = { + pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + } + /** * Starts execution of detached regions. * diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index cd59892e5b..9a5a01dc7b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -3,16 +3,23 @@ */ package akka.stream.impl.fusing -import scala.collection.immutable +import akka.event.Logging.LogLevel +import akka.event.Logging +import akka.event.LoggingAdapter +import akka.stream.OperationAttributes.LogLevels import akka.stream.impl.FixedSizeBuffer -import akka.stream.stage._ -import akka.stream._ -import akka.stream.Supervision -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Try, Success, Failure } -import scala.annotation.tailrec -import scala.util.control.NonFatal import akka.stream.impl.ReactiveStreamsCompliance +import akka.stream.stage._ +import akka.stream.Supervision +import akka.stream._ + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.Future +import scala.util.control.NonFatal +import scala.util.Failure +import scala.util.Success +import scala.util.Try /** * INTERNAL API @@ -42,7 +49,9 @@ private[akka] final object Collect { } private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf: PartialFunction[In, Out]) extends PushStage[In, Out] { + import Collect.NotApplied + override def onPush(elem: In, ctx: Context[Out]): SyncDirective = pf.applyOrElse(elem, NotApplied) match { case NotApplied ⇒ ctx.pull() @@ -96,6 +105,7 @@ private[akka] final case class Take[T](count: Long) extends PushStage[T, T] { */ private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] { private var left: Long = count + override def onPush(elem: T, ctx: Context[T]): SyncDirective = if (left > 0) { left -= 1 @@ -196,6 +206,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut * INTERNAL API */ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] { + import OverflowStrategy._ private val buffer = FixedSizeBuffer[T](size) @@ -256,6 +267,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt */ private[akka] final case class Completed[T]() extends PushPullStage[T, T] { override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.finish() + override def onPull(ctx: Context[T]): SyncDirective = ctx.finish() } @@ -361,6 +373,7 @@ private[akka] object MapAsync { */ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out], decider: Supervision.Decider) extends AsyncStage[In, Out, (Int, Try[Out])] { + import MapAsync._ type Notification = (Int, Try[Out]) @@ -501,3 +514,71 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I if (todo > 0) ctx.absorbTermination() else ctx.finish() } + +/** + * INTERNAL API + */ +private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapter: Option[LoggingAdapter]) extends PushStage[T, T] { + + import Log._ + + private var logLevels: LogLevels = _ + private var log: LoggingAdapter = _ + + // TODO implement as real preStart once https://github.com/akka/akka/pull/17295 is done + def preStart(ctx: Context[T]): Unit = { + logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels) + log = logAdapter getOrElse { + val sys = ctx.materializer.asInstanceOf[ActorFlowMaterializer].system + Logging(sys, DefaultLoggerName) + } + } + + override def onPush(elem: T, ctx: Context[T]): SyncDirective = { + if (log == null) preStart(ctx) + if (isEnabled(logLevels.onElement)) + log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem)) + + ctx.push(elem) + } + + override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { + if (log == null) preStart(ctx) + if (isEnabled(logLevels.onFailure)) + logLevels.onFailure match { + case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name) + case level ⇒ log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage) + } + + super.onUpstreamFailure(cause, ctx) + } + + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + if (log == null) preStart(ctx) + if (isEnabled(logLevels.onFinish)) + log.log(logLevels.onFinish, "[{}] Upstream finished.", name) + + super.onUpstreamFinish(ctx) + } + + override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { + if (log == null) preStart(ctx) + if (isEnabled(logLevels.onFinish)) + log.log(logLevels.onFinish, "[{}] Downstream finished.", name) + + super.onDownstreamFinish(ctx) + } + + private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt + +} + +/** + * INTERNAL API + */ +private[akka] object Log { + private final val DefaultLoggerName = "akka.stream.Log" + + private final val OffInt = LogLevels.Off.asInt + private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 88dd171ad1..ce5121225a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -3,6 +3,7 @@ */ package akka.stream.javadsl +import akka.event.LoggingAdapter import akka.stream._ import akka.japi.{ Util, Pair } import akka.japi.function @@ -11,7 +12,7 @@ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import akka.stream.stage.Stage -import akka.stream.impl.StreamLayout +import akka.stream.impl.{ Stages, StreamLayout } object Flow { @@ -156,8 +157,6 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels - * - * */ def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.map(f.apply)) @@ -593,6 +592,81 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph override def named(name: String): javadsl.Flow[In, Out, Mat] = new Flow(delegate.named(name)) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses the given [[LoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.log(name, e ⇒ extract.apply(e))(log)) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String, extract: function.Function[Out, Any]): javadsl.Flow[In, Out, Mat] = + this.log(name, extract, null) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * Uses the given [[LoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String, log: LoggingAdapter): javadsl.Flow[In, Out, Mat] = + this.log(name, javaIdentityFunction[Out], log) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow. + * + * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + */ + def log(name: String): javadsl.Flow[In, Out, Mat] = + this.log(name, javaIdentityFunction[Out], null) + } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index d706af778b..f92290a163 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -8,6 +8,7 @@ import akka.japi.function import scala.collection.immutable import java.util.concurrent.Callable import akka.actor.{ Cancellable, ActorRef, Props } +import akka.event.LoggingAdapter import akka.japi.Util import akka.stream.OperationAttributes._ import akka.stream._ @@ -524,4 +525,86 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour override def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses the given [[LoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Source[Out, Mat] = + new Source(delegate.log(name, e ⇒ extract.apply(e))(log)) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String, extract: function.Function[Out, Any]): javadsl.Source[Out, Mat] = + this.log(name, extract, null) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * Uses the given [[LoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] = + this.log(name, javaIdentityFunction[Out], log) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String): javadsl.Source[Out, Mat] = + this.log(name, javaIdentityFunction[Out], null) + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala index d13bc2105e..6d38905c95 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala @@ -3,8 +3,17 @@ */ package akka.stream +import akka.japi.function.Function + package object javadsl { + val JavaIdentityFunction = new Function[Any, Any] { + @throws(classOf[Exception]) + override def apply(param: Any): Any = param + } + + def javaIdentityFunction[T] = JavaIdentityFunction.asInstanceOf[Function[T, T]] + def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) ⇒ M = f match { case s: Function2[_, _, _] ⇒ s.asInstanceOf[(M1, M2) ⇒ M] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 3ef4f7d3f6..37ee2e5a2e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3,12 +3,15 @@ */ package akka.stream.scaladsl +import akka.actor.ActorSystem +import akka.event.LoggingAdapter import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream._ import akka.stream.OperationAttributes._ import akka.util.Collections.EmptyImmutableSeq import org.reactivestreams.Processor +import scala.annotation.implicitNotFound import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -340,6 +343,8 @@ trait FlowOps[+Out, +Mat] { import FlowOps._ type Repr[+O, +M] <: FlowOps[O, M] + private final val _identity = (x: Any) ⇒ x + /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. @@ -849,6 +854,26 @@ trait FlowOps[+Out, +Mat] { private[akka] def timerTransform[U](mkStage: () ⇒ TimerTransformer[Out, U]): Repr[U, Mat] = andThen(TimerTransform(mkStage.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] atrribute on the given Flow: + * + * Uses implicit [[LoggingAdapter]] if available, otherwise uses an internally created one, + * which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def log(name: String, extract: Out ⇒ Any = _identity)(implicit log: LoggingAdapter = null): Repr[Out, Mat] = + andThen(Stages.Log(name, extract.asInstanceOf[Any ⇒ Any], Option(log))) + def withAttributes(attr: OperationAttributes): Repr[Out, Mat] /** INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 6b17b96746..ec027ffd14 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -3,8 +3,7 @@ */ package akka.stream.stage -import akka.stream.Supervision -import akka.stream.FlowMaterializer +import akka.stream.{ FlowMaterializer, OperationAttributes, Supervision } /** * General interface for stream transformation. @@ -571,6 +570,9 @@ sealed trait Context[Out] { * It can be used to materialize sub-flows. */ def materializer: FlowMaterializer + + /** Returns operation attributes associated with the this Stage */ + def attributes: OperationAttributes } /** @@ -643,3 +645,4 @@ trait AsyncContext[Out, Ext] extends DetachedContext[Out] { private[akka] trait BoundaryContext extends Context[Any] { def exit(): FreeDirective } +