Replace PushStage-based Log with GraphStage (part of #19834) (#20788)

* Replace PushStage-based Log with GraphStage (part of #19834)

* Remove LogLogic and FilterLogic toStrings
This commit is contained in:
Michał Płachta 2016-06-22 16:36:18 +02:00 committed by Patrik Nordwall
parent e39255cef0
commit 00c5f49c7a
3 changed files with 71 additions and 60 deletions

View file

@ -41,8 +41,6 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearG
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
override def toString = "FilterLogic"
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
override def onPush(): Unit = {
@ -906,67 +904,84 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
/**
* INTERNAL API
*/
private[akka] final case class Log[T](name: String, extract: T Any,
logAdapter: Option[LoggingAdapter],
decider: Supervision.Decider) extends PushStage[T, T] {
private[akka] final case class Log[T](
name: String,
extract: T Any,
logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] {
import Log._
private var logLevels: LogLevels = _
private var log: LoggingAdapter = _
override def toString = "Log"
// TODO more optimisations can be done here - prepare logOnPush function etc
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
import Log._
override def preStart(ctx: LifecycleContext): Unit = {
logLevels = ctx.attributes.get[LogLevels](DefaultLogLevels)
log = logAdapter match {
case Some(l) l
case _
val mat = try ActorMaterializerHelper.downcast(ctx.materializer)
catch {
case ex: Exception
throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " +
"Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex)
private var logLevels: LogLevels = _
private var log: LoggingAdapter = _
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
override def preStart(): Unit = {
logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels)
log = logAdapter match {
case Some(l) l
case _
val mat = try ActorMaterializerHelper.downcast(materializer)
catch {
case ex: Exception
throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " +
"Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex)
}
Logging(mat.system, mat)(fromMaterializer)
}
Logging(mat.system, ctx)(fromLifecycleContext)
}
}
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
if (isEnabled(logLevels.onElement))
log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem))
ctx.push(elem)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
if (isEnabled(logLevels.onFailure))
logLevels.onFailure match {
case Logging.ErrorLevel log.error(cause, "[{}] Upstream failed.", name)
case level log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage)
}
super.onUpstreamFailure(cause, ctx)
}
override def onPush(): Unit = {
try {
val elem = grab(in)
if (isEnabled(logLevels.onElement))
log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem))
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Upstream finished.", name)
push(out, elem)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case _ pull(in)
}
}
}
super.onUpstreamFinish(ctx)
}
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = {
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Downstream finished.", name)
override def onUpstreamFailure(cause: Throwable): Unit = {
if (isEnabled(logLevels.onFailure))
logLevels.onFailure match {
case Logging.ErrorLevel log.error(cause, "[{}] Upstream failed.", name)
case level log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage)
}
super.onDownstreamFinish(ctx)
}
super.onUpstreamFailure(cause)
}
private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt
override def onUpstreamFinish(): Unit = {
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Upstream finished.", name)
override def decide(t: Throwable): Supervision.Directive = decider(t)
super.onUpstreamFinish()
}
override def onDownstreamFinish(): Unit = {
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Downstream finished.", name)
super.onDownstreamFinish()
}
private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt
setHandlers(in, out, this)
}
}
/**
@ -975,16 +990,16 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any,
private[akka] object Log {
/**
* Must be located here to be visible for implicit resolution, when LifecycleContext is passed to [[Logging]]
* Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]]
* More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source.
*/
final val fromLifecycleContext = new LogSource[LifecycleContext] {
final val fromMaterializer = new LogSource[Materializer] {
// do not expose private context classes (of OneBoundedInterpreter)
override def getClazz(t: LifecycleContext): Class[_] = classOf[Materializer]
override def getClazz(t: Materializer): Class[_] = classOf[Materializer]
override def genString(t: LifecycleContext): String = {
try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t.materializer).supervisor.path})"
override def genString(t: Materializer): String = {
try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t).supervisor.path})"
catch {
case ex: Exception LogSource.fromString.genString(DefaultLoggerName)
}