diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala new file mode 100644 index 0000000000..e6d68c299f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.event.Logging +import akka.stream.Attributes.LogLevels +import akka.stream.testkit.{ StreamSpec, ScriptedTest } +import akka.stream._ +import akka.testkit.TestProbe + +class FlowWithContextLogSpec extends StreamSpec(""" + akka.loglevel = DEBUG # test verifies logging + akka.actor.serialize-messages = off + """) with ScriptedTest { + + implicit val mat: Materializer = ActorMaterializer() + + val logProbe = { + val p = TestProbe() + system.eventStream.subscribe(p.ref, classOf[Logging.LogEvent]) + p + } + + "log() from FlowWithContextOps" must { + + val supervisorPath = ActorMaterializerHelper.downcast(mat).supervisor.path + val LogSrc = s"akka.stream.Log($supervisorPath)" + val LogClazz = classOf[Materializer] + + "on FlowWithContext" must { + + "log each element" in { + val logging = FlowWithContext[Long, Message].log("my-log") + Source(List(Message("a", 1L), Message("b", 2L))) + .startContextPropagation(m ⇒ m.offset) + .via(logging) + .endContextPropagation + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Element: Message(a,1)")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Element: Message(b,2)")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Upstream finished.")) + } + + "allow extracting value to be logged" in { + val logging = FlowWithContext[Long, Message].log("my-log2", m ⇒ m.data) + Source(List(Message("a", 1L))) + .startContextPropagation(m ⇒ m.offset) + .via(logging) + .endContextPropagation + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log2] Element: a")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log2] Upstream finished.")) + } + + "allow disabling element logging" in { + val disableElementLogging = Attributes.logLevels( + onElement = LogLevels.Off, + onFinish = Logging.DebugLevel, + onFailure = Logging.DebugLevel) + + val logging = FlowWithContext[Long, Message].log("my-log3") + Source(List(Message("a", 1L), Message("b", 2L))) + .startContextPropagation(m ⇒ m.offset) + .via(logging) + .endContextPropagation + .withAttributes(disableElementLogging) + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log3] Upstream finished.")) + } + + } + + "on SourceWithContext" must { + + "log each element" in { + Source(List(Message("a", 1L), Message("b", 2L))) + .startContextPropagation(m ⇒ m.offset) + .log("my-log4") + .endContextPropagation + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Element: Message(a,1)")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Element: Message(b,2)")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Upstream finished.")) + } + + "allow extracting value to be logged" in { + Source(List(Message("a", 1L))) + .startContextPropagation(m ⇒ m.offset) + .log("my-log5", m ⇒ m.data) + .endContextPropagation + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Element: a")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Upstream finished.")) + } + + } + } + +} diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes new file mode 100644 index 0000000000..2bebd64ae3 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes @@ -0,0 +1,4 @@ +# FlowWithContextOps.log() #26386 +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$3") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index d3e13ea619..64a456e9ab 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -7,6 +7,8 @@ package akka.stream.javadsl import akka.annotation.ApiMayChange import akka.japi.{ Pair, Util, function } import akka.stream._ +import akka.event.LoggingAdapter +import akka.util.ConstantFun import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ @@ -198,6 +200,38 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl elem ⇒ Util.immutableSeq(fun(elem)) }) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. + * + * @see [[akka.stream.javadsl.Flow.log]] + */ + def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + viaScala(_.log(name, e ⇒ extract.apply(e))(log)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. + * + * @see [[akka.stream.javadsl.Flow.log]] + */ + def log(name: String, extract: function.Function[Out, Any]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + this.log(name, extract, null) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. + * + * @see [[akka.stream.javadsl.Flow.log]] + */ + def log(name: String, log: LoggingAdapter): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + this.log(name, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. + * + * @see [[akka.stream.javadsl.Flow.log]] + */ + def log(name: String): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + this.log(name, ConstantFun.javaIdentityFunction[Out], null) + def asScala: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = delegate private[this] def viaScala[CtxIn2, In2, CtxOut2, Out2, Mat2](f: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] ⇒ scaladsl.FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2]): FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index b9387c945c..6ddd66ace0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -7,6 +7,8 @@ package akka.stream.javadsl import akka.annotation.ApiMayChange import akka.japi.{ Pair, Util, function } import akka.stream._ +import akka.event.LoggingAdapter +import akka.util.ConstantFun import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ @@ -179,6 +181,38 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon elem ⇒ Util.immutableSeq(fun(elem)) }) + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.log]]. + * + * @see [[akka.stream.javadsl.Source.log]] + */ + def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SourceWithContext[Ctx, Out, Mat] = + viaScala(_.log(name, e ⇒ extract.apply(e))(log)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. + * + * @see [[akka.stream.javadsl.Flow.log]] + */ + def log(name: String, extract: function.Function[Out, Any]): SourceWithContext[Ctx, Out, Mat] = + this.log(name, extract, null) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. + * + * @see [[akka.stream.javadsl.Flow.log]] + */ + def log(name: String, log: LoggingAdapter): SourceWithContext[Ctx, Out, Mat] = + this.log(name, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. + * + * @see [[akka.stream.javadsl.Flow.log]] + */ + def log(name: String): SourceWithContext[Ctx, Out, Mat] = + this.log(name, ConstantFun.javaIdentityFunction[Out], null) + def asScala: scaladsl.SourceWithContext[Ctx, Out, Mat] = delegate private[this] def viaScala[Ctx2, Out2, Mat2](f: scaladsl.SourceWithContext[Ctx, Out, Mat] ⇒ scaladsl.SourceWithContext[Ctx2, Out2, Mat2]): SourceWithContext[Ctx2, Out2, Mat2] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index b8dbaa9795..cfa12f63cb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -12,6 +12,8 @@ import akka.NotUsed import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.stream._ +import akka.util.ConstantFun +import akka.event.LoggingAdapter /** * Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context @@ -210,5 +212,15 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Ctx2, Out] = via(flow.map { case (e, ctx) ⇒ (e, f(ctx)) }) + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.log]]. + * + * @see [[akka.stream.scaladsl.FlowOps.log]] + */ + def log(name: String, extract: Out ⇒ Any = ConstantFun.scalaIdentityFunction)(implicit log: LoggingAdapter = null): Repr[Ctx, Out] = { + val extractWithContext: ((Out, Ctx)) ⇒ Any = { case (e, _) ⇒ extract(e) } + via(flow.log(name, extractWithContext)(log)) + } + private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)] }