+str Add concatAllLazy stream operator (#31299)

This commit is contained in:
kerr 2022-09-01 21:07:31 +08:00 committed by GitHub
parent d09d69198e
commit 078ddfa88c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 325 additions and 6 deletions

View file

@ -21,7 +21,7 @@ Both streams will be materialized together.
The `concat` operator is for backwards compatibility reasons "detached" and will eagerly
demand an element from both upstreams when the stream is materialized and will then have a
one element buffer for each of the upstreams, this is most often not what you want, instead
use @ref(concatLazy)[concatLazy.md]
use @ref:[`concatLazy`](concatLazy.md)
@@@

View file

@ -0,0 +1,38 @@
# concatAllLazy
After completion of the original upstream the elements of the given sources will be emitted sequentially.
@ref[Fan-in operators](../index.md#fan-in-operators)
## Signature
@apidoc[Source.concatAllLazy](Source) { scala="#concatAllLazy[U>:Out](those:akka.stream.Graph[akka.stream.SourceShape[U],_]*):FlowOps.this.Repr[U]" java="#concatAllLazy(akka.stream.Graph*)" }
@apidoc[Flow.concatAllLazy](Flow) { scala="#concatAllLazy[U>:Out](those:akka.stream.Graph[akka.stream.SourceShape[U],_]*):FlowOps.this.Repr[U]" java="#concatAllLazy(akka.stream.Graph*)" }
## Description
After completion of the original upstream the elements of the given sources will be emitted sequentially.
Both streams will be materialized together, however, the given streams will be pulled for the first time only after the original upstream was completed.
To defer the materialization of the given sources (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref:[`Source.lazySource`](../Source/lazySource.md).
## Example
Scala
: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllLazySpec.scala) { #concatAllLazy }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concatAllLazy }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the current stream has an element available; if the current input completes, it tries the next one
**backpressures** when downstream backpressures
**completes** when all upstreams complete
@@@

View file

@ -14,9 +14,9 @@ After completion of the original upstream the elements of the given source will
After completion of the original upstream the elements of the given source will be emitted.
Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref(concat)[concat.md], introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.)
Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref:[`concat`](concat.md), introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.)
To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref(Source.lazySource)[../Source/lazySource.md].
To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref:[`Source.lazySource`](../Source/lazySource.md).
If materialized values needs to be collected `concatLazyMat` is available.

View file

@ -268,6 +268,7 @@ the inputs in different ways.
|--|--|--|
| |<a name="mergesequence"></a>@ref[MergeSequence](MergeSequence.md)|Merge a linear sequence partitioned across multiple sources.|
|Source/Flow|<a name="concat"></a>@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|<a name="concatalllazy"></a>@ref[concatAllLazy](Source-or-Flow/concatAllLazy.md)|After completion of the original upstream the elements of the given sources will be emitted sequentially.|
|Source/Flow|<a name="concatlazy"></a>@ref[concatLazy](Source-or-Flow/concatLazy.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|<a name="interleave"></a>@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
|Source/Flow|<a name="interleaveall"></a>@ref[interleaveAll](Source-or-Flow/interleaveAll.md)|Emits a specifiable number of elements from the original source, then from the provided sources and repeats.|
@ -413,6 +414,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [completionStageSource](Source/completionStageSource.md)
* [completionTimeout](Source-or-Flow/completionTimeout.md)
* [concat](Source-or-Flow/concat.md)
* [concatAllLazy](Source-or-Flow/concatAllLazy.md)
* [concatLazy](Source-or-Flow/concatLazy.md)
* [conflate](Source-or-Flow/conflate.md)
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)

View file

@ -22,6 +22,7 @@ import akka.japi.function.Function2;
// #prependLazy
// #concat
// #concatLazy
// #concatAllLazy
// #interleave
// #interleaveAll
// #merge
@ -38,6 +39,7 @@ import java.util.*;
// #interleaveAll
// #concat
// #concatLazy
// #concatAllLazy
// #prepend
// #prependLazy
// #or-else
@ -54,6 +56,7 @@ import akka.stream.Attributes;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
class SourceOrFlow {
private static ActorSystem system = null;
@ -149,13 +152,25 @@ class SourceOrFlow {
}
void concatLazyExample() {
// #concat
// #concatLazy
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
sourceA.concatLazy(sourceB).runForeach(System.out::println, system);
// prints 1, 2, 3, 4, 10, 20, 30, 40
// #concat
// #concatLazy
}
void concatAllLazyExample() {
// #concatAllLazy
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(4, 5, 6));
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(7, 8 , 9));
sourceA.concatAllLazy(sourceB, sourceC)
.fold(new StringJoiner(","), (joiner, input) -> joiner.add(String.valueOf(input)))
.runForeach(System.out::println, system);
//prints 1,2,3,4,5,6,7,8,9
// #concatAllLazy
}
void interleaveExample() {

View file

@ -277,6 +277,17 @@ public class SourceTest extends StreamTest {
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
}
@Test
public void mustBeAbleToUseConcatAll() {
final Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3));
final Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(4, 5, 6));
final Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(7, 8, 9));
final TestSubscriber.Probe<Integer> sub =
sourceA.concatAllLazy(sourceB, sourceC).runWith(TestSink.probe(system), system);
sub.expectSubscription().request(9);
sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete();
}
@Test
public void mustBeAbleToUsePrepend() {
final TestKit probe = new TestKit(system);

View file

@ -0,0 +1,101 @@
/*
* Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import java.util.StringJoiner
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NoStackTrace
class FlowConcatAllLazySpec extends StreamSpec("""
akka.stream.materializer.initial-input-buffer-size = 2
akka.stream.materializer.max-input-buffer-size = 2
""") {
"ConcatAllLazy" must {
val testException = new Exception("test") with NoStackTrace
"work in the happy case" in {
val s1 = Source(1 to 2)
val s2 = Source(List.empty[Int])
val s3 = Source(List(3))
val s4 = Source(4 to 6)
val s5 = Source(7 to 10)
val s6 = Source.empty
val s7 = Source.single(11)
val sub = s1.concatAllLazy(s2, s3, s4, s5, s6, s7).runWith(TestSink.probe[Int]);
sub.expectSubscription().request(11)
sub.expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).expectComplete()
}
"concat single upstream elements to its downstream" in {
val sub = Source(1 to 3).concatAllLazy().runWith(TestSink.probe[Int])
sub.expectSubscription().request(3)
sub.expectNextN(List(1, 2, 3)).expectComplete()
}
"can cancel other upstream sources" in {
val pub1 = TestPublisher.probe[Int]()
val pub2 = TestPublisher.probe[Int]()
Source(1 to 3)
.concatAllLazy(Source.fromPublisher(pub1), Source.fromPublisher(pub2))
.runWith(TestSink.probe[Int])
.request(2)
.expectNext(1, 2)
.cancel()
.expectNoMessage()
pub1.expectCancellation()
pub2.expectCancellation()
}
"can cancel other upstream sources with error" in {
val pub1 = TestPublisher.probe[Int]()
val pub2 = TestPublisher.probe[Int]()
val (promise, sub) = Source
.maybe[Int]
.concatAllLazy(Source.fromPublisher(pub1), Source.fromPublisher(pub2))
.toMat(TestSink.probe[Int])(Keep.both)
.run()
promise.tryFailure(testException)
sub.expectSubscriptionAndError(testException)
pub1.expectCancellationWithCause(testException)
pub2.expectCancellationWithCause(testException)
}
"lazy materialization other sources" in {
val materialized = new AtomicBoolean()
Source(1 to 3)
.concatAllLazy(Source.lazySource(() => {
materialized.set(true)
Source.single(4)
}))
.runWith(TestSink.probe)
.request(2)
.expectNext(1, 2)
.cancel()
.expectNoMessage()
materialized.get() shouldBe (false)
}
"work in example" in {
//#concatAllLazy
val sourceA = Source(List(1, 2, 3))
val sourceB = Source(List(4, 5, 6))
val sourceC = Source(List(7, 8, 9))
sourceA
.concatAllLazy(sourceB, sourceC)
.fold(new StringJoiner(","))((joiner, input) => joiner.add(String.valueOf(input)))
.runWith(Sink.foreach(println))
//prints 1,2,3,4,5,6,7,8,9
//#concatAllLazy
}
}
}

View file

@ -2474,6 +2474,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.concatLazy(that))
/**
* Concatenate the given [[Source]]s to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given all those [[Source]]s completes
*
* '''Cancels when''' downstream cancels
*/
@varargs
@SafeVarargs
def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.concatAllLazy(those: _*))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,

