diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 0a20fc95d9..0379ea38d9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -8,7 +8,6 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } import akka.NotUsed import akka.http.scaladsl.model.RequestEntity import akka.stream._ -import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule } import akka.stream.scaladsl._ @@ -187,7 +186,7 @@ private[http] object StreamUtils { override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new OneTimePublisherSink[In](attributes, shape, cell) - override def withAttributes(attr: Attributes): Module = + override def withAttributes(attr: Attributes): OneTimePublisherSink[In] = new OneTimePublisherSink[In](attr, amendShape(attr), cell) } /** A copy of SubscriberSource that allows access to the subscriber through the cell but can only materialized once */ @@ -212,7 +211,7 @@ private[http] object StreamUtils { override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new OneTimeSubscriberSource[Out](attributes, shape, cell) - override def withAttributes(attr: Attributes): Module = + override def withAttributes(attr: Attributes): OneTimeSubscriberSource[Out] = new OneTimeSubscriberSource[Out](attr, amendShape(attr), cell) } diff --git a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java index 3143313c88..250dc9d391 100644 --- a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java +++ b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java @@ -4,7 +4,6 @@ package akka.http.javadsl.model; -import akka.http.impl.util.Util; import akka.http.javadsl.model.headers.*; import akka.japi.Pair; diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index d174b2e145..71c766438e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -11,12 +11,10 @@ import akka.stream._ class StreamLayoutSpec extends AkkaSpec { import StreamLayout._ - def testAtomic(inPortCount: Int, outPortCount: Int): Module = new Module { + def testAtomic(inPortCount: Int, outPortCount: Int): Module = new AtomicModule { override val shape = AmorphousShape(List.fill(inPortCount)(Inlet("")), List.fill(outPortCount)(Outlet(""))) override def replaceShape(s: Shape): Module = ??? - override def subModules: Set[Module] = Set.empty - override def carbonCopy: Module = ??? override def attributes: Attributes = Attributes.none @@ -174,7 +172,7 @@ class StreamLayoutSpec extends AkkaSpec { var publishers = Vector.empty[TestPublisher] var subscribers = Vector.empty[TestSubscriber] - override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, + override protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: java.util.Map[Module, Any]): Unit = { for (inPort ← atomic.inPorts) { val subscriber = TestSubscriber(atomic, inPort) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index 3fd0327754..ad0e503ee3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -12,109 +12,162 @@ import akka.testkit.AkkaSpec class GraphMatValueSpec extends AkkaSpec { - val settings = ActorMaterializerSettings(system) - .withInputBuffer(initialSize = 2, maxSize = 16) - - implicit val materializer = ActorMaterializer(settings) - import GraphDSL.Implicits._ - "A Graph with materialized value" must { + val foldSink = Sink.fold[Int, Int](0)(_ + _) - val foldSink = Sink.fold[Int, Int](0)(_ + _) + "A Graph with materialized value" when { - "expose the materialized value as source" in { - val sub = TestSubscriber.manualProbe[Int]() - val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b ⇒ - fold ⇒ - Source(1 to 10) ~> fold - b.materializedValue.mapAsync(4)(identity) ~> Sink.fromSubscriber(sub) - ClosedShape - }).run() + for (autoFusing ← Seq(true, false)) { + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withAutoFusing(autoFusing) + implicit val materializer = ActorMaterializer(settings) - val r1 = Await.result(f, 3.seconds) - sub.expectSubscription().request(1) - val r2 = sub.expectNext() + s"using autoFusing=$autoFusing" must { - r1 should ===(r2) - } + "expose the materialized value as source" in { + val sub = TestSubscriber.manualProbe[Int]() + val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b ⇒ + fold ⇒ + Source(1 to 10) ~> fold + b.materializedValue.mapAsync(4)(identity) ~> Sink.fromSubscriber(sub) + ClosedShape + }).run() - "expose the materialized value as source multiple times" in { - val sub = TestSubscriber.manualProbe[Int]() + val r1 = Await.result(f, 3.seconds) + sub.expectSubscription().request(1) + val r2 = sub.expectNext() - val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b ⇒ - fold ⇒ - val zip = b.add(ZipWith[Int, Int, Int](_ + _)) - Source(1 to 10) ~> fold - b.materializedValue.mapAsync(4)(identity) ~> zip.in0 - b.materializedValue.mapAsync(4)(identity) ~> zip.in1 + r1 should ===(r2) + } - zip.out ~> Sink.fromSubscriber(sub) - ClosedShape - }).run() + "expose the materialized value as source multiple times" in { + val sub = TestSubscriber.manualProbe[Int]() - val r1 = Await.result(f, 3.seconds) - sub.expectSubscription().request(1) - val r2 = sub.expectNext() + val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b ⇒ + fold ⇒ + val zip = b.add(ZipWith[Int, Int, Int](_ + _)) + Source(1 to 10) ~> fold + b.materializedValue.mapAsync(4)(identity) ~> zip.in0 + b.materializedValue.mapAsync(4)(identity) ~> zip.in1 - r1 should ===(r2 / 2) - } + zip.out ~> Sink.fromSubscriber(sub) + ClosedShape + }).run() - // Exposes the materialized value as a stream value - val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source.fromGraph(GraphDSL.create(foldSink) { implicit b ⇒ - fold ⇒ - Source(1 to 10) ~> fold - SourceShape(b.materializedValue) - }) + val r1 = Await.result(f, 3.seconds) + sub.expectSubscription().request(1) + val r2 = sub.expectNext() - "allow exposing the materialized value as port" in { - val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run() - Await.result(f1, 3.seconds) should ===(55) - Await.result(f2, 3.seconds) should ===(155) - } + r1 should ===(r2 / 2) + } - "allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in { - val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ⇒ ()) - Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155) - } - - "work properly with nesting and reusing" in { - val compositeSource1 = Source.fromGraph(GraphDSL.create(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b ⇒ - (s1, s2) ⇒ - val zip = b.add(ZipWith[Int, Int, Int](_ + _)) - - s1.out.mapAsync(4)(identity) ~> zip.in0 - s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1 - SourceShape(zip.out) - }) - - val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b ⇒ - (s1, s2) ⇒ - val zip = b.add(ZipWith[Int, Int, Int](_ + _)) - s1.out ~> zip.in0 - s2.out.map(_ * 10000) ~> zip.in1 - SourceShape(zip.out) - }) - - val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run() - - Await.result(result, 3.seconds) should ===(55555555) - Await.result(f1, 3.seconds) should ===(55) - Await.result(f2, 3.seconds) should ===(55) - Await.result(f3, 3.seconds) should ===(55) - Await.result(f4, 3.seconds) should ===(55) - - } - - "work also when the source’s module is copied" in { - val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { - implicit builder ⇒ + // Exposes the materialized value as a stream value + val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source.fromGraph(GraphDSL.create(foldSink) { implicit b ⇒ fold ⇒ - FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet) - }) + Source(1 to 10) ~> fold + SourceShape(b.materializedValue) + }) - Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55) + "allow exposing the materialized value as port" in { + val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run() + Await.result(f1, 3.seconds) should ===(55) + Await.result(f2, 3.seconds) should ===(155) + } + + "allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in { + val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ⇒ ()) + Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155) + } + + "work properly with nesting and reusing" in { + val compositeSource1 = Source.fromGraph(GraphDSL.create(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b ⇒ + (s1, s2) ⇒ + val zip = b.add(ZipWith[Int, Int, Int](_ + _)) + + s1.out.mapAsync(4)(identity) ~> zip.in0 + s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1 + SourceShape(zip.out) + }) + + val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b ⇒ + (s1, s2) ⇒ + val zip = b.add(ZipWith[Int, Int, Int](_ + _)) + s1.out ~> zip.in0 + s2.out.map(_ * 10000) ~> zip.in1 + SourceShape(zip.out) + }) + + val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run() + + Await.result(result, 3.seconds) should ===(55555555) + Await.result(f1, 3.seconds) should ===(55) + Await.result(f2, 3.seconds) should ===(55) + Await.result(f3, 3.seconds) should ===(55) + Await.result(f4, 3.seconds) should ===(55) + + } + + "work also when the source’s module is copied" in { + val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(foldSink) { + implicit builder ⇒ + fold ⇒ + FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet) + }) + + Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55) + } + + "work also when the source’s module is copied and the graph is extended before using the matValSrc" in { + val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(foldSink) { + implicit builder ⇒ + fold ⇒ + val map = builder.add(Flow[Future[Int]].mapAsync(4)(identity)) + builder.materializedValue ~> map + FlowShape(fold.in, map.outlet) + }) + + Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55) + } + + "perform side-effecting transformations even when not used as source" in { + var done = false + val g = GraphDSL.create() { implicit b ⇒ + import GraphDSL.Implicits._ + Source.empty.mapMaterializedValue(_ ⇒ done = true) ~> Sink.ignore + ClosedShape + } + val r = RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) { implicit b ⇒ + (s) ⇒ + b.add(g) + Source(1 to 10) ~> s + ClosedShape + }) + r.run().futureValue should ===(akka.Done) + done should ===(true) + } + + "produce NotUsed when not importing materialized value" in { + val source = Source.fromGraph(GraphDSL.create() { implicit b ⇒ + SourceShape(b.materializedValue) + }) + source.runWith(Sink.seq).futureValue should ===(List(akka.NotUsed)) + } + + "produce NotUsed when starting from Flow.via" in { + Source.empty.viaMat(Flow[Int].map(_ * 2))(Keep.right).to(Sink.ignore).run() should ===(akka.NotUsed) + } + + "produce NotUsed when starting from Flow.via with transformation" in { + var done = false + Source.empty.viaMat( + Flow[Int].via(Flow[Int].mapMaterializedValue(_ ⇒ done = true)))(Keep.right) + .to(Sink.ignore).run() should ===(akka.NotUsed) + done should ===(true) + } + + } } - } } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index d72a72c1cf..5282c5ca83 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -61,6 +61,8 @@ trait GraphApply { private[stream] object GraphApply { final class GraphImpl[S <: Shape, Mat](override val shape: S, private[stream] override val module: StreamLayout.Module) extends Graph[S, Mat] { + + override def toString: String = s"Graph($shape, $module)" override def withAttributes(attr: Attributes): Graph[S, Mat] = new GraphImpl(shape, module.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/Fusing.scala b/akka-stream/src/main/scala/akka/stream/Fusing.scala index d90abac97f..6ff1277fd5 100644 --- a/akka-stream/src/main/scala/akka/stream/Fusing.scala +++ b/akka-stream/src/main/scala/akka/stream/Fusing.scala @@ -31,11 +31,7 @@ object Fusing { * implementations based on [[akka.stream.stage.GraphStage]]) and not forbidden * via [[akka.stream.Attributes#AsyncBoundary]]. */ - def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = - g match { - case fg: FusedGraph[_, _] ⇒ fg - case _ ⇒ Impl.aggressive(g) - } + def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = Impl.aggressive(g) /** * A fused graph of the right shape, containing a [[FusedModule]] which @@ -48,6 +44,14 @@ object Fusing { override def withAttributes(attr: Attributes) = copy(module = module.withAttributes(attr)) } + object FusedGraph { + def unapply[S <: Shape, M](g: Graph[S, M]): Option[(FusedModule, S)] = + g.module match { + case f: FusedModule => Some((f, g.shape)) + case _ => None + } + } + /** * When fusing a [[Graph]] a part of the internal stage wirings are hidden within * [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index 5e9259cfd9..1afe64802f 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -215,6 +215,8 @@ object ClosedShape extends ClosedShape { * Java API: obtain ClosedShape instance */ def getInstance: ClosedShape = this + + override def toString: String = "ClosedShape" } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 82f20a16ff..e4f3bcdd0a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -11,7 +11,7 @@ import akka.event.Logging import akka.dispatch.Dispatchers import akka.pattern.ask import akka.stream._ -import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.StreamLayout.{ Module, AtomicModule } import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule } import akka.stream.impl.io.TLSActor import akka.stream.impl.io.TlsModule @@ -97,7 +97,8 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, name } - override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = { + override protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = { + if (MaterializerSession.Debug) println(s"materializing $atomic") def newMaterializationContext() = new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala index 4789f9e987..0188d83898 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala @@ -5,11 +5,12 @@ package akka.stream.impl import akka.stream._ import akka.stream.impl.StreamLayout.Module +import akka.event.Logging /** * INTERNAL API */ -private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module { +private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.AtomicModule { override def replaceShape(s: Shape) = if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") else this @@ -18,6 +19,6 @@ private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module { val outPort = Outlet[Out]("Flow.out") override val shape = new FlowShape(inPort, outPort) - override def subModules: Set[Module] = Set.empty + protected def label: String = Logging.simpleName(this) + final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]" } - diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index c3b1e232df..ae5771f15d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -6,29 +6,30 @@ package akka.stream.impl import akka.NotUsed import akka.actor._ import akka.stream._ -import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.StreamLayout.AtomicModule import org.reactivestreams._ - import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.Promise +import akka.event.Logging /** * INTERNAL API */ -private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module { +private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule { + + protected def label: String = Logging.simpleName(this) + final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]" def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat) - override def replaceShape(s: Shape): Module = + override def replaceShape(s: Shape): AtomicModule = if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that") else this // This is okay since the only caller of this method is right below. protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat] - override def carbonCopy: Module = newInstance(SourceShape(shape.out.carbonCopy())) - - override def subModules: Set[Module] = Set.empty + override def carbonCopy: AtomicModule = newInstance(SourceShape(shape.out.carbonCopy())) protected def amendShape(attr: Attributes): SourceShape[Out] = { val thisN = attributes.nameOrDefault(null) @@ -52,7 +53,7 @@ private[akka] final class SubscriberSource[Out](val attributes: Attributes, shap } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new SubscriberSource[Out](attributes, shape) - override def withAttributes(attr: Attributes): Module = new SubscriberSource[Out](attr, amendShape(attr)) + override def withAttributes(attr: Attributes): AtomicModule = new SubscriberSource[Out](attr, amendShape(attr)) } /** @@ -63,22 +64,26 @@ private[akka] final class SubscriberSource[Out](val attributes: Attributes, shap * back-pressure upstream. */ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) { + + override protected def label: String = s"PublisherSource($p)" + override def create(context: MaterializationContext) = (p, NotUsed) override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attributes, shape) - override def withAttributes(attr: Attributes): Module = new PublisherSource[Out](p, attr, amendShape(attr)) + override def withAttributes(attr: Attributes): AtomicModule = new PublisherSource[Out](p, attr, amendShape(attr)) } /** * INTERNAL API */ private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) { + override def create(context: MaterializationContext) = { val p = Promise[Option[Out]]() new MaybePublisher[Out](p, attributes.nameOrDefault("MaybeSource"))(context.materializer.executionContext) → p } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource[Out](attributes, shape) - override def withAttributes(attr: Attributes): Module = new MaybeSource(attr, amendShape(attr)) + override def withAttributes(attr: Attributes): AtomicModule = new MaybeSource(attr, amendShape(attr)) } /** @@ -95,7 +100,7 @@ private[akka] final class ActorPublisherSource[Out](props: Props, val attributes override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new ActorPublisherSource[Out](props, attributes, shape) - override def withAttributes(attr: Attributes): Module = new ActorPublisherSource(props, attr, amendShape(attr)) + override def withAttributes(attr: Attributes): AtomicModule = new ActorPublisherSource(props, attr, amendShape(attr)) } /** @@ -105,6 +110,8 @@ private[akka] final class ActorRefSource[Out]( bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { + override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)" + override def create(context: MaterializationContext) = { val mat = ActorMaterializer.downcast(context.materializer) val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings)) @@ -113,6 +120,6 @@ private[akka] final class ActorRefSource[Out]( override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape) - override def withAttributes(attr: Attributes): Module = + override def withAttributes(attr: Attributes): AtomicModule = new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index b1e0b5a32e..ad8780e16a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -8,7 +8,7 @@ import akka.actor.{ ActorRef, Props } import akka.stream.Attributes.InputBuffer import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance @@ -20,11 +20,12 @@ import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ import java.util.Optional +import akka.event.Logging /** * INTERNAL API */ -private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module { +private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule { /** * Create the Subscriber or VirtualPublisher that consumes the incoming @@ -35,16 +36,14 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte */ def create(context: MaterializationContext): (AnyRef, Mat) - override def replaceShape(s: Shape): Module = + override def replaceShape(s: Shape): AtomicModule = if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that") else this // This is okay since we the only caller of this method is right below. protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat] - override def carbonCopy: Module = newInstance(SinkShape(shape.in.carbonCopy())) - - override def subModules: Set[Module] = Set.empty + override def carbonCopy: AtomicModule = newInstance(SinkShape(shape.in.carbonCopy())) protected def amendShape(attr: Attributes): SinkShape[In] = { val thisN = attributes.nameOrDefault(null) @@ -53,6 +52,10 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte if ((thatN eq null) || thisN == thatN) shape else shape.copy(in = Inlet(thatN + ".in")) } + + protected def label: String = Logging.simpleName(this) + final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]" + } /** @@ -64,8 +67,6 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte */ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { - override def toString: String = "PublisherSink" - /* * This method is the reason why SinkModule.create may return something that is * not a Subscriber: a VirtualPublisher is used in order to avoid the immediate @@ -77,7 +78,7 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new PublisherSink[In](attributes, shape) - override def withAttributes(attr: Attributes): Module = new PublisherSink[In](attr, amendShape(attr)) + override def withAttributes(attr: Attributes): AtomicModule = new PublisherSink[In](attr, amendShape(attr)) } /** @@ -100,7 +101,7 @@ private[akka] final class FanoutPublisherSink[In]( override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new FanoutPublisherSink[In](attributes, shape) - override def withAttributes(attr: Attributes): Module = + override def withAttributes(attr: Attributes): AtomicModule = new FanoutPublisherSink[In](attr, amendShape(attr)) } @@ -118,8 +119,7 @@ private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkSh } override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Done]] = new SinkholeSink(attributes, shape) - override def withAttributes(attr: Attributes): Module = new SinkholeSink(attr, amendShape(attr)) - override def toString: String = "SinkholeSink" + override def withAttributes(attr: Attributes): AtomicModule = new SinkholeSink(attr, amendShape(attr)) } /** @@ -131,8 +131,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att override def create(context: MaterializationContext) = (subscriber, NotUsed) override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attributes, shape) - override def withAttributes(attr: Attributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr)) - override def toString: String = "SubscriberSink" + override def withAttributes(attr: Attributes): AtomicModule = new SubscriberSink[In](subscriber, attr, amendShape(attr)) } /** @@ -142,8 +141,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) { override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed) override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape) - override def withAttributes(attr: Attributes): Module = new CancelSink(attr, amendShape(attr)) - override def toString: String = "CancelSink" + override def withAttributes(attr: Attributes): AtomicModule = new CancelSink(attr, amendShape(attr)) } /** @@ -159,8 +157,7 @@ private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape) - override def withAttributes(attr: Attributes): Module = new ActorSubscriberSink[In](props, attr, amendShape(attr)) - override def toString: String = "ActorSubscriberSink" + override def withAttributes(attr: Attributes): AtomicModule = new ActorSubscriberSink[In](props, attr, amendShape(attr)) } /** @@ -180,9 +177,8 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new ActorRefSink[In](ref, onCompleteMessage, attributes, shape) - override def withAttributes(attr: Attributes): Module = + override def withAttributes(attr: Attributes): AtomicModule = new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) - override def toString: String = "ActorRefSink" } private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { @@ -257,6 +253,8 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { val in = Inlet[T]("seq.in") + override def toString: String = "SeqStage" + override val shape: SinkShape[T] = SinkShape.of(in) override protected def initialAttributes: Attributes = DefaultAttributes.seqSink @@ -302,6 +300,8 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal override def initialAttributes = DefaultAttributes.queueSink override val shape: SinkShape[T] = SinkShape.of(in) + override def toString: String = "QueueSink" + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] { type Received[E] = Try[Option[E]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index ac9e344abd..34043f90c1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -208,6 +208,7 @@ private[stream] object Stages { final case class GroupBy(maxSubstreams: Int, f: Any ⇒ Any, attributes: Attributes = groupBy) extends StageModule { override def withAttributes(attributes: Attributes) = copy(attributes = attributes) + override protected def label: String = s"GroupBy($maxSubstreams)" } final case class DirectProcessor(p: () ⇒ (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule { diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 4510fcc105..8fb58a9b4e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -19,6 +19,7 @@ import scala.collection.JavaConverters._ import akka.stream.impl.fusing.GraphStageModule import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import akka.stream.impl.fusing.GraphModule +import akka.event.Logging /** * INTERNAL API @@ -104,9 +105,21 @@ object StreamLayout { if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems") } - // TODO: Materialization order - // TODO: Special case linear composites - // TODO: Cycles + object IgnorableMatValComp { + def apply(comp: MaterializedValueNode): Boolean = + comp match { + case Atomic(module) ⇒ IgnorableMatValComp(module) + case _: Combine | _: Transform ⇒ false + case Ignore ⇒ true + } + def apply(module: Module): Boolean = + module match { + case _: AtomicModule | EmptyModule ⇒ true + case CopiedModule(_, _, module) ⇒ IgnorableMatValComp(module) + case CompositeModule(_, _, _, _, comp, _) ⇒ IgnorableMatValComp(comp) + case FusedModule(_, _, _, _, comp, _, _) ⇒ IgnorableMatValComp(comp) + } + } sealed trait MaterializedValueNode { /* @@ -121,14 +134,14 @@ object StreamLayout { override def toString: String = s"Combine($dep1,$dep2)" } case class Atomic(module: Module) extends MaterializedValueNode { - override def toString: String = s"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)}[${module.hashCode}])" + override def toString: String = f"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)}[${System.identityHashCode(module)}%08x])" } case class Transform(f: Any ⇒ Any, dep: MaterializedValueNode) extends MaterializedValueNode { override def toString: String = s"Transform($dep)" } case object Ignore extends MaterializedValueNode - trait Module { + sealed trait Module { def shape: Shape /** @@ -241,19 +254,30 @@ object StreamLayout { require(that ne this, "A module cannot be added to itself. You should pass a separate instance to compose().") require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.") - val modules1 = if (this.isSealed) Set(this) else this.subModules - val modules2 = if (that.isSealed) Set(that) else that.subModules + val modulesLeft = if (this.isSealed) Set(this) else this.subModules + val modulesRight = if (that.isSealed) Set(that) else that.subModules - val matComputation1 = if (this.isSealed) Atomic(this) else this.materializedValueComputation - val matComputation2 = if (that.isSealed) Atomic(that) else that.materializedValueComputation + val matCompLeft = if (this.isSealed) Atomic(this) else this.materializedValueComputation + val matCompRight = if (that.isSealed) Atomic(that) else that.materializedValueComputation + + val mat = + { + val comp = + if (f == scaladsl.Keep.left) { + if (IgnorableMatValComp(matCompRight)) matCompLeft else null + } else if (f == scaladsl.Keep.right) { + if (IgnorableMatValComp(matCompLeft)) matCompRight else null + } else null + if (comp == null) Combine(f.asInstanceOf[(Any, Any) ⇒ Any], matCompLeft, matCompRight) + else comp + } CompositeModule( - modules1 ++ modules2, + modulesLeft ++ modulesRight, AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets), downstreams ++ that.downstreams, upstreams ++ that.upstreams, - // would like to optimize away this allocation for Keep.{left,right} but that breaks side-effecting transformations - Combine(f.asInstanceOf[(Any, Any) ⇒ Any], matComputation1, matComputation2), + mat, Attributes.none) } @@ -314,7 +338,7 @@ object StreamLayout { final override def equals(obj: scala.Any): Boolean = super.equals(obj) } - object EmptyModule extends Module { + case object EmptyModule extends Module { override def shape = ClosedShape override def replaceShape(s: Shape) = if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") @@ -357,7 +381,7 @@ object StreamLayout { override def isCopied: Boolean = true - override def toString: String = s"$copyOf (copy)" + override def toString: String = f"[${System.identityHashCode(this)}%08x] copy of $copyOf" } final case class CompositeModule( @@ -379,13 +403,13 @@ object StreamLayout { override def withAttributes(attributes: Attributes): Module = copy(attributes = attributes) override def toString = - s""" + f"""CompositeModule [${System.identityHashCode(this)}%08x] | Name: ${this.attributes.nameOrDefault("unnamed")} | Modules: - | ${subModules.iterator.map(m ⇒ m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")} + | ${subModules.iterator.map(m ⇒ s"(${m.attributes.nameLifted.getOrElse("unnamed")}) ${m.toString.replaceAll("\n", "\n ")}").mkString("\n ")} | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} - |""".stripMargin + | MatValue: $materializedValueComputation""".stripMargin } object CompositeModule { @@ -414,13 +438,24 @@ object StreamLayout { override def withAttributes(attributes: Attributes): FusedModule = copy(attributes = attributes) override def toString = - s""" - | Name: ${this.attributes.nameOrDefault("unnamed")} - | Modules: - | ${subModules.iterator.map(m ⇒ m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")} - | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} - | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} - |""".stripMargin + f"""FusedModule [${System.identityHashCode(this)}%08x] + | Name: ${this.attributes.nameOrDefault("unnamed")} + | Modules: + | ${subModules.iterator.map(m ⇒ m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")} + | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} + | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} + | MatValue: $materializedValueComputation""".stripMargin + } + + /** + * This is the only extension point for the sealed type hierarchy: composition + * (i.e. the module tree) is managed strictly within this file, only leaf nodes + * may be declared elsewhere. + */ + abstract class AtomicModule extends Module { + final override def subModules: Set[Module] = Set.empty + final override def downstreams: Map[OutPort, InPort] = super.downstreams + final override def upstreams: Map[InPort, OutPort] = super.upstreams } } @@ -718,6 +753,8 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo new ju.HashMap[InPort, AnyRef] :: Nil private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] = new ju.HashMap[OutPort, Publisher[Any]] :: Nil + private var matValSrcStack: List[ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]]] = + new ju.HashMap[MaterializedValueNode, List[MaterializedValueSource[Any]]] :: Nil /* * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule @@ -732,13 +769,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo private def subscribers: ju.Map[InPort, AnyRef] = subscribersStack.head private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head private def currentLayout: Module = moduleStack.head + private def matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = matValSrcStack.head // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies // of the same module. // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter private def enterScope(enclosing: CopiedModule): Unit = { + if (MaterializerSession.Debug) println(f"entering scope [${System.identityHashCode(enclosing)}%08x]") subscribersStack ::= new ju.HashMap publishersStack ::= new ju.HashMap + matValSrcStack ::= new ju.HashMap moduleStack ::= enclosing.copyOf } @@ -747,12 +787,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // leading to port identity collisions) // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter private def exitScope(enclosing: CopiedModule): Unit = { + if (MaterializerSession.Debug) println(f"exiting scope [${System.identityHashCode(enclosing)}%08x]") val scopeSubscribers = subscribers val scopePublishers = publishers subscribersStack = subscribersStack.tail publishersStack = publishersStack.tail + matValSrcStack = matValSrcStack.tail moduleStack = moduleStack.tail + if (MaterializerSession.Debug) println(s" subscribers = $scopeSubscribers\n publishers = $scopePublishers") + // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of // the original module and assign them to the copy ports in the outer scope that we will return to enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach { @@ -765,6 +809,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo } final def materialize(): Any = { + if (MaterializerSession.Debug) println(s"beginning materialization of $topLevel") require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)") require( topLevel.isRunnable, @@ -789,7 +834,6 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo protected def mergeAttributes(parent: Attributes, current: Attributes): Attributes = parent and current - private val matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = new ju.HashMap def registerSrc(ms: MaterializedValueSource[Any]): Unit = { if (MaterializerSession.Debug) println(s"registering source $ms") matValSrc.get(ms.computation) match { @@ -801,53 +845,60 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo protected def materializeModule(module: Module, effectiveAttributes: Attributes): Any = { val materializedValues: ju.Map[Module, Any] = new ju.HashMap + if (MaterializerSession.Debug) println(f"entering module [${System.identityHashCode(module)}%08x] (${Logging.simpleName(module)})") + for (submodule ← module.subModules) { val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes) submodule match { - case GraphStageModule(shape, attributes, mv: MaterializedValueSource[_]) ⇒ - val copy = mv.copySrc.asInstanceOf[MaterializedValueSource[Any]] - registerSrc(copy) - materializeAtomic(copy.module, subEffectiveAttributes, materializedValues) - case atomic if atomic.isAtomic ⇒ + case atomic: AtomicModule ⇒ materializeAtomic(atomic, subEffectiveAttributes, materializedValues) case copied: CopiedModule ⇒ enterScope(copied) materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes)) exitScope(copied) - case composite ⇒ + case composite @ (_: CompositeModule | _: FusedModule) ⇒ materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes)) + case EmptyModule => // nothing to do or say } } if (MaterializerSession.Debug) { - println("RESOLVING") - println(s" module = $module") - println(s" computation = ${module.materializedValueComputation}") + println(f"resolving module [${System.identityHashCode(module)}%08x] computation ${module.materializedValueComputation}") println(s" matValSrc = $matValSrc") - println(s" matVals = $materializedValues") + println(s" matVals =\n ${materializedValues.asScala.map(p ⇒ "%08x".format(System.identityHashCode(p._1)) -> p._2).mkString("\n ")}") } - resolveMaterialized(module.materializedValueComputation, materializedValues, " ") + + val ret = resolveMaterialized(module.materializedValueComputation, materializedValues, 2) + while (!matValSrc.isEmpty) { + val node = matValSrc.keySet.iterator.next() + if (MaterializerSession.Debug) println(s" delayed computation of $node") + resolveMaterialized(node, materializedValues, 4) + } + + if (MaterializerSession.Debug) println(f"exiting module [${System.identityHashCode(module)}%08x]") + + ret } protected def materializeComposite(composite: Module, effectiveAttributes: Attributes): Any = { materializeModule(composite, effectiveAttributes) } - protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit + protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit - private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], indent: String): Any = { - if (MaterializerSession.Debug) println(indent + matNode) + private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], spaces: Int): Any = { + if (MaterializerSession.Debug) println(" " * spaces + matNode) val ret = matNode match { case Atomic(m) ⇒ matVal.get(m) - case Combine(f, d1, d2) ⇒ f(resolveMaterialized(d1, matVal, indent + " "), resolveMaterialized(d2, matVal, indent + " ")) - case Transform(f, d) ⇒ f(resolveMaterialized(d, matVal, indent + " ")) + case Combine(f, d1, d2) ⇒ f(resolveMaterialized(d1, matVal, spaces + 2), resolveMaterialized(d2, matVal, spaces + 2)) + case Transform(f, d) ⇒ f(resolveMaterialized(d, matVal, spaces + 2)) case Ignore ⇒ NotUsed } - if (MaterializerSession.Debug) println(indent + s"result = $ret") + if (MaterializerSession.Debug) println(" " * spaces + s"result = $ret") matValSrc.remove(matNode) match { case null ⇒ // nothing to do case srcs ⇒ - if (MaterializerSession.Debug) println(indent + s"triggering sources $srcs") + if (MaterializerSession.Debug) println(" " * spaces + s"triggering sources $srcs") srcs.foreach(_.setValue(ret)) } ret diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index dbb16227aa..59faa28470 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -10,7 +10,7 @@ import akka.event.Logging import akka.stream._ import akka.stream.impl._ import akka.stream.impl.ReactiveStreamsCompliance._ -import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module } +import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module, AtomicModule } import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import org.reactivestreams.{ Subscriber, Subscription } @@ -23,9 +23,9 @@ import scala.annotation.tailrec /** * INTERNAL API */ -private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes, - matValIDs: Array[Module]) extends Module { - override def subModules: Set[Module] = Set.empty +private[stream] final case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes, + matValIDs: Array[Module]) extends AtomicModule { + override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr) override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) @@ -34,7 +34,12 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at if (newShape != shape) CompositeModule(this, newShape) else this - override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes" + override def toString: String = + s"""GraphModule + | ${assembly.toString.replace("\n", "\n ")} + | shape=$shape, attributes=$attributes + | matVals= + | ${matValIDs.mkString("\n ")}""".stripMargin } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala index ca1fd38af2..273e0cb672 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala @@ -27,7 +27,14 @@ private[stream] object Fusing { /** * Fuse everything that is not forbidden via AsyncBoundary attribute. */ - def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = { + def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = + g match { + case fg: FusedGraph[_, _] ⇒ fg + case FusedGraph(module, shape) => FusedGraph(module, shape) + case _ ⇒ doAggressive(g) + } + + private def doAggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = { val struct = new BuildStructuralInfo /* * First perform normalization by descending the module tree and recording @@ -153,6 +160,7 @@ private[stream] object Fusing { } pos += 1 + case _ => throw new IllegalArgumentException("unexpected module structure") } val outsB2 = new Array[Outlet[_]](insB2.size) @@ -178,6 +186,7 @@ private[stream] object Fusing { } } pos += 1 + case _ => throw new IllegalArgumentException("unexpected module structure") } /* @@ -207,7 +216,10 @@ private[stream] object Fusing { copyToArray(outOwnersB3.iterator, outOwners, outStart) // FIXME attributes should contain some naming info and async boundary where needed - val firstModule = group.iterator.next() + val firstModule = group.iterator.next() match { + case c: CopiedModule => c + case _ => throw new IllegalArgumentException("unexpected module structure") + } val async = if (isAsync(firstModule)) Attributes(AsyncBoundary) else Attributes.none val disp = dispatcher(firstModule) match { case None ⇒ Attributes.none @@ -253,7 +265,7 @@ private[stream] object Fusing { case _ if m.isAtomic ⇒ true // non-GraphStage atomic or has AsyncBoundary case _ ⇒ m.attributes.contains(AsyncBoundary) } - if (Debug) log(s"entering ${m.getClass} (hash=${m.hashCode}, async=$async, name=${m.attributes.nameLifted}, dispatcher=${dispatcher(m)})") + if (Debug) log(s"entering ${m.getClass} (hash=${struct.hash(m)}, async=$async, name=${m.attributes.nameLifted}, dispatcher=${dispatcher(m)})") val localGroup = if (async) struct.newGroup(indent) else openGroup @@ -315,6 +327,7 @@ private[stream] object Fusing { struct.registerInternals(newShape, indent) copy + case _ => throw new IllegalArgumentException("unexpected module structure") } val newgm = gm.copy(shape = oldShape.copyFromPorts(oldIns.toList, oldOuts.toList), matValIDs = newids) // make sure to add all the port mappings from old GraphModule Shape to new shape @@ -356,7 +369,7 @@ private[stream] object Fusing { subMatBuilder ++= res } val subMat = subMatBuilder.result() - if (Debug) log(subMat.map(p ⇒ s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, "")) + if (Debug) log(subMat.map(p ⇒ s"${p._1.getClass.getName}[${struct.hash(p._1)}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, "")) // we need to remove all wirings that this module copied from nested modules so that we // don’t do wirings twice val oldDownstreams = m match { @@ -370,17 +383,17 @@ private[stream] object Fusing { // now rewrite the materialized value computation based on the copied modules and their computation nodes val matNodeMapping: ju.Map[MaterializedValueNode, MaterializedValueNode] = new ju.HashMap val newMat = rewriteMat(subMat, m.materializedValueComputation, matNodeMapping) + if (Debug) log(matNodeMapping.asScala.map(p ⇒ s"${p._1} -> ${p._2}").mkString("matNodeMapping\n " + " " * indent, "\n " + " " * indent, "")) // and finally rewire all MaterializedValueSources to their new computation nodes val matSrcs = struct.exitMatCtx() matSrcs.foreach { c ⇒ - if (Debug) log(s"materialized value source: ${struct.hash(c)}") - val ms = c.copyOf match { - case g: GraphStageModule ⇒ g.stage.asInstanceOf[MaterializedValueSource[Any]] - } + val ms = c.copyOf.asInstanceOf[GraphStageModule].stage.asInstanceOf[MaterializedValueSource[Any]] val mapped = ms.computation match { case Atomic(sub) ⇒ subMat(sub) + case Ignore => Ignore case other ⇒ matNodeMapping.get(other) } + if (Debug) log(s"materialized value source: ${c.copyOf} -> $mapped") require(mapped != null, s"mismatch:\n ${ms.computation}\n ${m.materializedValueComputation}") val newSrc = new MaterializedValueSource[Any](mapped, ms.out) val replacement = CopiedModule(c.shape, c.attributes, newSrc.module) @@ -692,7 +705,7 @@ private[stream] object Fusing { /** * Determine whether the given CopiedModule has an AsyncBoundary attribute. */ - private def isAsync(m: Module): Boolean = m match { + private def isAsync(m: CopiedModule): Boolean = m match { case CopiedModule(_, inherited, orig) ⇒ val attr = inherited and orig.attributes attr.contains(AsyncBoundary) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 07bac7504b..751a9ebd2a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -191,14 +191,17 @@ private[akka] object GraphInterpreter { (inHandlers, outHandlers, logics) } - override def toString: String = + override def toString: String = { + val stageList = stages.iterator.zip(originalAttributes.iterator).map { + case (stage, attr) ⇒ s"${stage.module} [${attr.attributeList.mkString(", ")}]" + } "GraphAssembly\n " + - stages.mkString("Stages: [", ",", "]") + "\n " + - originalAttributes.mkString("Attributes: [", ",", "]") + "\n " + - ins.mkString("Inlets: [", ",", "]") + "\n " + - inOwners.mkString("InOwners: [", ",", "]") + "\n " + - outs.mkString("Outlets: [", ",", "]") + "\n " + - outOwners.mkString("OutOwners: [", ",", "]") + stageList.mkString("[ ", "\n ", "\n ]") + "\n " + + ins.mkString("[", ",", "]") + "\n " + + inOwners.mkString("[", ",", "]") + "\n " + + outs.mkString("[", ",", "]") + "\n " + + outOwners.mkString("[", ",", "]") + } } object GraphAssembly { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 37030d83a9..c02fd1d3c6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -24,20 +24,18 @@ import scala.util.Try */ private[akka] final case class GraphStageModule(shape: Shape, attributes: Attributes, - stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module { + stage: GraphStageWithMaterializedValue[Shape, Any]) extends AtomicModule { override def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) override def replaceShape(s: Shape): Module = if (s != shape) CompositeModule(this, s) else this - override def subModules: Set[Module] = Set.empty - override def withAttributes(attributes: Attributes): Module = if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage) else this - override def toString: String = stage.toString + override def toString: String = f"GraphStage($stage) [${System.identityHashCode(this)}%08x]" } /** @@ -133,6 +131,7 @@ object GraphStages { override val initialAttributes = Attributes.name("breaker") override val shape = FlowShape(Inlet[Any]("breaker.in"), Outlet[Any]("breaker.out")) + override def toString: String = "Breaker" override def createLogicAndMaterializedValue(attr: Attributes) = { val promise = Promise[Breaker] @@ -167,6 +166,7 @@ object GraphStages { override val shape = BidiShape( Inlet[Any]("breaker.in1"), Outlet[Any]("breaker.out1"), Inlet[Any]("breaker.in2"), Outlet[Any]("breaker.out2")) + override def toString: String = "BidiBreaker" override def createLogicAndMaterializedValue(attr: Attributes) = { val promise = Promise[Breaker] @@ -215,21 +215,6 @@ object GraphStages { def bidiBreaker[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]] = BidiBreaker.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]]] - private object TickSource { - class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { - private val cancelPromise = Promise[Done]() - - def cancelFuture: Future[Done] = cancelPromise.future - - override def cancel(): Boolean = { - if (!isCancelled) cancelPromise.trySuccess(Done) - true - } - - override def isCancelled: Boolean = cancelled.get() - } - } - private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] { val in = Inlet[Any]("terminationWatcher.in") val out = Outlet[Any]("terminationWatcher.out") @@ -269,6 +254,21 @@ object GraphStages { def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] = TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]] + private object TickSource { + class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { + private val cancelPromise = Promise[Done]() + + def cancelFuture: Future[Done] = cancelPromise.future + + override def cancel(): Boolean = { + if (!isCancelled) cancelPromise.trySuccess(Done) + true + } + + override def isCancelled: Boolean = cancelled.get() + } + } + final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { override val shape = SourceShape(Outlet[T]("TickSource.out")) @@ -302,7 +302,7 @@ object GraphStages { (logic, cancellable) } - override def toString: String = "TickSource" + override def toString: String = s"TickSource($initialDelay, $interval, $tick)" } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index 58f91094d0..ebe6d55500 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -22,6 +22,8 @@ import scala.concurrent.{ Future, Promise } private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) extends SinkModule[ByteString, Future[IOResult]](shape) { + override protected def label: String = s"FileSink($f, $options)" + override def create(context: MaterializationContext) = { val materializer = ActorMaterializer.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) @@ -68,4 +70,3 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va override def withAttributes(attr: Attributes): Module = new OutputStreamSink(createOutput, attr, amendShape(attr), autoFlush) } - diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 855f241e7f..2a080cdba9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -41,6 +41,8 @@ private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: At override def withAttributes(attr: Attributes): Module = new FileSource(f, chunkSize, attr, amendShape(attr)) + + override protected def label: String = s"FileSource($f, $chunkSize)" } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index 4d601836ac..7527b4ace5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -28,8 +28,8 @@ private[akka] object InputStreamPublisher { /** INTERNAL API */ private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int) - extends akka.stream.actor.ActorPublisher[ByteString] - with ActorLogging { + extends akka.stream.actor.ActorPublisher[ByteString] + with ActorLogging { // TODO possibly de-duplicate with FilePublisher? diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 93b0eed1a2..f0ac7c9a1e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -140,7 +140,8 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt case Failed(ex) ⇒ isStageAlive = false throw new IOException(ex) - case null ⇒ throw new IOException("Timeout on waiting for new data") + case null ⇒ throw new IOException("Timeout on waiting for new data") + case Initialized ⇒ throw new IllegalStateException("message 'Initialized' must come first") } } catch { case ex: InterruptedException ⇒ throw new IOException(ex) @@ -215,4 +216,3 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt } } } - diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index 84bcb032ee..1fce7c240b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -3,7 +3,7 @@ package akka.stream.impl.io import javax.net.ssl.SSLContext import akka.stream._ -import akka.stream.impl.StreamLayout.{ CompositeModule, Module } +import akka.stream.impl.StreamLayout.{ CompositeModule, AtomicModule } import akka.stream.TLSProtocol._ import akka.util.ByteString @@ -15,11 +15,10 @@ private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOu shape: Shape, attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, - role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends Module { - override def subModules: Set[Module] = Set.empty + role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends AtomicModule { - override def withAttributes(att: Attributes): Module = copy(attributes = att) - override def carbonCopy: Module = + override def withAttributes(att: Attributes): TlsModule = copy(attributes = att) + override def carbonCopy: TlsModule = TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) override def replaceShape(s: Shape) = @@ -27,6 +26,8 @@ private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOu shape.requireSamePortsAs(s) CompositeModule(this, s) } else this + + override def toString: String = f"TlsModule($firstSession, $role, $closing, $hostInfo) [${System.identityHashCode(this)}%08x]" } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 540dd45fb0..027a866a59 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -75,6 +75,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends override def shape: FlowShape[In, Out] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module + override def toString: String = delegate.toString + /** Converts this Flow to its Scala DSL counterpart */ def asScala: scaladsl.Flow[In, Out, Mat] = delegate @@ -119,6 +121,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * }}} * The `combine` function is used to compose the materialized values of this flow and that * flow into the materialized value of the resulting Flow. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.viaMat(flow)(combinerToScala(combine))) @@ -158,6 +163,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * }}} * The `combine` function is used to compose the materialized values of this flow and that * Sink into the materialized value of the resulting Sink. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = new Sink(delegate.toMat(sink)(combinerToScala(combine))) @@ -189,6 +197,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * }}} * The `combine` function is used to compose the materialized values of this flow and that * Flow into the materialized value of the resulting Flow. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] = RunnableGraph.fromGraph(delegate.joinMat(flow)(combinerToScala(combine))) @@ -228,6 +239,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * }}} * The `combine` function is used to compose the materialized values of this flow and that * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = new Flow(delegate.joinMat(bidi)(combinerToScala(combine))) @@ -1246,6 +1260,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#concat]] */ def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = @@ -1282,7 +1299,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. * - * @see [[#prepend]]. + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#prepend]] */ def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.prependMat(that)(combinerToScala(matF))) @@ -1306,6 +1326,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#alsoTo]] */ def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2], @@ -1349,7 +1372,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. * - * @see [[#interleave]]. + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#interleave]] */ def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = @@ -1389,6 +1415,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#merge]] */ def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], @@ -1399,6 +1428,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#merge]] */ def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], @@ -1431,6 +1463,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * waiting for elements, this merge will block when one of the inputs does not have more elements (and * does not complete). * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#mergeSorted]]. */ def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: Comparator[U], @@ -1454,12 +1489,15 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#zip]] */ def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = this.viaMat(Flow.fromGraph(GraphDSL.create(that, - new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] { + new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] { def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = { val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T]) b.from(s).toInlet(zip.in1) @@ -1487,6 +1525,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#zipWith]] */ def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M], @@ -1792,6 +1833,9 @@ object RunnableGraph { private final class RunnableGraphAdapter[Mat](runnable: scaladsl.RunnableGraph[Mat]) extends RunnableGraph[Mat] { def shape = ClosedShape def module = runnable.module + + override def toString: String = runnable.toString + override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraphAdapter[Mat2] = new RunnableGraphAdapter(runnable.mapMaterializedValue(f.apply _)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 68d8ba151f..18335aff7d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -256,6 +256,8 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink override def shape: SinkShape[In] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module + override def toString: String = delegate.toString + /** Converts this Sink to its Scala DSL counterpart */ def asScala: scaladsl.Sink[In, Mat] = delegate diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index dcd39491ba..4bb92f3451 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -323,6 +323,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap private[stream] def module: StreamLayout.Module = delegate.module + override def toString: String = delegate.toString + /** Converts this Java DSL element to its Scala DSL counterpart. */ def asScala: scaladsl.Source[Out, Mat] = delegate @@ -367,6 +369,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * }}} * The `combine` function is used to compose the materialized values of this flow and that * flow into the materialized value of the resulting Flow. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.viaMat(flow)(combinerToScala(combine))) @@ -406,6 +411,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * }}} * The `combine` function is used to compose the materialized values of this flow and that * Sink into the materialized value of the resulting Sink. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] = RunnableGraph.fromGraph(delegate.toMat(sink)(combinerToScala(combine))) @@ -470,6 +478,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#concat]]. */ def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], @@ -507,6 +518,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#prepend]]. */ def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], @@ -532,6 +546,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#alsoTo]] */ def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2], @@ -574,6 +591,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * If one of sources gets upstream error - stream completes with failure. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#interleave]]. */ def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, @@ -599,6 +619,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * picking randomly when several elements ready. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#merge]]. */ def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], @@ -630,6 +653,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * waiting for elements, this merge will block when one of the inputs does not have more elements (and * does not complete). * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#mergeSorted]]. */ def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: util.Comparator[U], @@ -653,6 +679,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap /** * Combine the elements of current [[Source]] and the given one into a stream of tuples. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#zip]]. */ def zipMat[T, M, M2](that: Graph[SourceShape[T], M], @@ -679,6 +708,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Put together the elements of current [[Source]] and the given one * into a stream of combined elements using a combiner function. * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * * @see [[#zipWith]]. */ def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M], diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala index cbf1188e02..6de66e5854 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala @@ -6,6 +6,8 @@ package akka.stream package object javadsl { def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) ⇒ M = f match { + case x if x eq Keep.left ⇒ scaladsl.Keep.left.asInstanceOf[(M1, M2) ⇒ M] + case x if x eq Keep.right ⇒ scaladsl.Keep.right.asInstanceOf[(M1, M2) ⇒ M] case s: Function2[_, _, _] ⇒ s.asInstanceOf[(M1, M2) ⇒ M] case other ⇒ other.apply _ } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8da794ede0..d49a36a4f3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -30,6 +30,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) override val shape: FlowShape[In, Out] = module.shape.asInstanceOf[FlowShape[In, Out]] + override def toString: String = s"Flow($shape, $module)" + override type Repr[+O] = Flow[In @uncheckedVariance, O, Mat @uncheckedVariance] override type ReprMat[+O, +M] = Flow[In @uncheckedVariance, O, M] @@ -42,8 +44,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = if (this.isIdentity) { - Flow.fromGraph(flow.asInstanceOf[Graph[FlowShape[In, T], Mat2]]) - .mapMaterializedValue(combine(NotUsed.asInstanceOf[Mat], _)) + import Predef.Map.empty + import StreamLayout.{ CompositeModule, Ignore, IgnorableMatValComp, Transform, Atomic, Combine } + val m = flow.module + val mat = + if (combine == Keep.left) { + if (IgnorableMatValComp(m)) Ignore else Transform(_ => NotUsed, Atomic(m)) + } else Combine(combine.asInstanceOf[(Any, Any) => Any], Ignore, Atomic(m)) + new Flow(CompositeModule(Set(m), m.shape, empty, empty, mat, Attributes.none)) } else { val flowCopy = flow.module.carbonCopy new Flow( @@ -86,11 +94,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * }}} * The `combine` function is used to compose the materialized values of this flow and that * Sink into the materialized value of the resulting Sink. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[In, Mat3] = { if (isIdentity) Sink.fromGraph(sink.asInstanceOf[Graph[SinkShape[In], Mat2]]) - .mapMaterializedValue(combine(().asInstanceOf[Mat], _)) + .mapMaterializedValue(combine(NotUsed.asInstanceOf[Mat], _)) else { val sinkCopy = sink.module.carbonCopy new Sink( @@ -132,6 +143,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * }}} * The `combine` function is used to compose the materialized values of this flow and that * Flow into the materialized value of the resulting Flow. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = { val flowCopy = flow.module.carbonCopy @@ -176,6 +190,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * }}} * The `combine` function is used to compose the materialized values of this flow and that * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) ⇒ M): Flow[I2, O2, M] = { val copy = bidi.module.carbonCopy @@ -261,8 +278,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) - - override def toString = s"""Flow(${module})""" } object Flow { @@ -410,23 +425,23 @@ trait FlowOps[+Out, +Mat] { def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = andThen(Recover(pf)) /** - * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after - * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new - * Source may be materialized. - * - * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. - * This stage can recover the failure signal, but not the skipped elements, which will be dropped. - * - * '''Emits when''' element is available from the upstream or upstream is failed and element is available - * from alternative Source - * - * '''Backpressures when''' downstream backpressures - * - * '''Completes when''' upstream completes or upstream failed with exception pf can handle - * - * '''Cancels when''' downstream cancels - * - */ + * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new + * Source may be materialized. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = via(new RecoverWith(pf)) @@ -466,27 +481,27 @@ trait FlowOps[+Out, +Mat] { def mapConcat[T](f: Out ⇒ immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f) /** - * Transform each input element into an `Iterable` of output elements that is - * then flattened into the output stream. The transformation is meant to be stateful, - * which is enabled by creating the transformation function anew for every materialization — - * the returned function will typically close over mutable objects to store state between - * invocations. For the stateless variant see [[FlowOps.mapConcat]]. - * - * The returned `Iterable` MUST NOT contain `null` values, - * as they are illegal as stream elements - according to the Reactive Streams specification. - * - * '''Emits when''' the mapping function returns an element or there are still remaining elements - * from the previously calculated collection - * - * '''Backpressures when''' downstream backpressures or there are still remaining elements from the - * previously calculated collection - * - * '''Completes when''' upstream completes and all remaining elements has been emitted - * - * '''Cancels when''' downstream cancels - * - * See also [[FlowOps.mapConcat]] - */ + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. The transformation is meant to be stateful, + * which is enabled by creating the transformation function anew for every materialization — + * the returned function will typically close over mutable objects to store state between + * invocations. For the stateless variant see [[FlowOps.mapConcat]]. + * + * The returned `Iterable` MUST NOT contain `null` values, + * as they are illegal as stream elements - according to the Reactive Streams specification. + * + * '''Emits when''' the mapping function returns an element or there are still remaining elements + * from the previously calculated collection + * + * '''Backpressures when''' downstream backpressures or there are still remaining elements from the + * previously calculated collection + * + * '''Completes when''' upstream completes and all remaining elements has been emitted + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.mapConcat]] + */ def statefulMapConcat[T](f: () ⇒ Out ⇒ immutable.Iterable[T]): Repr[T] = via(new StatefulMapConcat(f)) @@ -1813,6 +1828,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * }}} * The `combine` function is used to compose the materialized values of this flow and that * flow into the materialized value of the resulting Flow. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[T, Mat3] @@ -1831,6 +1849,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * }}} * The `combine` function is used to compose the materialized values of this flow and that * Sink into the materialized value of the resulting Sink. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ClosedMat[Mat3] @@ -1838,6 +1859,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * Combine the elements of current flow and the given [[Source]] into a stream of tuples. * * @see [[#zip]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[(Out, U), Mat3] = viaMat(zipGraph(that))(matF) @@ -1847,6 +1871,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * into a stream of combined elements using a combiner function. * * @see [[#zipWith]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) ⇒ Out3)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out3, Mat3] = viaMat(zipWithGraph(that)(combine))(matF) @@ -1856,6 +1883,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * picking randomly when several elements ready. * * @see [[#merge]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(mergeGraph(that, eagerComplete))(matF) @@ -1870,6 +1900,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * If it gets error from one of upstreams - stream completes with failure. * * @see [[#interleave]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(interleaveGraph(that, request))(matF) @@ -1882,6 +1915,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * does not complete). * * @see [[#mergeSorted]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3)(implicit ord: Ordering[U]): ReprMat[U, Mat3] = viaMat(mergeSortedGraph(that))(matF) @@ -1897,6 +1933,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. * * @see [[#concat]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(concatGraph(that))(matF) @@ -1912,6 +1951,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. * * @see [[#prepend]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(prependGraph(that))(matF) @@ -1921,6 +1963,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * through will also be sent to the [[Sink]]. * * @see [[#alsoTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] = viaMat(alsoToGraph(that))(matF) @@ -1930,6 +1975,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * The Future completes with success when received complete message from upstream or cancel * from downstream. It fails with the same error when received error message from * downstream. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. */ def watchTermination[Mat2]()(matF: (Mat, Future[Done]) ⇒ Mat2): ReprMat[Out, Mat2] = viaMat(GraphStages.terminationWatcher)(matF) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index caca92f7b5..cbdc2d69dd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -8,7 +8,7 @@ import akka.stream._ import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages.MaterializedValueSource -import akka.stream.impl.Stages.{ DefaultAttributes, StageModule} +import akka.stream.impl.Stages.{ DefaultAttributes, StageModule } import akka.stream.impl.StreamLayout._ import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } @@ -862,6 +862,20 @@ object GraphDSL extends GraphApply { * @return The outlet that will emit the materialized value. */ def materializedValue: Outlet[M @uncheckedVariance] = { + /* + * This brings the graph into a homogenous shape: if only one `add` has + * been performed so far, the moduleInProgress will be a CopiedModule + * that upon the next `composeNoMat` will be wrapped together with the + * MaterializedValueSource into a CompositeModule, leading to its + * relevant computation being an Atomic() for the CopiedModule. This is + * what we must reference, and we can only get this reference if we + * create that computation up-front: just making one up will not work + * because that computation node would not be part of the tree and + * the source would not be triggered. + */ + if (moduleInProgress.isInstanceOf[CopiedModule]) { + moduleInProgress = CompositeModule(moduleInProgress, moduleInProgress.shape) + } val source = new MaterializedValueSource[M](moduleInProgress.materializedValueComputation) moduleInProgress = moduleInProgress.composeNoMat(source.module) source.out diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index f480cbe518..1d32708bfd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -27,6 +27,8 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]] + override def toString: String = s"Sink($shape, $module)" + /** * Transform this Sink by applying a function to each *incoming* upstream element before * it is passed to the [[Sink]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index fcdcae35dc..ec08270095 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -38,6 +38,8 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) override val shape: SourceShape[Out] = module.shape.asInstanceOf[SourceShape[Out]] + override def toString: String = s"Source($shape, $module)" + override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { diff --git a/project/MiMa.scala b/project/MiMa.scala index adba135c98..4705b753ed 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -683,7 +683,21 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"), // #20009 internal and shouldn't have been public - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion") + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion"), + + // #20015 simplify materialized value computation tree + ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.subModules"), + ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.downstreams"), + ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.upstreams"), + ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#DirectProcessor.toString"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"), + ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.Stages$StageModule"), + ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#GroupBy.toString"), + ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.FlowModule"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FlowModule.subModules"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.FlowModule.label"), + ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule") ) ) }