diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index 271f312bc1..f6a1a34d60 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -282,7 +282,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S }) .runWith(Sink.ignore) } - ex.getMessage should startWith("No handler defined in stage [stage-name] for in port [in") + ex.getMessage should startWith("No handler defined in stage [").and(include("] for in port [in")) } "give a good error message if out handler missing" in { @@ -308,7 +308,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S .map(_ => "whatever") .runWith(Sink.ignore) } - ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out")) } "give a good error message if out handler missing with downstream boundary" in { @@ -332,7 +332,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S }) .runWith(Sink.ignore.async) } - ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out")) } "give a good error message if handler missing with downstream publisher" in { @@ -357,7 +357,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S }) .runWith(Sink.ignore) } - ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out")) } "give a good error message if handler missing when stage is an island" in { @@ -382,7 +382,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S .async .runWith(Sink.ignore) } - ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out") + ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out")) } "give a good error message if sub source is pushed twice" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala index fc04dcc7ea..f06fa19d58 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala @@ -46,7 +46,7 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - source.runWith(Sink.head).futureValue shouldBe Some("my-name") + source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "propagate attributes when nested" in { @@ -58,7 +58,7 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - source.runWith(Sink.head).futureValue shouldBe Some("my-name") + source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "handle factory failure" in { @@ -120,7 +120,7 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name") + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "propagate attributes when nested" in { @@ -132,7 +132,7 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name") + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "handle factory failure" in { @@ -192,7 +192,7 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("my-name") + Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("setup-my-name") } "propagate attributes when nested" in { @@ -204,7 +204,7 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("my-name") + Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("setup-my-name") } "handle factory failure" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala index 62390cfe15..a2d87ae29c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala @@ -49,7 +49,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - source.runWith(Sink.head).futureValue shouldBe Some("my-name") + source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "propagate attributes when nested" in { @@ -61,7 +61,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - source.runWith(Sink.head).futureValue shouldBe Some("my-name") + source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "handle factory failure" in { @@ -123,7 +123,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name") + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "propagate attributes when nested" in { @@ -135,7 +135,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name") + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } "handle factory failure" in { @@ -195,7 +195,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("my-name") + Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("setup-my-name") } "propagate attributes when nested" in { @@ -207,7 +207,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("my-name") + Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("setup-my-name") } "handle factory failure" in { diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 75e00f98f8..da63816b41 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -7,12 +7,10 @@ package akka.stream import java.net.URLEncoder import java.time.Duration import java.util.Optional - import scala.annotation.tailrec import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.{ classTag, ClassTag } - import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.annotation.InternalApi @@ -21,6 +19,7 @@ import akka.japi.function import akka.stream.impl.TraversalBuilder import akka.util.{ ByteString, OptionVal } import akka.util.JavaDurationConverters._ +import akka.util.LineNumbers /** * Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]] @@ -304,6 +303,32 @@ object Attributes { final case class Name(n: String) extends Attribute + /** + * Attribute that contains the source location of for example a lambda passed to an operator, useful for example + * for debugging. Included in the default toString of GraphStageLogic if present + */ + final class SourceLocation(lambda: AnyRef) extends Attribute { + lazy val locationName: String = { + val locationName = LineNumbers(lambda) match { + case LineNumbers.NoSourceInfo => "unknown" + case LineNumbers.UnknownSourceFormat(_) => "unknown" + case LineNumbers.SourceFile(filename) => filename + case LineNumbers.SourceFileLines(filename, from, _) => + s"$filename:$from" + } + s"${lambda.getClass.getPackage.getName}-$locationName" + } + + override def toString: String = locationName + } + + object SourceLocation { + def forLambda(lambda: AnyRef): SourceLocation = new SourceLocation(lambda) + + def stringFrom(attributes: Attributes): String = + attributes.get[SourceLocation].map(_.locationName).getOrElse("unknown") + } + /** * Each asynchronous piece of a materialized stream topology is executed by one Actor * that manages an input buffer for all inlets of its shape. This attribute configures diff --git a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala index fbebecb4aa..1d083a5ab0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala @@ -6,8 +6,8 @@ package akka.stream.impl import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal - import akka.annotation.InternalApi +import akka.stream.Attributes.SourceLocation import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.scaladsl.{ Keep, Source } @@ -28,7 +28,7 @@ import akka.stream.stage._ val out = Outlet[T]("LazySource.out") override val shape = SourceShape(out) - override protected def initialAttributes = DefaultAttributes.lazySource + override protected def initialAttributes = DefaultAttributes.lazySource and SourceLocation.forLambda(sourceFactory) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { val matPromise = Promise[M]() diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index bcf4c361e4..b8650ed08e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -834,9 +834,8 @@ private final case class SavedIslandData( if (isIn) s"in port id [$missingHandlerIdx]" else s"out port id [$missingHandlerIdx]" } - throw new IllegalStateException( - s"No handler defined in stage [${logic.originalStage.getOrElse(logic).toString}] for $portLabel." + - " All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.") + throw new IllegalStateException(s"No handler defined in stage [${logic.toString}] for $portLabel." + + " All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.") } override def toString: String = "GraphStagePhase" diff --git a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala index aaaae0fbef..b810d975e5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala @@ -7,8 +7,8 @@ package akka.stream.impl import scala.concurrent.Future import scala.concurrent.Promise import scala.util.control.NonFatal - import akka.annotation.InternalApi +import akka.stream.Attributes.SourceLocation import akka.stream._ import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Keep @@ -26,6 +26,8 @@ import akka.stream.stage.OutHandler private val in = Inlet[T]("SetupSinkStage.in") override val shape = SinkShape(in) + override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { val matPromise = Promise[M]() (createStageLogic(matPromise), matPromise.future) @@ -62,6 +64,8 @@ import akka.stream.stage.OutHandler private val out = Outlet[U]("SetupFlowStage.out") override val shape = FlowShape(in, out) + override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { val matPromise = Promise[M]() (createStageLogic(matPromise), matPromise.future) @@ -105,6 +109,8 @@ import akka.stream.stage.OutHandler private val out = Outlet[T]("SetupSourceStage.out") override val shape = SourceShape(out) + override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { val matPromise = Promise[M]() (createStageLogic(matPromise), matPromise.future) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 18b4e8899f..6e4a5badce 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -5,7 +5,6 @@ package akka.stream.impl import java.util.function.BinaryOperator - import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.collection.mutable @@ -15,10 +14,8 @@ import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.NonFatal - import org.reactivestreams.Publisher import org.reactivestreams.Subscriber - import akka.NotUsed import akka.annotation.DoNotInherit import akka.annotation.InternalApi @@ -27,6 +24,7 @@ import akka.event.Logging import akka.stream._ import akka.stream.ActorAttributes.StreamSubscriptionTimeout import akka.stream.Attributes.InputBuffer +import akka.stream.Attributes.SourceLocation import akka.stream.impl.QueueSink.Output import akka.stream.impl.QueueSink.Pull import akka.stream.impl.Stages.DefaultAttributes @@ -537,7 +535,7 @@ import akka.util.ccompat._ @InternalApi final private[stream] class LazySink[T, M](sinkFactory: T => Future[Sink[T, M]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { val in = Inlet[T]("lazySink.in") - override def initialAttributes = DefaultAttributes.lazySink + override def initialAttributes = DefaultAttributes.lazySink and SourceLocation.forLambda(sinkFactory) override val shape: SinkShape[T] = SinkShape.of(in) override def toString: String = "LazySink" diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 6d3eadfbb1..b04c90b53e 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -27,6 +27,7 @@ import akka.stream.Attributes._ val filterNot = name("filterNot") val collect = name("collect") val recover = name("recover") + val mapError = name("mapError") val mapAsync = name("mapAsync") val mapAsyncUnordered = name("mapAsyncUnordered") val ask = name("ask") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 91a6731fad..4e997dbc9d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -679,7 +679,7 @@ import akka.util.OptionVal if (!isInitialized) UninitializedInterpreterImpl(logics.zipWithIndex.map { case (logic, idx) => - LogicSnapshotImpl(idx, logic.originalStage.getOrElse(logic).toString, logic.attributes) + LogicSnapshotImpl(idx, logic.toString, logic.attributes) }.toVector) else interpreter.toSnapshot } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index f416a89428..80d9307e53 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -306,7 +306,7 @@ import akka.stream.stage._ logic.preStart() } catch { case NonFatal(e) => - log.error(e, "Error during preStart in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage) + log.error(e, "Error during preStart in [{}]: {}", logic.toString, e.getMessage) logic.failStage(e) } afterStageHasRun(logic) @@ -366,7 +366,7 @@ import akka.stream.stage._ case None => true } if (loggingEnabled) - log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage) + log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage) activeStage.failStage(e) // Abort chasing @@ -600,7 +600,7 @@ import akka.stream.stage._ logic.afterPostStop() } catch { case NonFatal(e) => - log.error(e, s"Error during postStop in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage) + log.error(e, s"Error during postStop in [{}]: {}", logic.toString, e.getMessage) } } @@ -680,8 +680,7 @@ import akka.stream.stage._ val logicSnapshots = logics.zipWithIndex.map { case (logic, idx) => - val label = logic.originalStage.getOrElse(logic).toString - LogicSnapshotImpl(idx, label, logic.attributes) + LogicSnapshotImpl(idx, logic.toString, logic.attributes) } val logicIndexes = logics.zipWithIndex.map { case (stage, idx) => stage -> idx }.toMap val connectionSnapshots = connections.filter(_ != null).map { connection => diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 20e467d8cd..2c40163ba7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -5,12 +5,12 @@ package akka.stream.impl.fusing import java.util.concurrent.TimeUnit.NANOSECONDS - import akka.actor.{ ActorRef, Terminated } import akka.annotation.{ DoNotInherit, InternalApi } import akka.event.Logging.LogLevel import akka.event._ import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream.Attributes.SourceLocation import akka.stream.Attributes.{ InputBuffer, LogLevels } import akka.stream.OverflowStrategies._ import akka.stream.impl.Stages.DefaultAttributes @@ -40,7 +40,7 @@ import akka.util.ccompat._ val out = Outlet[Out]("Map.out") override val shape = FlowShape(in, out) - override def initialAttributes: Attributes = DefaultAttributes.map + override def initialAttributes: Attributes = DefaultAttributes.map and SourceLocation.forLambda(f) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -69,7 +69,7 @@ import akka.util.ccompat._ * INTERNAL API */ @InternalApi private[akka] final case class Filter[T](p: T => Boolean) extends SimpleLinearGraphStage[T] { - override def initialAttributes: Attributes = DefaultAttributes.filter + override def initialAttributes: Attributes = DefaultAttributes.filter and SourceLocation.forLambda(p) override def toString: String = "Filter" @@ -121,7 +121,7 @@ import akka.util.ccompat._ */ @InternalApi private[akka] final case class TakeWhile[T](p: T => Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] { - override def initialAttributes: Attributes = DefaultAttributes.takeWhile + override def initialAttributes: Attributes = DefaultAttributes.takeWhile and SourceLocation.forLambda(p) override def toString: String = "TakeWhile" @@ -159,7 +159,7 @@ import akka.util.ccompat._ * INTERNAL API */ @InternalApi private[akka] final case class DropWhile[T](p: T => Boolean) extends SimpleLinearGraphStage[T] { - override def initialAttributes: Attributes = DefaultAttributes.dropWhile + override def initialAttributes: Attributes = DefaultAttributes.dropWhile and SourceLocation.forLambda(p) def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { @@ -231,7 +231,7 @@ private[stream] object Collect { val out = Outlet[Out]("Collect.out") override val shape = FlowShape(in, out) - override def initialAttributes: Attributes = DefaultAttributes.collect + override def initialAttributes: Attributes = DefaultAttributes.collect and SourceLocation.forLambda(pf) def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { @@ -264,7 +264,7 @@ private[stream] object Collect { */ @InternalApi private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] { - override protected def initialAttributes: Attributes = DefaultAttributes.recover + override protected def initialAttributes: Attributes = DefaultAttributes.recover and SourceLocation.forLambda(pf) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -315,6 +315,9 @@ private[stream] object Collect { */ @InternalApi private[akka] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] { + + override protected def initialAttributes: Attributes = DefaultAttributes.mapError + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { override def onPush(): Unit = push(out, grab(in)) @@ -390,7 +393,7 @@ private[stream] object Collect { extends GraphStage[FlowShape[In, Out]] { override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out")) - override def initialAttributes: Attributes = DefaultAttributes.scan + override def initialAttributes: Attributes = DefaultAttributes.scan and SourceLocation.forLambda(f) override def toString: String = "Scan" @@ -457,7 +460,7 @@ private[stream] object Collect { val out = Outlet[Out]("ScanAsync.out") override val shape: FlowShape[In, Out] = FlowShape[In, Out](in, out) - override val initialAttributes: Attributes = Attributes.name("scanAsync") + override val initialAttributes: Attributes = Attributes.name("scanAsync") and SourceLocation.forLambda(f) override val toString: String = "ScanAsync" @@ -576,7 +579,7 @@ private[stream] object Collect { override def toString: String = "Fold" - override val initialAttributes = DefaultAttributes.fold + override val initialAttributes = DefaultAttributes.fold and SourceLocation.forLambda(f) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -635,7 +638,7 @@ private[stream] object Collect { override def toString: String = "FoldAsync" - override val initialAttributes = DefaultAttributes.foldAsync + override val initialAttributes = DefaultAttributes.foldAsync and SourceLocation.forLambda(f) def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -816,7 +819,7 @@ private[stream] object Collect { */ @InternalApi private[akka] final case class LimitWeighted[T](val n: Long, val costFn: T => Long) extends SimpleLinearGraphStage[T] { - override def initialAttributes: Attributes = DefaultAttributes.limitWeighted + override def initialAttributes: Attributes = DefaultAttributes.limitWeighted and SourceLocation.forLambda(costFn) def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { @@ -1161,7 +1164,7 @@ private[stream] object Collect { private val in = Inlet[In]("expand.in") private val out = Outlet[Out]("expand.out") - override def initialAttributes = DefaultAttributes.expand + override def initialAttributes = DefaultAttributes.expand and SourceLocation.forLambda(extrapolate) override val shape = FlowShape(in, out) @@ -1253,7 +1256,7 @@ private[stream] object Collect { private val in = Inlet[In]("MapAsync.in") private val out = Outlet[Out]("MapAsync.out") - override def initialAttributes = DefaultAttributes.mapAsync + override def initialAttributes = DefaultAttributes.mapAsync and SourceLocation.forLambda(f) override val shape = FlowShape(in, out) @@ -1353,7 +1356,7 @@ private[stream] object Collect { private val in = Inlet[In]("MapAsyncUnordered.in") private val out = Outlet[Out]("MapAsyncUnordered.out") - override def initialAttributes = DefaultAttributes.mapAsyncUnordered + override def initialAttributes = DefaultAttributes.mapAsyncUnordered and SourceLocation.forLambda(f) override val shape = FlowShape(in, out) @@ -1718,7 +1721,7 @@ private[stream] object Collect { val in = Inlet[T]("in") val out = Outlet[immutable.Seq[T]]("out") - override def initialAttributes = DefaultAttributes.groupedWeightedWithin + override def initialAttributes = DefaultAttributes.groupedWeightedWithin and SourceLocation.forLambda(costFn) val shape = FlowShape(in, out) @@ -2038,7 +2041,7 @@ private[stream] object Collect { * INTERNAL API */ @InternalApi private[akka] final class Reduce[T](val f: (T, T) => T) extends SimpleLinearGraphStage[T] { - override def initialAttributes: Attributes = DefaultAttributes.reduce + override def initialAttributes: Attributes = DefaultAttributes.reduce and SourceLocation.forLambda(f) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self => @@ -2166,7 +2169,7 @@ private[akka] final class StatefulMapConcat[In, Out](val f: () => In => Iterable val out = Outlet[Out]("StatefulMapConcat.out") override val shape = FlowShape(in, out) - override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat + override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat and SourceLocation.forLambda(f) def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 5f781b7cbb..7ed9a4f13d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -6,7 +6,6 @@ package akka.stream.impl.fusing import java.util.Collections import java.util.concurrent.atomic.AtomicReference - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration.FiniteDuration @@ -16,6 +15,7 @@ import akka.annotation.InternalApi import akka.stream._ import akka.stream.ActorAttributes.StreamSubscriptionTimeout import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream.Attributes.SourceLocation import akka.stream.impl.{ Buffer => BufferImpl } import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.ActorSubscriberMessage.OnError @@ -473,8 +473,10 @@ import akka.util.ccompat.JavaConverters._ val p: T => Boolean, val substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { + val in: Inlet[T] = Inlet("Split.in") val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out") + override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) private val propagateSubstreamCancel = substreamCancelStrategy match { @@ -482,6 +484,8 @@ import akka.util.ccompat.JavaConverters._ case SubstreamCancelStrategies.Drain => false } + override protected def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda(p) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { import Split._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index fbdd7a95bb..9034377b2d 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -10,14 +10,13 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } - import akka.Done import akka.NotUsed import akka.actor.ActorRef import akka.annotation.DoNotInherit import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } +import akka.stream.Attributes.SourceLocation import akka.stream._ import akka.stream.impl.{ fusing, @@ -663,7 +662,7 @@ object Flow { * this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). */ def lazyFlow[I, O, M](create: () => Flow[I, O, M]): Flow[I, O, Future[M]] = - lazyFutureFlow(() => Future.successful(create())) + lazyFutureFlow(() => Future.successful(create())).addAttributes(Attributes(SourceLocation.forLambda(create))) /** * Defers invoking the `create` function to create a future flow until there downstream demand has caused upstream @@ -704,6 +703,7 @@ object Flow { .mapMaterializedValue(_ => Future.failed[M](new NeverMaterializedException())) f }(Keep.right) + .addAttributes(Attributes(SourceLocation.forLambda(create))) .mapMaterializedValue(_.flatten) } @@ -1204,7 +1204,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels */ def filterNot(p: Out => Boolean): Repr[Out] = - via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot)) + via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot and SourceLocation.forLambda(p))) /** * Terminate processing (and cancel the upstream publisher) after predicate @@ -1776,7 +1776,9 @@ trait FlowOps[+Out, +Mat] { * See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] */ def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Repr[S] = - via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate)) + via( + Batch(1L, ConstantFun.zeroLong, seed, aggregate) + .withAttributes(DefaultAttributes.conflate and SourceLocation.forLambda(aggregate))) /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -1831,7 +1833,9 @@ trait FlowOps[+Out, +Mat] { * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S] = - via(Batch(max, ConstantFun.oneLong, seed, aggregate).withAttributes(DefaultAttributes.batch)) + via( + Batch(max, ConstantFun.oneLong, seed, aggregate) + .withAttributes(DefaultAttributes.batch and SourceLocation.forLambda(aggregate))) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches @@ -1862,7 +1866,9 @@ trait FlowOps[+Out, +Mat] { * @param aggregate Takes the currently batched value and the current pending element to produce a new batch */ def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S] = - via(Batch(max, costFn, seed, aggregate).withAttributes(DefaultAttributes.batchWeighted)) + via( + Batch(max, costFn, seed, aggregate).withAttributes( + DefaultAttributes.batchWeighted and SourceLocation.forLambda(aggregate))) /** * Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index ad2b97cd7b..ce0989411a 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -5,18 +5,16 @@ package akka.stream.stage import java.util.concurrent.atomic.AtomicReference - import scala.annotation.tailrec import scala.collection.{ immutable, mutable } import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration - import com.github.ghik.silencer.silent - import akka.{ Done, NotUsed } import akka.actor._ import akka.annotation.InternalApi import akka.japi.function.{ Effect, Procedure } +import akka.stream.Attributes.SourceLocation import akka.stream._ import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder } import akka.stream.impl.ActorSubscriberMessage @@ -1584,7 +1582,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } override def toString: String = - attributes.get[Attributes.Name].map(attr => s"${getClass.toString}-${attr.n}").getOrElse(getClass.toString) + attributes.get[Attributes.Name] match { + case Some(name) => + attributes.get[SourceLocation] match { + case Some(location) => s"${getClass.getName}-${name.n}(${location.locationName})" + case None => s"${getClass.getName}-${name.n}" + } + + case None => getClass.getName + } }