View file

@ -1211,7 +1211,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
* If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@ -1224,6 +1224,34 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.concatLazy(that))
/**
* Concatenate the given [[Source]]s to this one, meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Source]] gets upstream error - no elements from the given [[Source]]s will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all the given [[Source]]s completes
*
* '''Cancels when''' downstream cancels
*/
@varargs
@SafeVarargs
def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Source[Out, Mat] =
new Source(delegate.concatAllLazy(those: _*))
/**
* Concatenate this [[Source]] with the given one, meaning that once current
* is exhausted and all result elements have been generated,

View file

@ -1579,6 +1579,34 @@ class SubFlow[In, Out, Mat](
def concatLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.concatLazy(that))
/**
* Concatenate the given [[Source]]s to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given all those [[Source]]s completes
*
* '''Cancels when''' downstream cancels
*/
@varargs
@SafeVarargs
def concatAllLazy(those: Graph[SourceShape[Out], _]*): SubFlow[In, Out, Mat] =
new SubFlow(delegate.concatAllLazy(those: _*))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it

View file

@ -1555,6 +1555,34 @@ class SubSource[Out, Mat](
def concatLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.concatLazy(that))
/**
* Concatenate the given [[Source]]s to this [[Source]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]]s is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Source]] gets upstream error - no elements from the given [[Source]]s will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all the given [[Source]]s completes
*
* '''Cancels when''' downstream cancels
*/
@varargs
@SafeVarargs
def concatAllLazy(those: Graph[SourceShape[Out], _]*): SubSource[Out, Mat] =
new SubSource(delegate.concatAllLazy(those: _*))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it

View file

@ -3157,6 +3157,32 @@ trait FlowOps[+Out, +Mat] {
def concatLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
internalConcat(that, detached = false)
/**
* Concatenate the given [[Source]]s to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the [[Source]]s' elements will be produced.
*
* Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given all those [[Source]]s completes
*
* '''Cancels when''' downstream cancels
*/
def concatAllLazy[U >: Out](those: Graph[SourceShape[U], _]*): Repr[U] =
internalConcatAll(those.toArray, detached = false)
private def internalConcat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], detached: Boolean): Repr[U] =
that match {
case source if source eq Source.empty => this.asInstanceOf[Repr[U]]
@ -3168,6 +3194,20 @@ trait FlowOps[+Out, +Mat] {
}
}
private def internalConcatAll[U >: Out](those: Array[Graph[SourceShape[U], _]], detached: Boolean): Repr[U] =
those match {
case those if those.isEmpty => this.asInstanceOf[Repr[U]]
case those if those.length == 1 => internalConcat(those.head, detached)
case _ =>
via(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val concat = b.add(Concat[U](those.length + 1, detached))
for ((that, idx) <- those.zipWithIndex)
that ~> concat.in(idx + 1)
FlowShape(concat.in(0), concat.out)
})
}
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it