diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index c1724b0854..ebcd16fa71 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -159,10 +159,6 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[In, Out] = fusing.Map(f, supervision(attr)) } - final case class Log[T](name: String, extract: T ⇒ Any, loggingAdapter: Option[LoggingAdapter], attributes: Attributes = log) extends SymbolicStage[T, T] { - override def create(attr: Attributes): Stage[T, T] = fusing.Log(name, extract, loggingAdapter, supervision(attr)) - } - final case class Grouped[T](n: Int, attributes: Attributes = grouped) extends SymbolicStage[T, immutable.Seq[T]] { require(n > 0, "n must be greater than 0") override def create(attr: Attributes): Stage[T, immutable.Seq[T]] = fusing.Grouped(n) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 4781dc1422..30cd433fb5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -41,8 +41,6 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearG override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler with InHandler { - override def toString = "FilterLogic" - def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) override def onPush(): Unit = { @@ -906,67 +904,84 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I /** * INTERNAL API */ -private[akka] final case class Log[T](name: String, extract: T ⇒ Any, - logAdapter: Option[LoggingAdapter], - decider: Supervision.Decider) extends PushStage[T, T] { +private[akka] final case class Log[T]( + name: String, + extract: T ⇒ Any, + logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] { - import Log._ - - private var logLevels: LogLevels = _ - private var log: LoggingAdapter = _ + override def toString = "Log" // TODO more optimisations can be done here - prepare logOnPush function etc + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler with InHandler { + import Log._ - override def preStart(ctx: LifecycleContext): Unit = { - logLevels = ctx.attributes.get[LogLevels](DefaultLogLevels) - log = logAdapter match { - case Some(l) ⇒ l - case _ ⇒ - val mat = try ActorMaterializerHelper.downcast(ctx.materializer) - catch { - case ex: Exception ⇒ - throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " + - "Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex) + private var logLevels: LogLevels = _ + private var log: LoggingAdapter = _ + + def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + + override def preStart(): Unit = { + logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels) + log = logAdapter match { + case Some(l) ⇒ l + case _ ⇒ + val mat = try ActorMaterializerHelper.downcast(materializer) + catch { + case ex: Exception ⇒ + throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " + + "Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex) + } + + Logging(mat.system, mat)(fromMaterializer) } - - Logging(mat.system, ctx)(fromLifecycleContext) - } - } - - override def onPush(elem: T, ctx: Context[T]): SyncDirective = { - if (isEnabled(logLevels.onElement)) - log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem)) - - ctx.push(elem) - } - - override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { - if (isEnabled(logLevels.onFailure)) - logLevels.onFailure match { - case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name) - case level ⇒ log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage) } - super.onUpstreamFailure(cause, ctx) - } + override def onPush(): Unit = { + try { + val elem = grab(in) + if (isEnabled(logLevels.onElement)) + log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem)) - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - if (isEnabled(logLevels.onFinish)) - log.log(logLevels.onFinish, "[{}] Upstream finished.", name) + push(out, elem) + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case _ ⇒ pull(in) + } + } + } - super.onUpstreamFinish(ctx) - } + override def onPull(): Unit = pull(in) - override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { - if (isEnabled(logLevels.onFinish)) - log.log(logLevels.onFinish, "[{}] Downstream finished.", name) + override def onUpstreamFailure(cause: Throwable): Unit = { + if (isEnabled(logLevels.onFailure)) + logLevels.onFailure match { + case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name) + case level ⇒ log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage) + } - super.onDownstreamFinish(ctx) - } + super.onUpstreamFailure(cause) + } - private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt + override def onUpstreamFinish(): Unit = { + if (isEnabled(logLevels.onFinish)) + log.log(logLevels.onFinish, "[{}] Upstream finished.", name) - override def decide(t: Throwable): Supervision.Directive = decider(t) + super.onUpstreamFinish() + } + + override def onDownstreamFinish(): Unit = { + if (isEnabled(logLevels.onFinish)) + log.log(logLevels.onFinish, "[{}] Downstream finished.", name) + + super.onDownstreamFinish() + } + + private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt + + setHandlers(in, out, this) + } } /** @@ -975,16 +990,16 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, private[akka] object Log { /** - * Must be located here to be visible for implicit resolution, when LifecycleContext is passed to [[Logging]] + * Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]] * More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source. */ - final val fromLifecycleContext = new LogSource[LifecycleContext] { + final val fromMaterializer = new LogSource[Materializer] { // do not expose private context classes (of OneBoundedInterpreter) - override def getClazz(t: LifecycleContext): Class[_] = classOf[Materializer] + override def getClazz(t: Materializer): Class[_] = classOf[Materializer] - override def genString(t: LifecycleContext): String = { - try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t.materializer).supervisor.path})" + override def genString(t: Materializer): String = { + try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t).supervisor.path})" catch { case ex: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 12ca3f4e73..a2636d4bcb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1584,7 +1584,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels */ def log(name: String, extract: Out ⇒ Any = ConstantFun.scalaIdentityFunction)(implicit log: LoggingAdapter = null): Repr[Out] = - andThen(Stages.Log(name, extract.asInstanceOf[Any ⇒ Any], Option(log))) + via(Log(name, extract.asInstanceOf[Any ⇒ Any], Option(log))) /** * Combine the elements of current flow and the given [[Source]] into a stream of tuples.