Add LogWithMarker to Akka Stream #28450

This commit is contained in:
Johan Andrén 2020-03-05 15:05:05 +01:00 committed by GitHub
parent f1dbb79b71
commit c46861ed26
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 937 additions and 9 deletions

View file

@ -10,7 +10,7 @@ import akka.actor.{ ActorRef, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.ExecutionContexts
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.event.{ LogMarker, LogSource, Logging, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
@ -1573,6 +1573,126 @@ private[stream] object Collect {
LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class LogWithMarker[T](
name: String,
marker: T => LogMarker,
extract: T => Any,
logAdapter: Option[MarkerLoggingAdapter])
extends SimpleLinearGraphStage[T] {
override def toString = "LogWithMarker"
// TODO more optimisations can be done here - prepare logOnPush function etc
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
import LogWithMarker._
private var logLevels: LogLevels = _
private var log: MarkerLoggingAdapter = _
def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
override def preStart(): Unit = {
logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels)
log = logAdapter match {
case Some(l) => l
case _ =>
Logging.withMarker(materializer.system, materializer)(fromMaterializer)
}
}
override def onPush(): Unit = {
try {
val elem = grab(in)
if (isEnabled(logLevels.onElement))
log.log(marker(elem), logLevels.onElement, log.format("[{}] Element: {}", name, extract(elem)))
push(out, elem)
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => failStage(ex)
case _ => pull(in)
}
}
}
override def onPull(): Unit = pull(in)
override def onUpstreamFailure(cause: Throwable): Unit = {
if (isEnabled(logLevels.onFailure))
logLevels.onFailure match {
case Logging.ErrorLevel => log.error(cause, "[{}] Upstream failed.", name)
case level =>
log.log(
level,
"[{}] Upstream failed, cause: {}: {}",
name,
Logging.simpleName(cause.getClass),
cause.getMessage)
}
super.onUpstreamFailure(cause)
}
override def onUpstreamFinish(): Unit = {
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Upstream finished.", name)
super.onUpstreamFinish()
}
override def onDownstreamFinish(cause: Throwable): Unit = {
if (isEnabled(logLevels.onFinish))
log.log(
logLevels.onFinish,
"[{}] Downstream finished, cause: {}: {}",
name,
Logging.simpleName(cause.getClass),
cause.getMessage)
super.onDownstreamFinish(cause: Throwable)
}
private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object LogWithMarker {
/**
* Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]]
* More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source.
*/
final val fromMaterializer = new LogSource[Materializer] {
// do not expose private context classes (of OneBoundedInterpreter)
override def getClazz(t: Materializer): Class[_] = classOf[Materializer]
override def genString(t: Materializer): String = {
try s"$DefaultLoggerName(${t.supervisor.path})"
catch {
case _: Exception => LogSource.fromString.genString(DefaultLoggerName)
}
}
}
private final val DefaultLoggerName = "akka.stream.LogWithMarker"
private final val OffInt = LogLevels.Off.asInt
private final val DefaultLogLevels =
LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
}
/**
* INTERNAL API
*/