From 4c20580ff2a8bbab86f78fa782fd45dfd6db774a Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sat, 27 Jan 2024 18:03:23 +0800 Subject: [PATCH] chore: Add Flow#flattenMerge operator. (#1045) --- .../stream/operators/Flow/flattenMerge.md | 29 +++++++++++++++++++ .../main/paradox/stream/operators/index.md | 2 ++ .../pekko/stream/DslConsistencySpec.scala | 3 +- .../scaladsl/FlowFlattenMergeSpec.scala | 7 +++++ .../apache/pekko/stream/scaladsl/Flow.scala | 20 ++++++++++++- 5 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 docs/src/main/paradox/stream/operators/Flow/flattenMerge.md diff --git a/docs/src/main/paradox/stream/operators/Flow/flattenMerge.md b/docs/src/main/paradox/stream/operators/Flow/flattenMerge.md new file mode 100644 index 0000000000..900bbcb7e5 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Flow/flattenMerge.md @@ -0,0 +1,29 @@ +# flattenMerge + +Flattens a stream of Source into a single output stream by merging. + +@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators) + +## Signature + +@apidoc[Flow.flattenMerge](Flow) { scala="#flattenMerge%5BT%2C%20M%5D(breadth%3A%20Int)(implicit%20ev%3A%20Out%20%3C%3A%3C%20Graph%5BSourceShape%5BT%5D%2C%20M%5D):FlowOps.this.Repr[T]" } + +## Description + +Flattens a stream of `Source` into a single output stream by merging, where at most breadth substreams are being consumed +at any given time. This function is equivalent to `flatMapMerge(breadth, identity)`. +Emits when a currently consumed substream has an element available + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when one of the currently consumed substreams has an element available + +**backpressures** when downstream backpressures or the max number of substreams is reached + +**completes** when upstream completes and all consumed substreams complete + +@@@ + + diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index e79c880f21..adbb3183bf 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -251,6 +251,7 @@ See the @ref:[Substreams](../stream-substream.md) page for more detail and code |Source/Flow|@ref[flatMapConcat](Source-or-Flow/flatMapConcat.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.| |Source/Flow|@ref[flatMapMerge](Source-or-Flow/flatMapMerge.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.| |Source/Flow|@ref[flatMapPrefix](Source-or-Flow/flatMapPrefix.md)|Use the first `n` elements from the stream to determine how to process the rest.| +|Flow|@ref[flattenMerge](Flow/flattenMerge.md)|Flattens a stream of Source into a single output stream by merging.| |Source/Flow|@ref[groupBy](Source-or-Flow/groupBy.md)|Demultiplex the incoming stream into separate output streams.| |Source/Flow|@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.| |Source/Flow|@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.| @@ -451,6 +452,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [flatMapConcat](Source-or-Flow/flatMapConcat.md) * [flatMapMerge](Source-or-Flow/flatMapMerge.md) * [flatMapPrefix](Source-or-Flow/flatMapPrefix.md) +* [flattenMerge](Flow/flattenMerge.md) * [flattenOptional](Flow/flattenOptional.md) * [fold](Source-or-Flow/fold.md) * [fold](Sink/fold.md) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala index ce1d60bee9..7c78fe1af3 100755 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala @@ -91,7 +91,8 @@ class DslConsistencySpec extends AnyWordSpec with Matchers { "wireTapGraph", "orElseGraph", "divertToGraph", - "flatten") + "flatten", + "flattenMerge") val forComprehensions = Set("withFilter", "flatMap", "foreach") diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala index 8198d35265..99b342d74b 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -51,6 +51,13 @@ class FlowFlattenMergeSpec extends StreamSpec { .futureValue should ===((0 until 40).toSet) } + "work in the nominal case with flattenMerge" in { + Source(List(src10(0), src10(10), src10(20), src10(30))) + .flattenMerge(4) + .runWith(toSet) + .futureValue should ===((0 until 40).toSet) + } + "not be held back by one slow stream" in { Source(List(src10(0), src10(10), blocked, src10(20), src10(30))) .flatMapMerge(3, identity) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index f365368f91..da519cf81b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2667,7 +2667,7 @@ trait FlowOps[+Out, +Mat] { /** * Flattens a stream of `Source` into a single output stream by concatenation, - * fully consuming one `Source` after the other. This function is qquivalent to flatMapConcat(identity). + * fully consuming one `Source` after the other. This function is equivalent to flatMapConcat(identity). * * '''Emits when''' a currently consumed substream has an element available * @@ -2697,6 +2697,24 @@ trait FlowOps[+Out, +Mat] { def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] = map(f).via(new FlattenMerge[T, M](breadth)) + /** + * Flattens a stream of `Source` into a single output stream by merging, + * where at most `breadth` substreams are being consumed at any given time. + * This function is equivalent to flatMapMerge(breadth, identity). + * + * '''Emits when''' a currently consumed substream has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and all consumed substreams complete + * + * '''Cancels when''' downstream cancels + * + * @since 1.1.0 + */ + def flattenMerge[T, M](breadth: Int)(implicit ev: Out <:< Graph[SourceShape[T], M]): Repr[T] = + flatMapMerge(breadth, ev) + /** * If the first element has not passed through this operator before the provided timeout, the stream is failed * with a [[org.apache.pekko.stream.InitialTimeoutException]].