From 4ade8ef2d146dfd19eb2b8e67916c8ea72b430b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 27 May 2021 17:18:47 +0200 Subject: [PATCH] Lazy and fast concat and prepend (#30252) --- .../stream/operators/Source-or-Flow/concat.md | 11 ++ .../operators/Source-or-Flow/concatLazy.md | 40 +++++ .../operators/Source-or-Flow/prepend.md | 18 ++ .../operators/Source-or-Flow/prependLazy.md | 40 +++++ .../main/paradox/stream/operators/index.md | 4 + .../jdocs/stream/operators/SourceOrFlow.java | 24 +++ .../akka/stream/scaladsl/FlowConcatSpec.scala | 124 ++++++++++--- .../stream/scaladsl/FlowPrependSpec.scala | 10 ++ ...23044-concat-prepend-improvements.excludes | 11 ++ .../scala/akka/stream/impl/SingleConcat.scala | 46 +++++ .../main/scala/akka/stream/javadsl/Flow.scala | 119 ++++++++++++- .../scala/akka/stream/javadsl/Graph.scala | 6 + .../scala/akka/stream/javadsl/Source.scala | 123 +++++++++++-- .../scala/akka/stream/javadsl/SubFlow.scala | 67 ++++++- .../scala/akka/stream/javadsl/SubSource.scala | 67 ++++++- .../scala/akka/stream/scaladsl/Flow.scala | 165 +++++++++++++++--- .../scala/akka/stream/scaladsl/Graph.scala | 27 ++- 17 files changed, 825 insertions(+), 77 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prependLazy.md create mode 100644 akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/23044-concat-prepend-improvements.excludes create mode 100644 akka-stream/src/main/scala/akka/stream/impl/SingleConcat.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md index dd0ef601f8..fe8881d034 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md @@ -14,6 +14,17 @@ After completion of the original upstream the elements of the given source will After completion of the original upstream the elements of the given source will be emitted. +Both streams will be materialized together. + +@@@ note + + The `concat` operator is for backwards compatibility reasons "detached" and will eagerly + demand an element from both upstreams when the stream is materialized and will then have a + one element buffer for each of the upstreams, this is most often not what you want, instead + use @ref(concatLazy)[concatLazy.md] + +@@@ + ## Example Scala : @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concat } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md new file mode 100644 index 0000000000..6617381c12 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md @@ -0,0 +1,40 @@ +# concatLazy + +After completion of the original upstream the elements of the given source will be emitted. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +## Signature + +@apidoc[Source.concat](Source) { scala="#concatLazy[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#concatLazy(akka.stream.Graph)" } +@apidoc[Flow.concat](Flow) { scala="#concatLazy[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#concatLazy(akka.stream.Graph)" } + + +## Description + +After completion of the original upstream the elements of the given source will be emitted. + +Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref(concat)[concat.md], introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.) + +To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref(Source.lazySource)[../Source/lazySource.md]. + +If materialized values needs to be collected `concatLazyMat` is available. + +## Example +Scala +: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concatLazy } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concatLazy } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the current stream has an element available; if the current input completes, it tries the next one + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md index dab4264eb7..7d6b61c9ca 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md @@ -14,8 +14,26 @@ Prepends the given source to the flow, consuming it until completion before the Prepends the given source to the flow, consuming it until completion before the original source is consumed. +@@@ note + + The `prepend` operator is for backwards compatibility reasons "detached" and will eagerly + demand an element from both upstreams when the stream is materialized and will then have a + one element buffer for each of the upstreams, this is most often not what you want, instead + use @ref(prependLazy)[prependLazy.md] + +@@@ + If materialized values needs to be collected `prependMat` is available. +@@@ note + +The `prepend` operator is for backwards compatibility reasons "detached" and will eagerly +demand an element from both upstreams when the stream is materialized and will then have a +one element buffer for each of the upstreams, this is not always what you want, if not, +use @ref(prependLazy)[prependLazy.md] + +@@@ + ## Example Scala : @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prepend } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prependLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prependLazy.md new file mode 100644 index 0000000000..9b6ec504fb --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prependLazy.md @@ -0,0 +1,40 @@ +# prependLazy + +Prepends the given source to the flow, consuming it until completion before the original source is consumed. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +## Signature + +@apidoc[Source.prepend](Source) { scala="#prepend[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#prepend(akka.stream.Graph)" } +@apidoc[Flow.prepend](Flow) { scala="#prepend[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#prepend(akka.stream.Graph)" } + + +## Description + +Prepends the given source to the flow, consuming it until completion before the original source is consumed. + +Both streams will be materialized together, however, the original stream will be pulled for the first time only after the prepended upstream was completed. (In contrast, @ref(prepend)[prepend.md], introduces single-element buffers after both, original and given sources so that the original source is also pulled once immediately.) + +If materialized values needs to be collected `prependLazyMat` is available. + +See also @ref[prepend](prepend.md) which is detached. + +## Example +Scala +: @@snip [FlowPrependSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prependLazy } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #prependLazy } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the given stream has an element available; if the given input completes, it tries the current one + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 8a95918bbb..abc0fa1510 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -262,6 +262,7 @@ the inputs in different ways. |--|--|--| | |@ref[MergeSequence](MergeSequence.md)|Merge a linear sequence partitioned across multiple sources.| |Source/Flow|@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.| +|Source/Flow|@ref[concatLazy](Source-or-Flow/concatLazy.md)|After completion of the original upstream the elements of the given source will be emitted.| |Source/Flow|@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.| |Source/Flow|@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.| |Source/Flow|@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.| @@ -270,6 +271,7 @@ the inputs in different ways. |Source/Flow|@ref[mergeSorted](Source-or-Flow/mergeSorted.md)|Merge multiple sources.| |Source/Flow|@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.| |Source/Flow|@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.| +|Source/Flow|@ref[prependLazy](Source-or-Flow/prependLazy.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.| |Source/Flow|@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.| |Source/Flow|@ref[zipAll](Source-or-Flow/zipAll.md)|Combines elements from two sources into @scala[tuples] @java[*Pair*] handling early completion of either source.| |Source/Flow|@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.| @@ -393,6 +395,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [completionStageSource](Source/completionStageSource.md) * [completionTimeout](Source-or-Flow/completionTimeout.md) * [concat](Source-or-Flow/concat.md) +* [concatLazy](Source-or-Flow/concatLazy.md) * [conflate](Source-or-Flow/conflate.md) * [conflateWithSeed](Source-or-Flow/conflateWithSeed.md) * [cycle](Source/cycle.md) @@ -504,6 +507,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [prefixAndTail](Source-or-Flow/prefixAndTail.md) * [preMaterialize](Sink/preMaterialize.md) * [prepend](Source-or-Flow/prepend.md) +* [prependLazy](Source-or-Flow/prependLazy.md) * [queue](Source/queue.md) * [queue](Sink/queue.md) * [range](Source/range.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 7d9bd7af26..31e0e63ad6 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -19,7 +19,9 @@ import akka.japi.function.Function2; // #zip-with-index // #or-else // #prepend +// #prependLazy // #concat +// #concatLazy // #interleave // #merge // #merge-sorted @@ -33,7 +35,9 @@ import java.util.*; // #merge // #interleave // #concat +// #concatLazy // #prepend +// #prependLazy // #or-else // #zip-with-index // #zip-with @@ -124,6 +128,16 @@ class SourceOrFlow { // #prepend } + void prependLazyExample() { + // #prepend + Source ladies = Source.from(Arrays.asList("Emma", "Emily")); + Source gentlemen = Source.from(Arrays.asList("Liam", "William")); + gentlemen.prependLazy(ladies).runWith(Sink.foreach(System.out::print), system); + // this will print "Emma", "Emily", "Liam", "William" + + // #prepend + } + void concatExample() { // #concat Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); @@ -134,6 +148,16 @@ class SourceOrFlow { // #concat } + void concatLazyExample() { + // #concat + Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); + sourceA.concatLazy(sourceB).runWith(Sink.foreach(System.out::print), system); + // prints 1, 2, 3, 4, 10, 20, 30, 40 + + // #concat + } + void interleaveExample() { // #interleave Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala index a55a625bdb..45970405d2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -4,27 +4,36 @@ package akka.stream.scaladsl -import scala.concurrent.{ Await, Promise } -import scala.concurrent.duration._ - -import org.reactivestreams.Publisher - import akka.NotUsed -import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber } +import akka.stream.testkit.BaseTwoStreamsSetup +import akka.stream.testkit.TestPublisher +import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink +import org.reactivestreams.Publisher +import org.scalatest.concurrent.ScalaFutures -class FlowConcatSpec extends BaseTwoStreamsSetup { +import java.util.concurrent.atomic.AtomicBoolean +import scala.concurrent.Await +import scala.concurrent.Promise +import scala.concurrent.duration._ + +abstract class AbstractFlowConcatSpec extends BaseTwoStreamsSetup { override type Outputs = Int - override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + def eager: Boolean + + // not used but we want the rest of the BaseTwoStreamsSetup infra + override def setup(p1: Publisher[Int], p2: Publisher[Int]): TestSubscriber.Probe[Int] = { val subscriber = TestSubscriber.probe[Outputs]() - Source.fromPublisher(p1).concat(Source.fromPublisher(p2)).runWith(Sink.fromSubscriber(subscriber)) + val s1 = Source.fromPublisher(p1) + val s2 = Source.fromPublisher(p2) + (if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(Sink.fromSubscriber(subscriber)) subscriber } - "A Concat for Flow " must { + s"${if (eager) "An eager" else "A lazy"} Concat for Flow " must { "be able to concat Flow with a Source" in { val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s") @@ -34,7 +43,8 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { val subs = TestSubscriber.manualProbe[Any]() val subSink = Sink.asPublisher[Any](false) - val (_, res) = f1.concat(s2).runWith(s1, subSink) + val (_, res) = + (if (eager) f1.concatLazy(s2) else f1.concat(s2)).runWith(s1, subSink) res.subscribe(subs) val sub = subs.expectSubscription() @@ -51,7 +61,8 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { val subs = TestSubscriber.manualProbe[Any]() val subSink = Sink.asPublisher[Any](false) - val (_, res) = f2.prepend(s1).runWith(s2, subSink) + val (_, res) = + (if (eager) f2.prepend(s1) else f2.prependLazy(s1)).runWith(s2, subSink) res.subscribe(subs) val sub = subs.expectSubscription() @@ -121,7 +132,10 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { "correctly handle async errors in secondary upstream" in assertAllStagesStopped { val promise = Promise[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(List(1, 2, 3)).concat(Source.future(promise.future)).runWith(Sink.fromSubscriber(subscriber)) + val s1 = Source(List(1, 2, 3)) + val s2 = Source.future(promise.future) + + (if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(Sink.fromSubscriber(subscriber)) val subscription = subscriber.expectSubscription() subscription.request(4) @@ -131,7 +145,9 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { } "work with Source DSL" in { - val testSource = Source(1 to 5).concatMat(Source(6 to 10))(Keep.both).grouped(1000) + val s1 = Source(1 to 5) + val s2 = Source(6 to 10) + val testSource = (if (eager) s1.concatMat(s2)(Keep.both) else s1.concatLazyMat(s2)(Keep.both)).grouped(1000) Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10) val runnable = testSource.toMat(Sink.ignore)(Keep.left) @@ -143,9 +159,11 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { } "work with Flow DSL" in { + val s1 = Source(1 to 5) + val s2 = Source(6 to 10) val testFlow: Flow[Int, Seq[Int], (NotUsed, NotUsed)] = - Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000) - Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) + (if (eager) Flow[Int].concatMat(s2)(Keep.both) else Flow[Int].concatLazyMat(s2)(Keep.both)).grouped(1000) + Await.result(s1.viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore) val x = runnable.run() @@ -158,8 +176,12 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { } "work with Flow DSL2" in { - val testFlow = Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000) - Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) + val s1 = Source(1 to 5) + val s2 = Source(6 to 10) + val testFlow = + (if (eager) Flow[Int].concatMat(s2)(Keep.both) + else Flow[Int].concatLazyMat(s2)(Keep.both)).grouped(1000) + Await.result(s1.viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) val sink = testFlow.concatMat(Source(1 to 5))(Keep.both).to(Sink.ignore).mapMaterializedValue[String] { case ((m1, m2), m3) => @@ -174,8 +196,10 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { "subscribe at once to initial source and to one that it's concat to" in { val publisher1 = TestPublisher.probe[Int]() val publisher2 = TestPublisher.probe[Int]() + val s1 = Source.fromPublisher(publisher1) + val s2 = Source.fromPublisher(publisher2) val probeSink = - Source.fromPublisher(publisher1).concat(Source.fromPublisher(publisher2)).runWith(TestSink.probe[Int]) + (if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(TestSink.probe[Int]) val sub1 = publisher1.expectSubscription() val sub2 = publisher2.expectSubscription() @@ -193,11 +217,32 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { probeSink.expectComplete() } + "optimize away empty concat" in { + val s1 = Source.single(1) + val concat = if (eager) s1.concat(Source.empty) else s1.concatLazy(Source.empty) + (concat should be).theSameInstanceAs(s1) + concat.runWith(Sink.seq).futureValue should ===(Seq(1)) + } + "optimize single elem concat" in { + val s1 = Source.single(1) + val s2 = Source.single(2) + val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2) + + // avoids digging too deap into the traversal builder + concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)") + + concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2)) + } + } +} + +class FlowConcatSpec extends AbstractFlowConcatSpec with ScalaFutures { + override def eager: Boolean = true + + "concat" must { "work in example" in { //#concat - import akka.stream.scaladsl.Sink - import akka.stream.scaladsl.Source val sourceA = Source(List(1, 2, 3, 4)) val sourceB = Source(List(10, 20, 30, 40)) @@ -208,3 +253,40 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { } } } + +class FlowConcatLazySpec extends AbstractFlowConcatSpec { + override def eager: Boolean = false + + "concatLazy" must { + "Make it possible to entirely avoid materialization of the second flow" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + val secondStreamWasMaterialized = new AtomicBoolean(false) + Source + .fromPublisher(publisher) + .concatLazy(Source.lazySource { () => + secondStreamWasMaterialized.set(true) + Source.single(3) + }) + .runWith(Sink.fromSubscriber(subscriber)) + subscriber.request(1) + publisher.sendNext(1) + subscriber.expectNext(1) + subscriber.cancel() + publisher.expectCancellation() + // cancellation went all the way upstream across one async boundary so if second source materialization + // would happen it would have happened already + secondStreamWasMaterialized.get should ===(false) + } + + "work in example" in { + //#concatLazy + val sourceA = Source(List(1, 2, 3, 4)) + val sourceB = Source(List(10, 20, 30, 40)) + + sourceA.concatLazy(sourceB).runWith(Sink.foreach(println)) + //#concatLazy + } + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala index 5a0509990b..9a641813a9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala @@ -25,5 +25,15 @@ class FlowPrependSpec extends AkkaSpec { // this will print "Emma", "Emily", "Liam", "William" //#prepend } + + "work in lazy entrance example" in { + //#prependLazy + val ladies = Source(List("Emma", "Emily")) + val gentlemen = Source(List("Liam", "William")) + + gentlemen.prependLazy(ladies).runWith(Sink.foreach(println)) + // this will print "Emma", "Emily", "Liam", "William" + //#prependLazy + } } } diff --git a/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/23044-concat-prepend-improvements.excludes b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/23044-concat-prepend-improvements.excludes new file mode 100644 index 0000000000..d9684b46b0 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/23044-concat-prepend-improvements.excludes @@ -0,0 +1,11 @@ +# internal API changes and stream operator additions +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Implicits#PortOpsImpl.concatGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Implicits#PortOpsImpl.prependGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.concatGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.prependGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.concatGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.prependGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.concatGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.prependGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.SubFlowImpl.concatGraph") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.SubFlowImpl.prependGraph") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleConcat.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleConcat.scala new file mode 100644 index 0000000000..671327cc2d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleConcat.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet +import akka.stream.Outlet +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.InHandler +import akka.stream.stage.OutHandler + +/** + * Concatenating a single element to a stream is common enough that it warrants this optimization + * which avoids the actual fan-out for such cases. + * + * INTERNAL API + */ +@InternalApi +private[akka] final class SingleConcat[E](singleElem: E) extends GraphStage[FlowShape[E, E]] { + + val in = Inlet[E]("SingleConcat.in") + val out = Outlet[E]("SingleConcat.out") + + override val shape: FlowShape[E, E] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = { + push(out, grab(in)) + } + + override def onPull(): Unit = pull(in) + + override def onUpstreamFinish(): Unit = { + emit(out, singleElem, () => completeStage()) + } + setHandlers(in, out, this) + } + + override def toString: String = s"SingleConcat($singleElem)" +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 9c8d9b0cc2..008a75aca7 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2355,10 +2355,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazy]] * * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed * @@ -2376,11 +2379,40 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] * * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. * + * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.concatLazy(that)) + + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazyMat]] + * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. * @@ -2391,15 +2423,40 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = new Flow(delegate.concatMat(that)(combinerToScala(matF))) + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed + * the operator can be combined with `Source.lazy` to defer materialization of `that`. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concatMat]] + * + * @see [[#concatLazy]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def concatLazyMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = + new Flow(delegate.concatMat(that)(combinerToScala(matF))) + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it * is exhausted, at which point Flow elements will start being produced. * - * Note that this Flow will be materialized together with the [[Source]] and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is not detached use [[#prependLazy]] * * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed * @@ -2412,6 +2469,29 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.prepend(that)) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements + * by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is also detached use [[#prepend]] + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.prepend(that)) + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it @@ -2420,7 +2500,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * Note that this Flow will be materialized together with the [[Source]] and just kept * from producing elements by asserting back-pressure until its time comes. * - * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * When needing a prepend operator that is not detached use [[#prependLazyMat]] * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. @@ -2432,6 +2512,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = new Flow(delegate.prependMat(that)(combinerToScala(matF))) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow. + * + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is detached use [[#prependMat]] + * + * @see [[#prependLazy]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def prependLazyMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = + new Flow(delegate.prependLazyMat(that)(combinerToScala(matF))) + /** * Provides a secondary source that will be consumed if this source completes without any * elements passing by. As soon as the first element comes through this stream, the alternative diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 685b88f9f4..fae4713823 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -516,6 +516,12 @@ object Concat { */ def create[T](inputCount: Int): Graph[UniformFanInShape[T, T], NotUsed] = scaladsl.Concat[T](inputCount) + /** + * Create a new anonymous `Concat` operator with the specified input types. + */ + def create[T](inputCount: Int, detachedInputs: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = + scaladsl.Concat[T](inputCount, detachedInputs) + /** * Create a new anonymous `Concat` operator with the specified input types. */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 0c5bb6dc9a..50af2d1f03 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1156,10 +1156,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * is exhausted and all result elements have been generated, * the given source elements will be produced. * - * Note that given [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled. + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazy]] * * '''Emits when''' element is available from current source or from the given [[Source]] when current is completed * @@ -1172,15 +1175,44 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def concat[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = new Source(delegate.concat(that)) + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = + new Source(delegate.concatLazy(that)) + /** * Concatenate this [[Source]] with the given one, meaning that once current * is exhausted and all result elements have been generated, * the given source elements will be produced. * - * Note that given [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled. + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazyMat]] * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. @@ -1192,15 +1224,40 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = new Source(delegate.concatMat(that)(combinerToScala(matF))) + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed + * the operator can be combined with `Source.lazy` to defer materialization of `that`. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concatMat]] + * + * @see [[#concatLazy]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def concatLazyMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = + new Source(delegate.concatLazyMat(that)(combinerToScala(matF))) + /** * Prepend the given [[Source]] to this one, meaning that once the given source * is exhausted and all result elements have been generated, the current source's * elements will be produced. * - * Note that the current [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled. + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is not detached use [[#prependLazy]] * * '''Emits when''' element is available from current source or from the given [[Source]] when current is completed * @@ -1213,15 +1270,38 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = new Source(delegate.prepend(that)) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements + * by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is also detached use [[#prepend]] + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = + new Source(delegate.prependLazy(that)) + /** * Prepend the given [[Source]] to this one, meaning that once the given source * is exhausted and all result elements have been generated, the current source's * elements will be produced. * - * Note that the current [[Source]] is materialized together with this Flow and just kept + * Note that this Flow will be materialized together with the [[Source]] and just kept * from producing elements by asserting back-pressure until its time comes. * - * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled. + * When needing a prepend operator that is not detached use [[#prependLazyMat]] * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. @@ -1233,6 +1313,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = new Source(delegate.prependMat(that)(combinerToScala(matF))) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow. + * + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is detached use [[#prependMat]] + * + * @see [[#prependLazy]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def prependLazyMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = + new Source(delegate.prependLazyMat(that)(combinerToScala(matF))) + /** * Provides a secondary source that will be consumed if this source completes without any * elements passing by. As soon as the first element comes through this stream, the alternative diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 8073ae48f9..3ccf927d90 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1494,10 +1494,13 @@ class SubFlow[In, Out, Mat]( * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazy]] * * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed * @@ -1510,13 +1513,65 @@ class SubFlow[In, Out, Mat]( def concat[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = new SubFlow(delegate.concat(that)) + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def concatLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.concatLazy(that)) + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it * is exhausted, at which point Flow elements will start being produced. * - * Note that this Flow will be materialized together with the [[Source]] and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). + * + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is not detached use [[#prependLazy]] + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prepend[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.prepend(that)) + + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements + * by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is also detached use [[#prepend]] * * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. * @@ -1528,7 +1583,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ - def prepend[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = + def prependLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = new SubFlow(delegate.prepend(that)) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 270a8944c7..c8743974cc 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1470,10 +1470,13 @@ class SubSource[Out, Mat]( * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazyMat]] * * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed * @@ -1486,15 +1489,44 @@ class SubSource[Out, Mat]( def concat[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = new SubSource(delegate.concat(that)) + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def concatLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = + new SubSource(delegate.concatLazy(that)) + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it * is exhausted, at which point Flow elements will start being produced. * - * Note that this Flow will be materialized together with the [[Source]] and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is not detached use [[#prependLazy]] * * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed * @@ -1507,6 +1539,29 @@ class SubSource[Out, Mat]( def prepend[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = new SubSource(delegate.prepend(that)) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements + * by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is also detached use [[#prepend]] + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prependLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = + new SubSource(delegate.prependLazy(that)) + /** * Provides a secondary source that will be consumed if this source completes without any * elements passing by. As soon as the first element comes through this stream, the alternative diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 403bde7647..93c38029d8 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -18,6 +18,7 @@ import akka.annotation.DoNotInherit import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.stream.Attributes.SourceLocation import akka.stream._ +import akka.stream.impl.SingleConcat import akka.stream.impl.{ fusing, LinearTraversalBuilder, @@ -31,6 +32,7 @@ import akka.stream.impl.{ import akka.stream.impl.fusing._ import akka.stream.impl.fusing.FlattenMerge import akka.stream.stage._ +import akka.util.OptionVal import akka.util.{ ConstantFun, Timeout } import akka.util.ccompat._ @@ -2991,8 +2993,13 @@ trait FlowOps[+Out, +Mat] { * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazy]] * * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. * @@ -3005,23 +3012,97 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels */ def concat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] = - via(concatGraph(that)) + internalConcat(that, detached = true) protected def concatGraph[U >: Out, Mat2]( - that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = + that: Graph[SourceShape[U], Mat2], + detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = GraphDSL.create(that) { implicit b => r => - val merge = b.add(Concat[U]()) + val merge = b.add(Concat[U](2, detached)) r ~> merge.in(1) FlowShape(merge.in(0), merge.out) } + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def concatLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] = + internalConcat(that, detached = false) + + private def internalConcat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], detached: Boolean): Repr[U] = + that match { + case source if source eq Source.empty => this.asInstanceOf[Repr[U]] + case other => + TraversalBuilder.getSingleSource(other) match { + case OptionVal.Some(singleSource) => + via(new SingleConcat(singleSource.elem.asInstanceOf[U])) + case _ => via(concatGraph(other, detached)) + } + } + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it * is exhausted, at which point Flow elements will start being produced. * - * Note that this Flow will be materialized together with the [[Source]] and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). + * + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is not detached use [[#prependLazy]] + * + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] = + via(prependGraph(that, detached = true)) + + protected def prependGraph[U >: Out, Mat2]( + that: Graph[SourceShape[U], Mat2], + detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = + GraphDSL.create(that) { implicit b => r => + val merge = b.add(Concat[U](2, detached)) + r ~> merge.in(0) + FlowShape(merge.in(1), merge.out) + } + + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements + * by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is also detached use [[#prepend]] * * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. * @@ -3033,16 +3114,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] = - via(prependGraph(that)) - - protected def prependGraph[U >: Out, Mat2]( - that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = - GraphDSL.create(that) { implicit b => r => - val merge = b.add(Concat[U]()) - r ~> merge.in(0) - FlowShape(merge.in(1), merge.out) - } + def prependLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] = + via(prependGraph(that, detached = false)) /** * Provides a secondary source that will be consumed if this stream completes without any @@ -3456,10 +3529,13 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the [[Source]] is materialized together with this Flow and just kept - * from producing elements by asserting back-pressure until its time comes. + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). * - * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a concat operator that is not detached use [[#concatLazyMat]] * * @see [[#concat]]. * @@ -3467,7 +3543,28 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * where appropriate instead of manually writing functions that pass through one of the values. */ def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] = - viaMat(concatGraph(that))(matF) + viaMat(concatGraph(that, detached = true))(matF) + + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed + * the operator can be combined with `Source.lazy` to defer materialization of `that`. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concatMat]] + * + * @see [[#concatLazy]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def concatLazyMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])( + matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] = + viaMat(concatGraph(that, detached = false))(matF) /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements @@ -3479,13 +3576,37 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. * + * When needing a concat operator that is not detached use [[#prependLazyMat]] + * * @see [[#prepend]]. * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. */ def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] = - viaMat(prependGraph(that))(matF) + viaMat(prependGraph(that, detached = true))(matF) + + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning + * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start + * (so it can not be combined with `Source.lazy` to defer materialization of `that`). + * + * This flow will then be kept from producing elements by asserting back-pressure until its time comes. + * + * When needing a prepend operator that is not detached use [[#prependLazyMat]] + * + * @see [[#prependLazy]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def prependLazyMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])( + matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] = + viaMat(prependGraph(that, detached = true))(matF) /** * Provides a secondary source that will be consumed if this stream completes without any diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index e0ced0761b..fa1b69b9bb 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -1244,11 +1244,34 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U object Concat { + // two streams is so common that we can re-use a single instance to avoid some allocations + private val _concatTwo = new Concat[Any](2) + private def concatTwo[T]: GraphStage[UniformFanInShape[T, T]] = + _concatTwo.asInstanceOf[GraphStage[UniformFanInShape[T, T]]] + /** - * Create a new `Concat`. + * Create a new `Concat`. Note that this for historical reasons creates a "detached" Concat which + * will eagerly pull each input on materialization and act as a one element buffer for each input. */ def apply[T](inputPorts: Int = 2): Graph[UniformFanInShape[T, T], NotUsed] = - GraphStages.withDetachedInputs(new Concat[T](inputPorts)) + apply(inputPorts, detachedInputs = true) + + /** + * Create a new `Concat` operator that will concatenate two or more streams. + * @param inputPorts The number of fan-in input ports + * @param detachedInputs If the ports should be detached (eagerly pull both inputs) useful to avoid deadlocks in graphs with loops + * @return + */ + def apply[T](inputPorts: Int, detachedInputs: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = { + val concat = { + if (inputPorts == 2) concatTwo[T] + else new Concat[T](inputPorts) + } + + if (detachedInputs) GraphStages.withDetachedInputs(concat) + else concat + } + } /**