Add fluent API for 2-stream merging (#27020)
* fluent API for 2-stream mergeLatest, mergePreferred and mergePrioritized * Add javadocs and 'eagerClose' paramter to 'mergeLatest' * fluent API's for
This commit is contained in:
parent
0f7dbf6fcb
commit
d3170a56ea
9 changed files with 404 additions and 1 deletions
|
|
@ -4,8 +4,14 @@ Merge multiple sources.
|
||||||
|
|
||||||
@ref[Fan-in operators](../index.md#fan-in-operators)
|
@ref[Fan-in operators](../index.md#fan-in-operators)
|
||||||
|
|
||||||
|
@@@div { .group-scala }
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
|
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mergeLatest }
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
MergeLatest joins elements from N input streams into stream of lists of size N.
|
MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ Merge multiple sources.
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
Merge multiple sources. Prefer one source if all sources has elements ready.
|
Merge multiple sources. Prefer one source if all sources have elements ready.
|
||||||
|
|
||||||
|
|
||||||
@@@div { .callout }
|
@@@div { .callout }
|
||||||
|
|
|
||||||
|
|
@ -245,6 +245,9 @@ the inputs in different ways.
|
||||||
|Source/Flow|<a name="concat"></a>@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.|
|
|Source/Flow|<a name="concat"></a>@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.|
|
||||||
|Source/Flow|<a name="interleave"></a>@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
|
|Source/Flow|<a name="interleave"></a>@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
|
||||||
|Source/Flow|<a name="merge"></a>@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.|
|
|Source/Flow|<a name="merge"></a>@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.|
|
||||||
|
|Source/Flow|<a name="mergelatest"></a>@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.|
|
||||||
|
|Source/Flow|<a name="mergepreferred"></a>@ref[mergePreferred](Source-or-Flow/mergePreferred.md)|Merge multiple sources.|
|
||||||
|
|Source/Flow|<a name="mergeprioritized"></a>@ref[mergePrioritized](Source-or-Flow/mergePrioritized.md)|Merge multiple sources.|
|
||||||
|Source/Flow|<a name="mergesorted"></a>@ref[mergeSorted](Source-or-Flow/mergeSorted.md)|Merge multiple sources.|
|
|Source/Flow|<a name="mergesorted"></a>@ref[mergeSorted](Source-or-Flow/mergeSorted.md)|Merge multiple sources.|
|
||||||
|Source/Flow|<a name="orelse"></a>@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.|
|
|Source/Flow|<a name="orelse"></a>@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.|
|
||||||
|Source/Flow|<a name="prepend"></a>@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|
|Source/Flow|<a name="prepend"></a>@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|
||||||
|
|
@ -345,6 +348,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
||||||
* [wireTap](Source-or-Flow/wireTap.md)
|
* [wireTap](Source-or-Flow/wireTap.md)
|
||||||
* [interleave](Source-or-Flow/interleave.md)
|
* [interleave](Source-or-Flow/interleave.md)
|
||||||
* [merge](Source-or-Flow/merge.md)
|
* [merge](Source-or-Flow/merge.md)
|
||||||
|
* [mergeLatest](Source-or-Flow/mergeLatest.md)
|
||||||
|
* [mergePreferred](Source-or-Flow/mergePreferred.md)
|
||||||
|
* [mergePrioritized](Source-or-Flow/mergePrioritized.md)
|
||||||
* [mergeSorted](Source-or-Flow/mergeSorted.md)
|
* [mergeSorted](Source-or-Flow/mergeSorted.md)
|
||||||
* [zip](Source-or-Flow/zip.md)
|
* [zip](Source-or-Flow/zip.md)
|
||||||
* [zipAll](Source-or-Flow/zipAll.md)
|
* [zipAll](Source-or-Flow/zipAll.md)
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,9 @@ class DslConsistencySpec extends WordSpec with Matchers {
|
||||||
"zipLatestWithGraph",
|
"zipLatestWithGraph",
|
||||||
"zipAllFlow",
|
"zipAllFlow",
|
||||||
"mergeGraph",
|
"mergeGraph",
|
||||||
|
"mergeLatestGraph",
|
||||||
|
"mergePreferredGraph",
|
||||||
|
"mergePrioritizedGraph",
|
||||||
"mergeSortedGraph",
|
"mergeSortedGraph",
|
||||||
"interleaveGraph",
|
"interleaveGraph",
|
||||||
"concatGraph",
|
"concatGraph",
|
||||||
|
|
|
||||||
|
|
@ -2476,6 +2476,94 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
||||||
eagerComplete: Boolean): javadsl.Flow[In, Out, M2] =
|
eagerComplete: Boolean): javadsl.Flow[In, Out, M2] =
|
||||||
new Flow(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
|
new Flow(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* '''Emits when''' an element is available from some input and each input emits at least one element from stream start
|
||||||
|
*
|
||||||
|
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||||
|
*/
|
||||||
|
def mergeLatest(
|
||||||
|
that: Graph[SourceShape[Out], _],
|
||||||
|
eagerComplete: Boolean): javadsl.Flow[In, java.util.List[Out], Mat] =
|
||||||
|
new Flow(delegate.mergeLatest(that, eagerComplete).map(_.asJava))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergeLatestMat[Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[Out], Mat2],
|
||||||
|
eagerComplete: Boolean,
|
||||||
|
matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, java.util.List[Out], Mat3] =
|
||||||
|
new Flow(delegate.mergeLatestMat(that, eagerComplete)(combinerToScala(matF))).map(_.asJava)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'.
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePreferred(
|
||||||
|
that: Graph[SourceShape[Out], _],
|
||||||
|
preferred: Boolean,
|
||||||
|
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] =
|
||||||
|
new Flow(delegate.mergePreferred(that, preferred, eagerComplete))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergePreferredMat[Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[Out], Mat2],
|
||||||
|
preferred: Boolean,
|
||||||
|
eagerComplete: Boolean,
|
||||||
|
matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, Out, Mat3] =
|
||||||
|
new Flow(delegate.mergePreferredMat(that, preferred, eagerComplete)(combinerToScala(matF)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePrioritized(
|
||||||
|
that: Graph[SourceShape[Out], _],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] =
|
||||||
|
new Flow(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergePrioritizedMat[Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[Out], Mat2],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean,
|
||||||
|
matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, Out, Mat3] =
|
||||||
|
new Flow(delegate.mergePrioritizedMat(that, leftPriority, rightPriority, eagerComplete)(combinerToScala(matF)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||||
* picking always the smallest of the available elements (waiting for one element from each side
|
* picking always the smallest of the available elements (waiting for one element from each side
|
||||||
|
|
|
||||||
|
|
@ -1091,6 +1091,98 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
||||||
eagerComplete: Boolean): javadsl.Source[Out, M2] =
|
eagerComplete: Boolean): javadsl.Source[Out, M2] =
|
||||||
new Source(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
|
new Source(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* '''Emits when''' an element is available from some input and each input emits at least one element from stream start
|
||||||
|
*
|
||||||
|
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||||
|
*/
|
||||||
|
def mergeLatest[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
eagerComplete: Boolean): javadsl.Source[java.util.List[Out], Mat] =
|
||||||
|
new Source(delegate.mergeLatest(that, eagerComplete).map(_.asJava))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* @see [[#mergeLatest]].
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergeLatestMat[Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[Out], Mat2],
|
||||||
|
eagerComplete: Boolean,
|
||||||
|
matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[java.util.List[Out], Mat3] =
|
||||||
|
new Source(delegate.mergeLatestMat(that, eagerComplete)(combinerToScala(matF))).map(_.asJava)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'.
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePreferred[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
preferred: Boolean,
|
||||||
|
eagerComplete: Boolean): javadsl.Source[Out, Mat] =
|
||||||
|
new Source(delegate.mergePreferred(that, preferred, eagerComplete))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* @see [[#mergePreferred]]
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergePreferredMat[Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[Out], Mat2],
|
||||||
|
preferred: Boolean,
|
||||||
|
eagerComplete: Boolean,
|
||||||
|
matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[Out, Mat3] =
|
||||||
|
new Source(delegate.mergePreferredMat(that, preferred, eagerComplete)(combinerToScala(matF)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePrioritized[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean): javadsl.Source[Out, Mat] =
|
||||||
|
new Source(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge multiple sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergePrioritizedMat[Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[Out], Mat2],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean,
|
||||||
|
matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[Out, Mat3] =
|
||||||
|
new Source(delegate.mergePrioritizedMat(that, leftPriority, rightPriority, eagerComplete)(combinerToScala(matF)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
|
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
|
||||||
* picking always the smallest of the available elements (waiting for one element from each side
|
* picking always the smallest of the available elements (waiting for one element from each side
|
||||||
|
|
|
||||||
|
|
@ -1509,6 +1509,52 @@ class SubFlow[In, Out, Mat](
|
||||||
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubFlow[In, Out, Mat] =
|
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubFlow[In, Out, Mat] =
|
||||||
new SubFlow(delegate.interleave(that, segmentSize))
|
new SubFlow(delegate.interleave(that, segmentSize))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* '''Emits when''' an element is available from some input and each input emits at least one element from stream start
|
||||||
|
*
|
||||||
|
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||||
|
*/
|
||||||
|
def mergeLatest[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
eagerComplete: Boolean): javadsl.SubFlow[In, java.util.List[Out], Mat] =
|
||||||
|
new SubFlow(delegate.mergeLatest(that, eagerComplete).map(_.asJava))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'.
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePreferred[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
preferred: Boolean,
|
||||||
|
eagerComplete: Boolean): javadsl.SubFlow[In, Out, Mat] =
|
||||||
|
new SubFlow(delegate.mergePreferred(that, preferred, eagerComplete))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePrioritized[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean): javadsl.SubFlow[In, Out, Mat] =
|
||||||
|
new SubFlow(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||||
* picking always the smallest of the available elements (waiting for one element from each side
|
* picking always the smallest of the available elements (waiting for one element from each side
|
||||||
|
|
|
||||||
|
|
@ -1489,6 +1489,52 @@ class SubSource[Out, Mat](
|
||||||
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat] =
|
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat] =
|
||||||
new SubSource(delegate.interleave(that, segmentSize))
|
new SubSource(delegate.interleave(that, segmentSize))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* '''Emits when''' an element is available from some input and each input emits at least one element from stream start
|
||||||
|
*
|
||||||
|
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||||
|
*/
|
||||||
|
def mergeLatest[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
eagerComplete: Boolean): javadsl.SubSource[java.util.List[Out], Mat] =
|
||||||
|
new SubSource(delegate.mergeLatest(that, eagerComplete).map(_.asJava))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'.
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePreferred[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
preferred: Boolean,
|
||||||
|
eagerComplete: Boolean): javadsl.SubSource[Out, Mat] =
|
||||||
|
new SubSource(delegate.mergePreferred(that, preferred, eagerComplete))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePrioritized[M](
|
||||||
|
that: Graph[SourceShape[Out], M],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean): javadsl.SubSource[Out, Mat] =
|
||||||
|
new SubSource(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
|
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
|
||||||
* picking always the smallest of the available elements (waiting for one element from each side
|
* picking always the smallest of the available elements (waiting for one element from each side
|
||||||
|
|
|
||||||
|
|
@ -2631,6 +2631,80 @@ trait FlowOps[+Out, +Mat] {
|
||||||
FlowShape(merge.in(0), merge.out)
|
FlowShape(merge.in(0), merge.out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* '''Emits when''' an element is available from some input and each input emits at least one element from stream start
|
||||||
|
*
|
||||||
|
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||||
|
*/
|
||||||
|
def mergeLatest[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[immutable.Seq[U]] =
|
||||||
|
via(mergeLatestGraph(that, eagerComplete))
|
||||||
|
|
||||||
|
protected def mergeLatestGraph[U >: Out, M](
|
||||||
|
that: Graph[SourceShape[U], M],
|
||||||
|
eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, immutable.Seq[U]], M] =
|
||||||
|
GraphDSL.create(that) { implicit b => r =>
|
||||||
|
val merge = b.add(MergeLatest[U](2, eagerComplete))
|
||||||
|
r ~> merge.in(1)
|
||||||
|
FlowShape(merge.in(0), merge.out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'.
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePreferred[U >: Out, M](
|
||||||
|
that: Graph[SourceShape[U], M],
|
||||||
|
priority: Boolean,
|
||||||
|
eagerComplete: Boolean = false): Repr[U] =
|
||||||
|
via(mergePreferredGraph(that, priority, eagerComplete))
|
||||||
|
|
||||||
|
protected def mergePreferredGraph[U >: Out, M](
|
||||||
|
that: Graph[SourceShape[U], M],
|
||||||
|
priority: Boolean,
|
||||||
|
eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] =
|
||||||
|
GraphDSL.create(that) { implicit b => r =>
|
||||||
|
val merge = b.add(MergePreferred[U](1, eagerComplete))
|
||||||
|
r ~> merge.in(if (priority) 0 else 1)
|
||||||
|
FlowShape(merge.in(if (priority) 1 else 0), merge.out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
|
||||||
|
*
|
||||||
|
* '''backpressures''' when downstream backpressures
|
||||||
|
*
|
||||||
|
* '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
|
||||||
|
*/
|
||||||
|
def mergePrioritized[U >: Out, M](
|
||||||
|
that: Graph[SourceShape[U], M],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean = false): Repr[U] =
|
||||||
|
via(mergePrioritizedGraph(that, leftPriority, rightPriority, eagerComplete))
|
||||||
|
|
||||||
|
protected def mergePrioritizedGraph[U >: Out, M](
|
||||||
|
that: Graph[SourceShape[U], M],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] =
|
||||||
|
GraphDSL.create(that) { implicit b => r =>
|
||||||
|
val merge = b.add(MergePrioritized[U](Seq(leftPriority, rightPriority), eagerComplete))
|
||||||
|
r ~> merge.in(1)
|
||||||
|
FlowShape(merge.in(0), merge.out)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||||
* picking always the smallest of the available elements (waiting for one element from each side
|
* picking always the smallest of the available elements (waiting for one element from each side
|
||||||
|
|
@ -3055,6 +3129,48 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
|
||||||
matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
|
matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
|
||||||
viaMat(interleaveGraph(that, request, eagerClose))(matF)
|
viaMat(interleaveGraph(that, request, eagerClose))(matF)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MergeLatest joins elements from N input streams into stream of lists of size N.
|
||||||
|
* i-th element in list is the latest emitted element from i-th input stream.
|
||||||
|
* MergeLatest emits list for each element emitted from some input stream,
|
||||||
|
* but only after each input stream emitted at least one element.
|
||||||
|
*
|
||||||
|
* @see [[#mergeLatest]].
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergeLatestMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerClose: Boolean)(
|
||||||
|
matF: (Mat, Mat2) => Mat3): ReprMat[immutable.Seq[U], Mat3] =
|
||||||
|
viaMat(mergeLatestGraph(that, eagerClose))(matF)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer one source if both sources have elements ready.
|
||||||
|
*
|
||||||
|
* @see [[#mergePreferred]]
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergePreferredMat[U >: Out, Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[U], Mat2],
|
||||||
|
preferred: Boolean,
|
||||||
|
eagerClose: Boolean)(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
|
||||||
|
viaMat(mergePreferredGraph(that, preferred, eagerClose))(matF)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge two sources. Prefer the sources depending on the 'priority' parameters.
|
||||||
|
*
|
||||||
|
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||||
|
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||||
|
*/
|
||||||
|
def mergePrioritizedMat[U >: Out, Mat2, Mat3](
|
||||||
|
that: Graph[SourceShape[U], Mat2],
|
||||||
|
leftPriority: Int,
|
||||||
|
rightPriority: Int,
|
||||||
|
eagerClose: Boolean)(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
|
||||||
|
viaMat(mergePrioritizedGraph(that, leftPriority, rightPriority, eagerClose))(matF)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||||
* picking always the smallest of the available elements (waiting for one element from each side
|
* picking always the smallest of the available elements (waiting for one element from each side
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue