Adds log operator to Source/FlowWithContext (#26386)

This commit is contained in:
Luc Bourlier 2019-02-17 21:00:02 +01:00 committed by Christopher Batey
parent d3836aecfb
commit f85e0e7c62
5 changed files with 190 additions and 0 deletions

View file

@ -0,0 +1,106 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.event.Logging
import akka.stream.Attributes.LogLevels
import akka.stream.testkit.{ StreamSpec, ScriptedTest }
import akka.stream._
import akka.testkit.TestProbe
class FlowWithContextLogSpec extends StreamSpec("""
akka.loglevel = DEBUG # test verifies logging
akka.actor.serialize-messages = off
""") with ScriptedTest {
implicit val mat: Materializer = ActorMaterializer()
val logProbe = {
val p = TestProbe()
system.eventStream.subscribe(p.ref, classOf[Logging.LogEvent])
p
}
"log() from FlowWithContextOps" must {
val supervisorPath = ActorMaterializerHelper.downcast(mat).supervisor.path
val LogSrc = s"akka.stream.Log($supervisorPath)"
val LogClazz = classOf[Materializer]
"on FlowWithContext" must {
"log each element" in {
val logging = FlowWithContext[Long, Message].log("my-log")
Source(List(Message("a", 1L), Message("b", 2L)))
.startContextPropagation(m m.offset)
.via(logging)
.endContextPropagation
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Element: Message(a,1)"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Element: Message(b,2)"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Upstream finished."))
}
"allow extracting value to be logged" in {
val logging = FlowWithContext[Long, Message].log("my-log2", m m.data)
Source(List(Message("a", 1L)))
.startContextPropagation(m m.offset)
.via(logging)
.endContextPropagation
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log2] Element: a"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log2] Upstream finished."))
}
"allow disabling element logging" in {
val disableElementLogging = Attributes.logLevels(
onElement = LogLevels.Off,
onFinish = Logging.DebugLevel,
onFailure = Logging.DebugLevel)
val logging = FlowWithContext[Long, Message].log("my-log3")
Source(List(Message("a", 1L), Message("b", 2L)))
.startContextPropagation(m m.offset)
.via(logging)
.endContextPropagation
.withAttributes(disableElementLogging)
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log3] Upstream finished."))
}
}
"on SourceWithContext" must {
"log each element" in {
Source(List(Message("a", 1L), Message("b", 2L)))
.startContextPropagation(m m.offset)
.log("my-log4")
.endContextPropagation
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Element: Message(a,1)"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Element: Message(b,2)"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Upstream finished."))
}
"allow extracting value to be logged" in {
Source(List(Message("a", 1L)))
.startContextPropagation(m m.offset)
.log("my-log5", m m.data)
.endContextPropagation
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Element: a"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Upstream finished."))
}
}
}
}

View file

@ -0,0 +1,4 @@
# FlowWithContextOps.log() #26386
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$3")

View file

@ -7,6 +7,8 @@ package akka.stream.javadsl
import akka.annotation.ApiMayChange
import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.event.LoggingAdapter
import akka.util.ConstantFun
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
@ -198,6 +200,38 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl
elem Util.immutableSeq(fun(elem))
})
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*
* @see [[akka.stream.javadsl.Flow.log]]
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
viaScala(_.log(name, e extract.apply(e))(log))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*
* @see [[akka.stream.javadsl.Flow.log]]
*/
def log(name: String, extract: function.Function[Out, Any]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
this.log(name, extract, null)
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*
* @see [[akka.stream.javadsl.Flow.log]]
*/
def log(name: String, log: LoggingAdapter): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*
* @see [[akka.stream.javadsl.Flow.log]]
*/
def log(name: String): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], null)
def asScala: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = delegate
private[this] def viaScala[CtxIn2, In2, CtxOut2, Out2, Mat2](f: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] scaladsl.FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2]): FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2] =

View file

@ -7,6 +7,8 @@ package akka.stream.javadsl
import akka.annotation.ApiMayChange
import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.event.LoggingAdapter
import akka.util.ConstantFun
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
@ -179,6 +181,38 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon
elem Util.immutableSeq(fun(elem))
})
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.log]].
*
* @see [[akka.stream.javadsl.Source.log]]
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SourceWithContext[Ctx, Out, Mat] =
viaScala(_.log(name, e extract.apply(e))(log))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*
* @see [[akka.stream.javadsl.Flow.log]]
*/
def log(name: String, extract: function.Function[Out, Any]): SourceWithContext[Ctx, Out, Mat] =
this.log(name, extract, null)
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*
* @see [[akka.stream.javadsl.Flow.log]]
*/
def log(name: String, log: LoggingAdapter): SourceWithContext[Ctx, Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*
* @see [[akka.stream.javadsl.Flow.log]]
*/
def log(name: String): SourceWithContext[Ctx, Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], null)
def asScala: scaladsl.SourceWithContext[Ctx, Out, Mat] = delegate
private[this] def viaScala[Ctx2, Out2, Mat2](f: scaladsl.SourceWithContext[Ctx, Out, Mat] scaladsl.SourceWithContext[Ctx2, Out2, Mat2]): SourceWithContext[Ctx2, Out2, Mat2] =

View file

@ -12,6 +12,8 @@ import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.stream._
import akka.util.ConstantFun
import akka.event.LoggingAdapter
/**
* Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context
@ -210,5 +212,15 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] {
def mapContext[Ctx2](f: Ctx Ctx2): Repr[Ctx2, Out] =
via(flow.map { case (e, ctx) (e, f(ctx)) })
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.log]].
*
* @see [[akka.stream.scaladsl.FlowOps.log]]
*/
def log(name: String, extract: Out Any = ConstantFun.scalaIdentityFunction)(implicit log: LoggingAdapter = null): Repr[Ctx, Out] = {
val extractWithContext: ((Out, Ctx)) Any = { case (e, _) extract(e) }
via(flow.log(name, extractWithContext)(log))
}
private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)]
}