From 078ddfa88c5a61a702568f79efa5ea3215c6c0b0 Mon Sep 17 00:00:00 2001 From: kerr Date: Thu, 1 Sep 2022 21:07:31 +0800 Subject: [PATCH] +str Add concatAllLazy stream operator (#31299) --- .../stream/operators/Source-or-Flow/concat.md | 2 +- .../operators/Source-or-Flow/concatAllLazy.md | 38 +++++++ .../operators/Source-or-Flow/concatLazy.md | 4 +- .../main/paradox/stream/operators/index.md | 2 + .../jdocs/stream/operators/SourceOrFlow.java | 19 +++- .../java/akka/stream/javadsl/SourceTest.java | 11 ++ .../scaladsl/FlowConcatAllLazySpec.scala | 101 ++++++++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 28 +++++ .../scala/akka/stream/javadsl/Source.scala | 30 +++++- .../scala/akka/stream/javadsl/SubFlow.scala | 28 +++++ .../scala/akka/stream/javadsl/SubSource.scala | 28 +++++ .../scala/akka/stream/scaladsl/Flow.scala | 40 +++++++ 12 files changed, 325 insertions(+), 6 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatAllLazy.md create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md index fe8881d034..d05439eb2e 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md @@ -21,7 +21,7 @@ Both streams will be materialized together. The `concat` operator is for backwards compatibility reasons "detached" and will eagerly demand an element from both upstreams when the stream is materialized and will then have a one element buffer for each of the upstreams, this is most often not what you want, instead - use @ref(concatLazy)[concatLazy.md] + use @ref:[`concatLazy`](concatLazy.md) @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatAllLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatAllLazy.md new file mode 100644 index 0000000000..2ea476b4b2 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatAllLazy.md @@ -0,0 +1,38 @@ +# concatAllLazy + +After completion of the original upstream the elements of the given sources will be emitted sequentially. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +## Signature + +@apidoc[Source.concatAllLazy](Source) { scala="#concatAllLazy[U>:Out](those:akka.stream.Graph[akka.stream.SourceShape[U],_]*):FlowOps.this.Repr[U]" java="#concatAllLazy(akka.stream.Graph*)" } +@apidoc[Flow.concatAllLazy](Flow) { scala="#concatAllLazy[U>:Out](those:akka.stream.Graph[akka.stream.SourceShape[U],_]*):FlowOps.this.Repr[U]" java="#concatAllLazy(akka.stream.Graph*)" } + + +## Description + +After completion of the original upstream the elements of the given sources will be emitted sequentially. + +Both streams will be materialized together, however, the given streams will be pulled for the first time only after the original upstream was completed. + +To defer the materialization of the given sources (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref:[`Source.lazySource`](../Source/lazySource.md). + +## Example +Scala +: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala) { #concatAllLazy } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concatAllLazy } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the current stream has an element available; if the current input completes, it tries the next one + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md index 6617381c12..8b8aa983fb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md @@ -14,9 +14,9 @@ After completion of the original upstream the elements of the given source will After completion of the original upstream the elements of the given source will be emitted. -Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref(concat)[concat.md], introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.) +Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref:[`concat`](concat.md), introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.) -To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref(Source.lazySource)[../Source/lazySource.md]. +To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref:[`Source.lazySource`](../Source/lazySource.md). If materialized values needs to be collected `concatLazyMat` is available. diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index d8d893e138..a4170a4f46 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -268,6 +268,7 @@ the inputs in different ways. |--|--|--| | |@ref[MergeSequence](MergeSequence.md)|Merge a linear sequence partitioned across multiple sources.| |Source/Flow|@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.| +|Source/Flow|@ref[concatAllLazy](Source-or-Flow/concatAllLazy.md)|After completion of the original upstream the elements of the given sources will be emitted sequentially.| |Source/Flow|@ref[concatLazy](Source-or-Flow/concatLazy.md)|After completion of the original upstream the elements of the given source will be emitted.| |Source/Flow|@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.| |Source/Flow|@ref[interleaveAll](Source-or-Flow/interleaveAll.md)|Emits a specifiable number of elements from the original source, then from the provided sources and repeats.| @@ -413,6 +414,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [completionStageSource](Source/completionStageSource.md) * [completionTimeout](Source-or-Flow/completionTimeout.md) * [concat](Source-or-Flow/concat.md) +* [concatAllLazy](Source-or-Flow/concatAllLazy.md) * [concatLazy](Source-or-Flow/concatLazy.md) * [conflate](Source-or-Flow/conflate.md) * [conflateWithSeed](Source-or-Flow/conflateWithSeed.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 8a4fde1e74..31109f5c8b 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -22,6 +22,7 @@ import akka.japi.function.Function2; // #prependLazy // #concat // #concatLazy +// #concatAllLazy // #interleave // #interleaveAll // #merge @@ -38,6 +39,7 @@ import java.util.*; // #interleaveAll // #concat // #concatLazy +// #concatAllLazy // #prepend // #prependLazy // #or-else @@ -54,6 +56,7 @@ import akka.stream.Attributes; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; class SourceOrFlow { private static ActorSystem system = null; @@ -149,13 +152,25 @@ class SourceOrFlow { } void concatLazyExample() { - // #concat + // #concatLazy Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.concatLazy(sourceB).runForeach(System.out::println, system); // prints 1, 2, 3, 4, 10, 20, 30, 40 - // #concat + // #concatLazy + } + + void concatAllLazyExample() { + // #concatAllLazy + Source sourceA = Source.from(Arrays.asList(1, 2, 3)); + Source sourceB = Source.from(Arrays.asList(4, 5, 6)); + Source sourceC = Source.from(Arrays.asList(7, 8 , 9)); + sourceA.concatAllLazy(sourceB, sourceC) + .fold(new StringJoiner(","), (joiner, input) -> joiner.add(String.valueOf(input))) + .runForeach(System.out::println, system); + //prints 1,2,3,4,5,6,7,8,9 + // #concatAllLazy } void interleaveExample() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 2a20fed8b5..563c18d1ba 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -277,6 +277,17 @@ public class SourceTest extends StreamTest { assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); } + @Test + public void mustBeAbleToUseConcatAll() { + final Source sourceA = Source.from(Arrays.asList(1, 2, 3)); + final Source sourceB = Source.from(Arrays.asList(4, 5, 6)); + final Source sourceC = Source.from(Arrays.asList(7, 8, 9)); + final TestSubscriber.Probe sub = + sourceA.concatAllLazy(sourceB, sourceC).runWith(TestSink.probe(system), system); + sub.expectSubscription().request(9); + sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete(); + } + @Test public void mustBeAbleToUsePrepend() { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala new file mode 100644 index 0000000000..8e099a23fe --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink + +import java.util.StringJoiner +import java.util.concurrent.atomic.AtomicBoolean +import scala.util.control.NoStackTrace + +class FlowConcatAllLazySpec extends StreamSpec(""" + akka.stream.materializer.initial-input-buffer-size = 2 + akka.stream.materializer.max-input-buffer-size = 2 + """) { + + "ConcatAllLazy" must { + + val testException = new Exception("test") with NoStackTrace + + "work in the happy case" in { + val s1 = Source(1 to 2) + val s2 = Source(List.empty[Int]) + val s3 = Source(List(3)) + val s4 = Source(4 to 6) + val s5 = Source(7 to 10) + val s6 = Source.empty + val s7 = Source.single(11) + + val sub = s1.concatAllLazy(s2, s3, s4, s5, s6, s7).runWith(TestSink.probe[Int]); + sub.expectSubscription().request(11) + sub.expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).expectComplete() + } + + "concat single upstream elements to its downstream" in { + val sub = Source(1 to 3).concatAllLazy().runWith(TestSink.probe[Int]) + sub.expectSubscription().request(3) + sub.expectNextN(List(1, 2, 3)).expectComplete() + } + + "can cancel other upstream sources" in { + val pub1 = TestPublisher.probe[Int]() + val pub2 = TestPublisher.probe[Int]() + Source(1 to 3) + .concatAllLazy(Source.fromPublisher(pub1), Source.fromPublisher(pub2)) + .runWith(TestSink.probe[Int]) + .request(2) + .expectNext(1, 2) + .cancel() + .expectNoMessage() + pub1.expectCancellation() + pub2.expectCancellation() + } + + "can cancel other upstream sources with error" in { + val pub1 = TestPublisher.probe[Int]() + val pub2 = TestPublisher.probe[Int]() + val (promise, sub) = Source + .maybe[Int] + .concatAllLazy(Source.fromPublisher(pub1), Source.fromPublisher(pub2)) + .toMat(TestSink.probe[Int])(Keep.both) + .run() + promise.tryFailure(testException) + sub.expectSubscriptionAndError(testException) + pub1.expectCancellationWithCause(testException) + pub2.expectCancellationWithCause(testException) + } + + "lazy materialization other sources" in { + val materialized = new AtomicBoolean() + Source(1 to 3) + .concatAllLazy(Source.lazySource(() => { + materialized.set(true) + Source.single(4) + })) + .runWith(TestSink.probe) + .request(2) + .expectNext(1, 2) + .cancel() + .expectNoMessage() + materialized.get() shouldBe (false) + } + + "work in example" in { + //#concatAllLazy + val sourceA = Source(List(1, 2, 3)) + val sourceB = Source(List(4, 5, 6)) + val sourceC = Source(List(7, 8, 9)) + sourceA + .concatAllLazy(sourceB, sourceC) + .fold(new StringJoiner(","))((joiner, input) => joiner.add(String.valueOf(input))) + .runWith(Sink.foreach(println)) + //prints 1,2,3,4,5,6,7,8,9 + //#concatAllLazy + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 183707ad44..62e6586cb4 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2474,6 +2474,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.concatLazy(that)) + /** + * Concatenate the given [[Source]]s to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given all those [[Source]]s completes + * + * '''Cancels when''' downstream cancels + */ + @varargs + @SafeVarargs + def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.concatAllLazy(those: _*)) + /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 973d29ec72..7ffa1ecee1 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1211,7 +1211,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * For a concat operator that is detached, use [[#concat]] * - * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled. * * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed * @@ -1224,6 +1224,34 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = new Source(delegate.concatLazy(that)) + /** + * Concatenate the given [[Source]]s to this one, meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Source]] gets upstream error - no elements from the given [[Source]]s will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all the given [[Source]]s completes + * + * '''Cancels when''' downstream cancels + */ + @varargs + @SafeVarargs + def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Source[Out, Mat] = + new Source(delegate.concatAllLazy(those: _*)) + /** * Concatenate this [[Source]] with the given one, meaning that once current * is exhausted and all result elements have been generated, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index a13d8b960f..2b943e1ac0 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1579,6 +1579,34 @@ class SubFlow[In, Out, Mat]( def concatLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = new SubFlow(delegate.concatLazy(that)) + /** + * Concatenate the given [[Source]]s to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given all those [[Source]]s completes + * + * '''Cancels when''' downstream cancels + */ + @varargs + @SafeVarargs + def concatAllLazy(those: Graph[SourceShape[Out], _]*): SubFlow[In, Out, Mat] = + new SubFlow(delegate.concatAllLazy(those: _*)) + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 5c2b100608..ce667d0e66 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1555,6 +1555,34 @@ class SubSource[Out, Mat]( def concatLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = new SubSource(delegate.concatLazy(that)) + /** + * Concatenate the given [[Source]]s to this [[Source]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the [[Source]]s is materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Source]] gets upstream error - no elements from the given [[Source]]s will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all the given [[Source]]s completes + * + * '''Cancels when''' downstream cancels + */ + @varargs + @SafeVarargs + def concatAllLazy(those: Graph[SourceShape[Out], _]*): SubSource[Out, Mat] = + new SubSource(delegate.concatAllLazy(those: _*)) + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 61258c7aea..9aa126af80 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3157,6 +3157,32 @@ trait FlowOps[+Out, +Mat] { def concatLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] = internalConcat(that, detached = false) + /** + * Concatenate the given [[Source]]s to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the [[Source]]s' elements will be produced. + * + * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed + * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the + * time when this source completes. + * + * The second source is then kept from producing elements by asserting back-pressure until its time comes. + * + * For a concat operator that is detached, use [[#concat]] + * + * If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given all those [[Source]]s completes + * + * '''Cancels when''' downstream cancels + */ + def concatAllLazy[U >: Out](those: Graph[SourceShape[U], _]*): Repr[U] = + internalConcatAll(those.toArray, detached = false) + private def internalConcat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], detached: Boolean): Repr[U] = that match { case source if source eq Source.empty => this.asInstanceOf[Repr[U]] @@ -3168,6 +3194,20 @@ trait FlowOps[+Out, +Mat] { } } + private def internalConcatAll[U >: Out](those: Array[Graph[SourceShape[U], _]], detached: Boolean): Repr[U] = + those match { + case those if those.isEmpty => this.asInstanceOf[Repr[U]] + case those if those.length == 1 => internalConcat(those.head, detached) + case _ => + via(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + val concat = b.add(Concat[U](those.length + 1, detached)) + for ((that, idx) <- those.zipWithIndex) + that ~> concat.in(idx + 1) + FlowShape(concat.in(0), concat.out) + }) + } + /** * Prepend the given [[Source]] to this [[Flow]], meaning that before elements * are generated from this Flow, the Source's elements will be produced until it