chore: Add Flow#flattenMerge operator. (#1045)

This commit is contained in:
He-Pin(kerr) 2024-01-27 18:03:23 +08:00 committed by GitHub
parent 8e7ae52a06
commit 4c20580ff2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 59 additions and 2 deletions

View file

@ -0,0 +1,29 @@
# flattenMerge
Flattens a stream of Source into a single output stream by merging.
@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)
## Signature
@apidoc[Flow.flattenMerge](Flow) { scala="#flattenMerge%5BT%2C%20M%5D(breadth%3A%20Int)(implicit%20ev%3A%20Out%20%3C%3A%3C%20Graph%5BSourceShape%5BT%5D%2C%20M%5D):FlowOps.this.Repr[T]" }
## Description
Flattens a stream of `Source` into a single output stream by merging, where at most breadth substreams are being consumed
at any given time. This function is equivalent to `flatMapMerge(breadth, identity)`.
Emits when a currently consumed substream has an element available
## Reactive Streams semantics
@@@div { .callout }
**emits** when one of the currently consumed substreams has an element available
**backpressures** when downstream backpressures or the max number of substreams is reached
**completes** when upstream completes and all consumed substreams complete
@@@

View file

@ -251,6 +251,7 @@ See the @ref:[Substreams](../stream-substream.md) page for more detail and code
|Source/Flow|<a name="flatmapconcat"></a>@ref[flatMapConcat](Source-or-Flow/flatMapConcat.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.|
|Source/Flow|<a name="flatmapmerge"></a>@ref[flatMapMerge](Source-or-Flow/flatMapMerge.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.|
|Source/Flow|<a name="flatmapprefix"></a>@ref[flatMapPrefix](Source-or-Flow/flatMapPrefix.md)|Use the first `n` elements from the stream to determine how to process the rest.|
|Flow|<a name="flattenmerge"></a>@ref[flattenMerge](Flow/flattenMerge.md)|Flattens a stream of Source into a single output stream by merging.|
|Source/Flow|<a name="groupby"></a>@ref[groupBy](Source-or-Flow/groupBy.md)|Demultiplex the incoming stream into separate output streams.|
|Source/Flow|<a name="prefixandtail"></a>@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.|
|Source/Flow|<a name="splitafter"></a>@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.|
@ -451,6 +452,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [flatMapConcat](Source-or-Flow/flatMapConcat.md)
* [flatMapMerge](Source-or-Flow/flatMapMerge.md)
* [flatMapPrefix](Source-or-Flow/flatMapPrefix.md)
* [flattenMerge](Flow/flattenMerge.md)
* [flattenOptional](Flow/flattenOptional.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)

View file

@ -91,7 +91,8 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
"wireTapGraph",
"orElseGraph",
"divertToGraph",
"flatten")
"flatten",
"flattenMerge")
val forComprehensions = Set("withFilter", "flatMap", "foreach")

View file

@ -51,6 +51,13 @@ class FlowFlattenMergeSpec extends StreamSpec {
.futureValue should ===((0 until 40).toSet)
}
"work in the nominal case with flattenMerge" in {
Source(List(src10(0), src10(10), src10(20), src10(30)))
.flattenMerge(4)
.runWith(toSet)
.futureValue should ===((0 until 40).toSet)
}
"not be held back by one slow stream" in {
Source(List(src10(0), src10(10), blocked, src10(20), src10(30)))
.flatMapMerge(3, identity)

View file

@ -2667,7 +2667,7 @@ trait FlowOps[+Out, +Mat] {
/**
* Flattens a stream of `Source` into a single output stream by concatenation,
* fully consuming one `Source` after the other. This function is qquivalent to <code>flatMapConcat(identity)</code>.
* fully consuming one `Source` after the other. This function is equivalent to <code>flatMapConcat(identity)</code>.
*
* '''Emits when''' a currently consumed substream has an element available
*
@ -2697,6 +2697,24 @@ trait FlowOps[+Out, +Mat] {
def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
map(f).via(new FlattenMerge[T, M](breadth))
/**
* Flattens a stream of `Source` into a single output stream by merging,
* where at most `breadth` substreams are being consumed at any given time.
* This function is equivalent to <code>flatMapMerge(breadth, identity)</code>.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
*/
def flattenMerge[T, M](breadth: Int)(implicit ev: Out <:< Graph[SourceShape[T], M]): Repr[T] =
flatMapMerge(breadth, ev)
/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[org.apache.pekko.stream.InitialTimeoutException]].