diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md
index fe8881d034..d05439eb2e 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md
@@ -21,7 +21,7 @@ Both streams will be materialized together.
The `concat` operator is for backwards compatibility reasons "detached" and will eagerly
demand an element from both upstreams when the stream is materialized and will then have a
one element buffer for each of the upstreams, this is most often not what you want, instead
- use @ref(concatLazy)[concatLazy.md]
+ use @ref:[`concatLazy`](concatLazy.md)
@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatAllLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatAllLazy.md
new file mode 100644
index 0000000000..2ea476b4b2
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatAllLazy.md
@@ -0,0 +1,38 @@
+# concatAllLazy
+
+After completion of the original upstream the elements of the given sources will be emitted sequentially.
+
+@ref[Fan-in operators](../index.md#fan-in-operators)
+
+## Signature
+
+@apidoc[Source.concatAllLazy](Source) { scala="#concatAllLazy[U>:Out](those:akka.stream.Graph[akka.stream.SourceShape[U],_]*):FlowOps.this.Repr[U]" java="#concatAllLazy(akka.stream.Graph*)" }
+@apidoc[Flow.concatAllLazy](Flow) { scala="#concatAllLazy[U>:Out](those:akka.stream.Graph[akka.stream.SourceShape[U],_]*):FlowOps.this.Repr[U]" java="#concatAllLazy(akka.stream.Graph*)" }
+
+
+## Description
+
+After completion of the original upstream the elements of the given sources will be emitted sequentially.
+
+Both streams will be materialized together, however, the given streams will be pulled for the first time only after the original upstream was completed.
+
+To defer the materialization of the given sources (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref:[`Source.lazySource`](../Source/lazySource.md).
+
+## Example
+Scala
+: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala) { #concatAllLazy }
+
+Java
+: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concatAllLazy }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the current stream has an element available; if the current input completes, it tries the next one
+
+**backpressures** when downstream backpressures
+
+**completes** when all upstreams complete
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md
index 6617381c12..8b8aa983fb 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md
@@ -14,9 +14,9 @@ After completion of the original upstream the elements of the given source will
After completion of the original upstream the elements of the given source will be emitted.
-Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref(concat)[concat.md], introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.)
+Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref:[`concat`](concat.md), introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.)
-To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref(Source.lazySource)[../Source/lazySource.md].
+To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref:[`Source.lazySource`](../Source/lazySource.md).
If materialized values needs to be collected `concatLazyMat` is available.
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index d8d893e138..a4170a4f46 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -268,6 +268,7 @@ the inputs in different ways.
|--|--|--|
| |@ref[MergeSequence](MergeSequence.md)|Merge a linear sequence partitioned across multiple sources.|
|Source/Flow|@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.|
+|Source/Flow|@ref[concatAllLazy](Source-or-Flow/concatAllLazy.md)|After completion of the original upstream the elements of the given sources will be emitted sequentially.|
|Source/Flow|@ref[concatLazy](Source-or-Flow/concatLazy.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
|Source/Flow|@ref[interleaveAll](Source-or-Flow/interleaveAll.md)|Emits a specifiable number of elements from the original source, then from the provided sources and repeats.|
@@ -413,6 +414,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [completionStageSource](Source/completionStageSource.md)
* [completionTimeout](Source-or-Flow/completionTimeout.md)
* [concat](Source-or-Flow/concat.md)
+* [concatAllLazy](Source-or-Flow/concatAllLazy.md)
* [concatLazy](Source-or-Flow/concatLazy.md)
* [conflate](Source-or-Flow/conflate.md)
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
index 8a4fde1e74..31109f5c8b 100644
--- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
+++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -22,6 +22,7 @@ import akka.japi.function.Function2;
// #prependLazy
// #concat
// #concatLazy
+// #concatAllLazy
// #interleave
// #interleaveAll
// #merge
@@ -38,6 +39,7 @@ import java.util.*;
// #interleaveAll
// #concat
// #concatLazy
+// #concatAllLazy
// #prepend
// #prependLazy
// #or-else
@@ -54,6 +56,7 @@ import akka.stream.Attributes;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
class SourceOrFlow {
private static ActorSystem system = null;
@@ -149,13 +152,25 @@ class SourceOrFlow {
}
void concatLazyExample() {
- // #concat
+ // #concatLazy
Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
sourceA.concatLazy(sourceB).runForeach(System.out::println, system);
// prints 1, 2, 3, 4, 10, 20, 30, 40
- // #concat
+ // #concatLazy
+ }
+
+ void concatAllLazyExample() {
+ // #concatAllLazy
+ Source sourceA = Source.from(Arrays.asList(1, 2, 3));
+ Source sourceB = Source.from(Arrays.asList(4, 5, 6));
+ Source sourceC = Source.from(Arrays.asList(7, 8 , 9));
+ sourceA.concatAllLazy(sourceB, sourceC)
+ .fold(new StringJoiner(","), (joiner, input) -> joiner.add(String.valueOf(input)))
+ .runForeach(System.out::println, system);
+ //prints 1,2,3,4,5,6,7,8,9
+ // #concatAllLazy
}
void interleaveExample() {
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
index 2a20fed8b5..563c18d1ba 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
@@ -277,6 +277,17 @@ public class SourceTest extends StreamTest {
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
}
+ @Test
+ public void mustBeAbleToUseConcatAll() {
+ final Source sourceA = Source.from(Arrays.asList(1, 2, 3));
+ final Source sourceB = Source.from(Arrays.asList(4, 5, 6));
+ final Source sourceC = Source.from(Arrays.asList(7, 8, 9));
+ final TestSubscriber.Probe sub =
+ sourceA.concatAllLazy(sourceB, sourceC).runWith(TestSink.probe(system), system);
+ sub.expectSubscription().request(9);
+ sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete();
+ }
+
@Test
public void mustBeAbleToUsePrepend() {
final TestKit probe = new TestKit(system);
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala
new file mode 100644
index 0000000000..8e099a23fe
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2022 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.stream.testkit._
+import akka.stream.testkit.scaladsl.TestSink
+
+import java.util.StringJoiner
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.util.control.NoStackTrace
+
+class FlowConcatAllLazySpec extends StreamSpec("""
+ akka.stream.materializer.initial-input-buffer-size = 2
+ akka.stream.materializer.max-input-buffer-size = 2
+ """) {
+
+ "ConcatAllLazy" must {
+
+ val testException = new Exception("test") with NoStackTrace
+
+ "work in the happy case" in {
+ val s1 = Source(1 to 2)
+ val s2 = Source(List.empty[Int])
+ val s3 = Source(List(3))
+ val s4 = Source(4 to 6)
+ val s5 = Source(7 to 10)
+ val s6 = Source.empty
+ val s7 = Source.single(11)
+
+ val sub = s1.concatAllLazy(s2, s3, s4, s5, s6, s7).runWith(TestSink.probe[Int]);
+ sub.expectSubscription().request(11)
+ sub.expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).expectComplete()
+ }
+
+ "concat single upstream elements to its downstream" in {
+ val sub = Source(1 to 3).concatAllLazy().runWith(TestSink.probe[Int])
+ sub.expectSubscription().request(3)
+ sub.expectNextN(List(1, 2, 3)).expectComplete()
+ }
+
+ "can cancel other upstream sources" in {
+ val pub1 = TestPublisher.probe[Int]()
+ val pub2 = TestPublisher.probe[Int]()
+ Source(1 to 3)
+ .concatAllLazy(Source.fromPublisher(pub1), Source.fromPublisher(pub2))
+ .runWith(TestSink.probe[Int])
+ .request(2)
+ .expectNext(1, 2)
+ .cancel()
+ .expectNoMessage()
+ pub1.expectCancellation()
+ pub2.expectCancellation()
+ }
+
+ "can cancel other upstream sources with error" in {
+ val pub1 = TestPublisher.probe[Int]()
+ val pub2 = TestPublisher.probe[Int]()
+ val (promise, sub) = Source
+ .maybe[Int]
+ .concatAllLazy(Source.fromPublisher(pub1), Source.fromPublisher(pub2))
+ .toMat(TestSink.probe[Int])(Keep.both)
+ .run()
+ promise.tryFailure(testException)
+ sub.expectSubscriptionAndError(testException)
+ pub1.expectCancellationWithCause(testException)
+ pub2.expectCancellationWithCause(testException)
+ }
+
+ "lazy materialization other sources" in {
+ val materialized = new AtomicBoolean()
+ Source(1 to 3)
+ .concatAllLazy(Source.lazySource(() => {
+ materialized.set(true)
+ Source.single(4)
+ }))
+ .runWith(TestSink.probe)
+ .request(2)
+ .expectNext(1, 2)
+ .cancel()
+ .expectNoMessage()
+ materialized.get() shouldBe (false)
+ }
+
+ "work in example" in {
+ //#concatAllLazy
+ val sourceA = Source(List(1, 2, 3))
+ val sourceB = Source(List(4, 5, 6))
+ val sourceC = Source(List(7, 8, 9))
+ sourceA
+ .concatAllLazy(sourceB, sourceC)
+ .fold(new StringJoiner(","))((joiner, input) => joiner.add(String.valueOf(input)))
+ .runWith(Sink.foreach(println))
+ //prints 1,2,3,4,5,6,7,8,9
+ //#concatAllLazy
+ }
+
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 183707ad44..62e6586cb4 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -2474,6 +2474,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.concatLazy(that))
+ /**
+ * Concatenate the given [[Source]]s to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given all those [[Source]]s completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ @varargs
+ @SafeVarargs
+ def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Flow[In, Out, Mat] =
+ new Flow(delegate.concatAllLazy(those: _*))
+
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flow’s input is exhausted and all result elements have been generated,
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 973d29ec72..7ffa1ecee1 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -1211,7 +1211,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* For a concat operator that is detached, use [[#concat]]
*
- * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@@ -1224,6 +1224,34 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.concatLazy(that))
+ /**
+ * Concatenate the given [[Source]]s to this one, meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Source]] gets upstream error - no elements from the given [[Source]]s will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' all the given [[Source]]s completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ @varargs
+ @SafeVarargs
+ def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Source[Out, Mat] =
+ new Source(delegate.concatAllLazy(those: _*))
+
/**
* Concatenate this [[Source]] with the given one, meaning that once current
* is exhausted and all result elements have been generated,
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index a13d8b960f..2b943e1ac0 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -1579,6 +1579,34 @@ class SubFlow[In, Out, Mat](
def concatLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.concatLazy(that))
+ /**
+ * Concatenate the given [[Source]]s to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given all those [[Source]]s completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ @varargs
+ @SafeVarargs
+ def concatAllLazy(those: Graph[SourceShape[Out], _]*): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.concatAllLazy(those: _*))
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index 5c2b100608..ce667d0e66 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -1555,6 +1555,34 @@ class SubSource[Out, Mat](
def concatLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.concatLazy(that))
+ /**
+ * Concatenate the given [[Source]]s to this [[Source]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]]s is materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Source]] gets upstream error - no elements from the given [[Source]]s will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' all the given [[Source]]s completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ @varargs
+ @SafeVarargs
+ def concatAllLazy(those: Graph[SourceShape[Out], _]*): SubSource[Out, Mat] =
+ new SubSource(delegate.concatAllLazy(those: _*))
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 61258c7aea..9aa126af80 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -3157,6 +3157,32 @@ trait FlowOps[+Out, +Mat] {
def concatLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
internalConcat(that, detached = false)
+ /**
+ * Concatenate the given [[Source]]s to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the [[Source]]s' elements will be produced.
+ *
+ * Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given all those [[Source]]s completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def concatAllLazy[U >: Out](those: Graph[SourceShape[U], _]*): Repr[U] =
+ internalConcatAll(those.toArray, detached = false)
+
private def internalConcat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], detached: Boolean): Repr[U] =
that match {
case source if source eq Source.empty => this.asInstanceOf[Repr[U]]
@@ -3168,6 +3194,20 @@ trait FlowOps[+Out, +Mat] {
}
}
+ private def internalConcatAll[U >: Out](those: Array[Graph[SourceShape[U], _]], detached: Boolean): Repr[U] =
+ those match {
+ case those if those.isEmpty => this.asInstanceOf[Repr[U]]
+ case those if those.length == 1 => internalConcat(those.head, detached)
+ case _ =>
+ via(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val concat = b.add(Concat[U](those.length + 1, detached))
+ for ((that, idx) <- those.zipWithIndex)
+ that ~> concat.in(idx + 1)
+ FlowShape(concat.in(0), concat.out)
+ })
+ }
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it