Improved operator string represenation #29935

* New attribute with source location information introduced and added to stages that takes lambdas
* Better default toString for GraphStageLogic including source location where possible
  and used that for debugging, errors and stream snapshots
This commit is contained in:
Johan Andrén 2021-01-11 15:17:50 +01:00 committed by GitHub
parent b8c79f8695
commit ccc4a2f48b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 112 additions and 65 deletions

View file

@ -282,7 +282,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
})
.runWith(Sink.ignore)
}
ex.getMessage should startWith("No handler defined in stage [stage-name] for in port [in")
ex.getMessage should startWith("No handler defined in stage [").and(include("] for in port [in"))
}
"give a good error message if out handler missing" in {
@ -308,7 +308,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
.map(_ => "whatever")
.runWith(Sink.ignore)
}
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out"))
}
"give a good error message if out handler missing with downstream boundary" in {
@ -332,7 +332,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
})
.runWith(Sink.ignore.async)
}
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out"))
}
"give a good error message if handler missing with downstream publisher" in {
@ -357,7 +357,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
})
.runWith(Sink.ignore)
}
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out"))
}
"give a good error message if handler missing when stage is an island" in {
@ -382,7 +382,7 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
.async
.runWith(Sink.ignore)
}
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
ex.getMessage should startWith("No handler defined in stage [").and(include("] for out port [out"))
}
"give a good error message if sub source is pushed twice" in {

View file

@ -46,7 +46,7 @@ class FromMaterializerSpec extends StreamSpec {
}
.named("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"propagate attributes when nested" in {
@ -58,7 +58,7 @@ class FromMaterializerSpec extends StreamSpec {
}
.named("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"handle factory failure" in {
@ -120,7 +120,7 @@ class FromMaterializerSpec extends StreamSpec {
}
.named("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"propagate attributes when nested" in {
@ -132,7 +132,7 @@ class FromMaterializerSpec extends StreamSpec {
}
.named("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"handle factory failure" in {
@ -192,7 +192,7 @@ class FromMaterializerSpec extends StreamSpec {
}
.named("my-name")
Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("my-name")
Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("setup-my-name")
}
"propagate attributes when nested" in {
@ -204,7 +204,7 @@ class FromMaterializerSpec extends StreamSpec {
}
.named("my-name")
Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("my-name")
Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("setup-my-name")
}
"handle factory failure" in {

View file

@ -49,7 +49,7 @@ class SetupSpec extends StreamSpec {
}
.named("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"propagate attributes when nested" in {
@ -61,7 +61,7 @@ class SetupSpec extends StreamSpec {
}
.named("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"handle factory failure" in {
@ -123,7 +123,7 @@ class SetupSpec extends StreamSpec {
}
.named("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"propagate attributes when nested" in {
@ -135,7 +135,7 @@ class SetupSpec extends StreamSpec {
}
.named("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
}
"handle factory failure" in {
@ -195,7 +195,7 @@ class SetupSpec extends StreamSpec {
}
.named("my-name")
Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("my-name")
Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("setup-my-name")
}
"propagate attributes when nested" in {
@ -207,7 +207,7 @@ class SetupSpec extends StreamSpec {
}
.named("my-name")
Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("my-name")
Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("setup-my-name")
}
"handle factory failure" in {

View file

@ -7,12 +7,10 @@ package akka.stream
import java.net.URLEncoder
import java.time.Duration
import java.util.Optional
import scala.annotation.tailrec
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.FiniteDuration
import scala.reflect.{ classTag, ClassTag }
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
@ -21,6 +19,7 @@ import akka.japi.function
import akka.stream.impl.TraversalBuilder
import akka.util.{ ByteString, OptionVal }
import akka.util.JavaDurationConverters._
import akka.util.LineNumbers
/**
* Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
@ -304,6 +303,32 @@ object Attributes {
final case class Name(n: String) extends Attribute
/**
* Attribute that contains the source location of for example a lambda passed to an operator, useful for example
* for debugging. Included in the default toString of GraphStageLogic if present
*/
final class SourceLocation(lambda: AnyRef) extends Attribute {
lazy val locationName: String = {
val locationName = LineNumbers(lambda) match {
case LineNumbers.NoSourceInfo => "unknown"
case LineNumbers.UnknownSourceFormat(_) => "unknown"
case LineNumbers.SourceFile(filename) => filename
case LineNumbers.SourceFileLines(filename, from, _) =>
s"$filename:$from"
}
s"${lambda.getClass.getPackage.getName}-$locationName"
}
override def toString: String = locationName
}
object SourceLocation {
def forLambda(lambda: AnyRef): SourceLocation = new SourceLocation(lambda)
def stringFrom(attributes: Attributes): String =
attributes.get[SourceLocation].map(_.locationName).getOrElse("unknown")
}
/**
* Each asynchronous piece of a materialized stream topology is executed by one Actor
* that manages an input buffer for all inlets of its shape. This attribute configures

View file

@ -6,8 +6,8 @@ package akka.stream.impl
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import akka.annotation.InternalApi
import akka.stream.Attributes.SourceLocation
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.scaladsl.{ Keep, Source }
@ -28,7 +28,7 @@ import akka.stream.stage._
val out = Outlet[T]("LazySource.out")
override val shape = SourceShape(out)
override protected def initialAttributes = DefaultAttributes.lazySource
override protected def initialAttributes = DefaultAttributes.lazySource and SourceLocation.forLambda(sourceFactory)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]()

View file

@ -834,9 +834,8 @@ private final case class SavedIslandData(
if (isIn) s"in port id [$missingHandlerIdx]"
else s"out port id [$missingHandlerIdx]"
}
throw new IllegalStateException(
s"No handler defined in stage [${logic.originalStage.getOrElse(logic).toString}] for $portLabel." +
" All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.")
throw new IllegalStateException(s"No handler defined in stage [${logic.toString}] for $portLabel." +
" All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.")
}
override def toString: String = "GraphStagePhase"

View file

@ -7,8 +7,8 @@ package akka.stream.impl
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.control.NonFatal
import akka.annotation.InternalApi
import akka.stream.Attributes.SourceLocation
import akka.stream._
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
@ -26,6 +26,8 @@ import akka.stream.stage.OutHandler
private val in = Inlet[T]("SetupSinkStage.in")
override val shape = SinkShape(in)
override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]()
(createStageLogic(matPromise), matPromise.future)
@ -62,6 +64,8 @@ import akka.stream.stage.OutHandler
private val out = Outlet[U]("SetupFlowStage.out")
override val shape = FlowShape(in, out)
override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]()
(createStageLogic(matPromise), matPromise.future)
@ -105,6 +109,8 @@ import akka.stream.stage.OutHandler
private val out = Outlet[T]("SetupSourceStage.out")
override val shape = SourceShape(out)
override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]()
(createStageLogic(matPromise), matPromise.future)

View file

@ -5,7 +5,6 @@
package akka.stream.impl
import java.util.function.BinaryOperator
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.collection.mutable
@ -15,10 +14,8 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import akka.NotUsed
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
@ -27,6 +24,7 @@ import akka.event.Logging
import akka.stream._
import akka.stream.ActorAttributes.StreamSubscriptionTimeout
import akka.stream.Attributes.InputBuffer
import akka.stream.Attributes.SourceLocation
import akka.stream.impl.QueueSink.Output
import akka.stream.impl.QueueSink.Pull
import akka.stream.impl.Stages.DefaultAttributes
@ -537,7 +535,7 @@ import akka.util.ccompat._
@InternalApi final private[stream] class LazySink[T, M](sinkFactory: T => Future[Sink[T, M]])
extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
val in = Inlet[T]("lazySink.in")
override def initialAttributes = DefaultAttributes.lazySink
override def initialAttributes = DefaultAttributes.lazySink and SourceLocation.forLambda(sinkFactory)
override val shape: SinkShape[T] = SinkShape.of(in)
override def toString: String = "LazySink"

View file

@ -27,6 +27,7 @@ import akka.stream.Attributes._
val filterNot = name("filterNot")
val collect = name("collect")
val recover = name("recover")
val mapError = name("mapError")
val mapAsync = name("mapAsync")
val mapAsyncUnordered = name("mapAsyncUnordered")
val ask = name("ask")

View file

@ -679,7 +679,7 @@ import akka.util.OptionVal
if (!isInitialized)
UninitializedInterpreterImpl(logics.zipWithIndex.map {
case (logic, idx) =>
LogicSnapshotImpl(idx, logic.originalStage.getOrElse(logic).toString, logic.attributes)
LogicSnapshotImpl(idx, logic.toString, logic.attributes)
}.toVector)
else interpreter.toSnapshot
}

View file

@ -306,7 +306,7 @@ import akka.stream.stage._
logic.preStart()
} catch {
case NonFatal(e) =>
log.error(e, "Error during preStart in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage)
log.error(e, "Error during preStart in [{}]: {}", logic.toString, e.getMessage)
logic.failStage(e)
}
afterStageHasRun(logic)
@ -366,7 +366,7 @@ import akka.stream.stage._
case None => true
}
if (loggingEnabled)
log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage)
log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage)
activeStage.failStage(e)
// Abort chasing
@ -600,7 +600,7 @@ import akka.stream.stage._
logic.afterPostStop()
} catch {
case NonFatal(e) =>
log.error(e, s"Error during postStop in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage)
log.error(e, s"Error during postStop in [{}]: {}", logic.toString, e.getMessage)
}
}
@ -680,8 +680,7 @@ import akka.stream.stage._
val logicSnapshots = logics.zipWithIndex.map {
case (logic, idx) =>
val label = logic.originalStage.getOrElse(logic).toString
LogicSnapshotImpl(idx, label, logic.attributes)
LogicSnapshotImpl(idx, logic.toString, logic.attributes)
}
val logicIndexes = logics.zipWithIndex.map { case (stage, idx) => stage -> idx }.toMap
val connectionSnapshots = connections.filter(_ != null).map { connection =>

View file

@ -5,12 +5,12 @@
package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.Logging.LogLevel
import akka.event._
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Attributes.SourceLocation
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.Stages.DefaultAttributes
@ -40,7 +40,7 @@ import akka.util.ccompat._
val out = Outlet[Out]("Map.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.map
override def initialAttributes: Attributes = DefaultAttributes.map and SourceLocation.forLambda(f)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
@ -69,7 +69,7 @@ import akka.util.ccompat._
* INTERNAL API
*/
@InternalApi private[akka] final case class Filter[T](p: T => Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.filter
override def initialAttributes: Attributes = DefaultAttributes.filter and SourceLocation.forLambda(p)
override def toString: String = "Filter"
@ -121,7 +121,7 @@ import akka.util.ccompat._
*/
@InternalApi private[akka] final case class TakeWhile[T](p: T => Boolean, inclusive: Boolean = false)
extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
override def initialAttributes: Attributes = DefaultAttributes.takeWhile and SourceLocation.forLambda(p)
override def toString: String = "TakeWhile"
@ -159,7 +159,7 @@ import akka.util.ccompat._
* INTERNAL API
*/
@InternalApi private[akka] final case class DropWhile[T](p: T => Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.dropWhile
override def initialAttributes: Attributes = DefaultAttributes.dropWhile and SourceLocation.forLambda(p)
def createLogic(inheritedAttributes: Attributes) =
new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
@ -231,7 +231,7 @@ private[stream] object Collect {
val out = Outlet[Out]("Collect.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.collect
override def initialAttributes: Attributes = DefaultAttributes.collect and SourceLocation.forLambda(pf)
def createLogic(inheritedAttributes: Attributes) =
new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
@ -264,7 +264,7 @@ private[stream] object Collect {
*/
@InternalApi private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T])
extends SimpleLinearGraphStage[T] {
override protected def initialAttributes: Attributes = DefaultAttributes.recover
override protected def initialAttributes: Attributes = DefaultAttributes.recover and SourceLocation.forLambda(pf)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
@ -315,6 +315,9 @@ private[stream] object Collect {
*/
@InternalApi private[akka] final case class MapError[T](f: PartialFunction[Throwable, Throwable])
extends SimpleLinearGraphStage[T] {
override protected def initialAttributes: Attributes = DefaultAttributes.mapError
override def createLogic(attr: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
@ -390,7 +393,7 @@ private[stream] object Collect {
extends GraphStage[FlowShape[In, Out]] {
override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out"))
override def initialAttributes: Attributes = DefaultAttributes.scan
override def initialAttributes: Attributes = DefaultAttributes.scan and SourceLocation.forLambda(f)
override def toString: String = "Scan"
@ -457,7 +460,7 @@ private[stream] object Collect {
val out = Outlet[Out]("ScanAsync.out")
override val shape: FlowShape[In, Out] = FlowShape[In, Out](in, out)
override val initialAttributes: Attributes = Attributes.name("scanAsync")
override val initialAttributes: Attributes = Attributes.name("scanAsync") and SourceLocation.forLambda(f)
override val toString: String = "ScanAsync"
@ -576,7 +579,7 @@ private[stream] object Collect {
override def toString: String = "Fold"
override val initialAttributes = DefaultAttributes.fold
override val initialAttributes = DefaultAttributes.fold and SourceLocation.forLambda(f)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
@ -635,7 +638,7 @@ private[stream] object Collect {
override def toString: String = "FoldAsync"
override val initialAttributes = DefaultAttributes.foldAsync
override val initialAttributes = DefaultAttributes.foldAsync and SourceLocation.forLambda(f)
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
@ -816,7 +819,7 @@ private[stream] object Collect {
*/
@InternalApi private[akka] final case class LimitWeighted[T](val n: Long, val costFn: T => Long)
extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.limitWeighted
override def initialAttributes: Attributes = DefaultAttributes.limitWeighted and SourceLocation.forLambda(costFn)
def createLogic(inheritedAttributes: Attributes) =
new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
@ -1161,7 +1164,7 @@ private[stream] object Collect {
private val in = Inlet[In]("expand.in")
private val out = Outlet[Out]("expand.out")
override def initialAttributes = DefaultAttributes.expand
override def initialAttributes = DefaultAttributes.expand and SourceLocation.forLambda(extrapolate)
override val shape = FlowShape(in, out)
@ -1253,7 +1256,7 @@ private[stream] object Collect {
private val in = Inlet[In]("MapAsync.in")
private val out = Outlet[Out]("MapAsync.out")
override def initialAttributes = DefaultAttributes.mapAsync
override def initialAttributes = DefaultAttributes.mapAsync and SourceLocation.forLambda(f)
override val shape = FlowShape(in, out)
@ -1353,7 +1356,7 @@ private[stream] object Collect {
private val in = Inlet[In]("MapAsyncUnordered.in")
private val out = Outlet[Out]("MapAsyncUnordered.out")
override def initialAttributes = DefaultAttributes.mapAsyncUnordered
override def initialAttributes = DefaultAttributes.mapAsyncUnordered and SourceLocation.forLambda(f)
override val shape = FlowShape(in, out)
@ -1718,7 +1721,7 @@ private[stream] object Collect {
val in = Inlet[T]("in")
val out = Outlet[immutable.Seq[T]]("out")
override def initialAttributes = DefaultAttributes.groupedWeightedWithin
override def initialAttributes = DefaultAttributes.groupedWeightedWithin and SourceLocation.forLambda(costFn)
val shape = FlowShape(in, out)
@ -2038,7 +2041,7 @@ private[stream] object Collect {
* INTERNAL API
*/
@InternalApi private[akka] final class Reduce[T](val f: (T, T) => T) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.reduce
override def initialAttributes: Attributes = DefaultAttributes.reduce and SourceLocation.forLambda(f)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler { self =>
@ -2166,7 +2169,7 @@ private[akka] final class StatefulMapConcat[In, Out](val f: () => In => Iterable
val out = Outlet[Out]("StatefulMapConcat.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat
override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat and SourceLocation.forLambda(f)
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

View file

@ -6,7 +6,6 @@ package akka.stream.impl.fusing
import java.util.Collections
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
@ -16,6 +15,7 @@ import akka.annotation.InternalApi
import akka.stream._
import akka.stream.ActorAttributes.StreamSubscriptionTimeout
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Attributes.SourceLocation
import akka.stream.impl.{ Buffer => BufferImpl }
import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.ActorSubscriberMessage.OnError
@ -473,8 +473,10 @@ import akka.util.ccompat.JavaConverters._
val p: T => Boolean,
val substreamCancelStrategy: SubstreamCancelStrategy)
extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
val in: Inlet[T] = Inlet("Split.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out")
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
private val propagateSubstreamCancel = substreamCancelStrategy match {
@ -482,6 +484,8 @@ import akka.util.ccompat.JavaConverters._
case SubstreamCancelStrategies.Drain => false
}
override protected def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda(p)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
import Split._

View file

@ -10,14 +10,13 @@ import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.annotation.DoNotInherit
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream.Attributes.SourceLocation
import akka.stream._
import akka.stream.impl.{
fusing,
@ -663,7 +662,7 @@ object Flow {
* this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause).
*/
def lazyFlow[I, O, M](create: () => Flow[I, O, M]): Flow[I, O, Future[M]] =
lazyFutureFlow(() => Future.successful(create()))
lazyFutureFlow(() => Future.successful(create())).addAttributes(Attributes(SourceLocation.forLambda(create)))
/**
* Defers invoking the `create` function to create a future flow until there downstream demand has caused upstream
@ -704,6 +703,7 @@ object Flow {
.mapMaterializedValue(_ => Future.failed[M](new NeverMaterializedException()))
f
}(Keep.right)
.addAttributes(Attributes(SourceLocation.forLambda(create)))
.mapMaterializedValue(_.flatten)
}
@ -1204,7 +1204,7 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels
*/
def filterNot(p: Out => Boolean): Repr[Out] =
via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot))
via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot and SourceLocation.forLambda(p)))
/**
* Terminate processing (and cancel the upstream publisher) after predicate
@ -1776,7 +1776,9 @@ trait FlowOps[+Out, +Mat] {
* See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]]
*/
def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Repr[S] =
via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate))
via(
Batch(1L, ConstantFun.zeroLong, seed, aggregate)
.withAttributes(DefaultAttributes.conflate and SourceLocation.forLambda(aggregate)))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
@ -1831,7 +1833,9 @@ trait FlowOps[+Out, +Mat] {
* @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate
*/
def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S] =
via(Batch(max, ConstantFun.oneLong, seed, aggregate).withAttributes(DefaultAttributes.batch))
via(
Batch(max, ConstantFun.oneLong, seed, aggregate)
.withAttributes(DefaultAttributes.batch and SourceLocation.forLambda(aggregate)))
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches
@ -1862,7 +1866,9 @@ trait FlowOps[+Out, +Mat] {
* @param aggregate Takes the currently batched value and the current pending element to produce a new batch
*/
def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S] =
via(Batch(max, costFn, seed, aggregate).withAttributes(DefaultAttributes.batchWeighted))
via(
Batch(max, costFn, seed, aggregate).withAttributes(
DefaultAttributes.batchWeighted and SourceLocation.forLambda(aggregate)))
/**
* Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older

View file

@ -5,18 +5,16 @@
package akka.stream.stage
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.{ immutable, mutable }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import com.github.ghik.silencer.silent
import akka.{ Done, NotUsed }
import akka.actor._
import akka.annotation.InternalApi
import akka.japi.function.{ Effect, Procedure }
import akka.stream.Attributes.SourceLocation
import akka.stream._
import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder }
import akka.stream.impl.ActorSubscriberMessage
@ -1584,7 +1582,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
}
override def toString: String =
attributes.get[Attributes.Name].map(attr => s"${getClass.toString}-${attr.n}").getOrElse(getClass.toString)
attributes.get[Attributes.Name] match {
case Some(name) =>
attributes.get[SourceLocation] match {
case Some(location) => s"${getClass.getName}-${name.n}(${location.locationName})"
case None => s"${getClass.getName}-${name.n}"
}
case None => getClass.getName
}
}