diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md index 73f1e537c7..b6979f80f0 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md @@ -4,8 +4,14 @@ Merge multiple sources. @ref[Fan-in operators](../index.md#fan-in-operators) +@@@div { .group-scala } + ## Signature +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mergeLatest } + +@@@ + ## Description MergeLatest joins elements from N input streams into stream of lists of size N. diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md index bf35942353..a86edbd2cd 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md @@ -8,7 +8,7 @@ Merge multiple sources. ## Description -Merge multiple sources. Prefer one source if all sources has elements ready. +Merge multiple sources. Prefer one source if all sources have elements ready. @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 252cf0d601..b249346b16 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -245,6 +245,9 @@ the inputs in different ways. |Source/Flow|@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.| |Source/Flow|@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.| |Source/Flow|@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.| +|Source/Flow|@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.| +|Source/Flow|@ref[mergePreferred](Source-or-Flow/mergePreferred.md)|Merge multiple sources.| +|Source/Flow|@ref[mergePrioritized](Source-or-Flow/mergePrioritized.md)|Merge multiple sources.| |Source/Flow|@ref[mergeSorted](Source-or-Flow/mergeSorted.md)|Merge multiple sources.| |Source/Flow|@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.| |Source/Flow|@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.| @@ -345,6 +348,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [wireTap](Source-or-Flow/wireTap.md) * [interleave](Source-or-Flow/interleave.md) * [merge](Source-or-Flow/merge.md) +* [mergeLatest](Source-or-Flow/mergeLatest.md) +* [mergePreferred](Source-or-Flow/mergePreferred.md) +* [mergePrioritized](Source-or-Flow/mergePrioritized.md) * [mergeSorted](Source-or-Flow/mergeSorted.md) * [zip](Source-or-Flow/zip.md) * [zipAll](Source-or-Flow/zipAll.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 0aa6a93ee4..7ef8831e40 100755 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -65,6 +65,9 @@ class DslConsistencySpec extends WordSpec with Matchers { "zipLatestWithGraph", "zipAllFlow", "mergeGraph", + "mergeLatestGraph", + "mergePreferredGraph", + "mergePrioritizedGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index d56d248567..36382c1ebb 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2476,6 +2476,94 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr eagerComplete: Boolean): javadsl.Flow[In, Out, M2] = new Flow(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF))) + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * '''Emits when''' an element is available from some input and each input emits at least one element from stream start + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + */ + def mergeLatest( + that: Graph[SourceShape[Out], _], + eagerComplete: Boolean): javadsl.Flow[In, java.util.List[Out], Mat] = + new Flow(delegate.mergeLatest(that, eagerComplete).map(_.asJava)) + + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergeLatestMat[Mat2, Mat3]( + that: Graph[SourceShape[Out], Mat2], + eagerComplete: Boolean, + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, java.util.List[Out], Mat3] = + new Flow(delegate.mergeLatestMat(that, eagerComplete)(combinerToScala(matF))).map(_.asJava) + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'. + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePreferred( + that: Graph[SourceShape[Out], _], + preferred: Boolean, + eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.mergePreferred(that, preferred, eagerComplete)) + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergePreferredMat[Mat2, Mat3]( + that: Graph[SourceShape[Out], Mat2], + preferred: Boolean, + eagerComplete: Boolean, + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, Out, Mat3] = + new Flow(delegate.mergePreferredMat(that, preferred, eagerComplete)(combinerToScala(matF))) + + /** + * Merge two sources. Prefer the sources depending on the 'priority' parameters. + * + * '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePrioritized( + that: Graph[SourceShape[Out], _], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete)) + + /** + * Merge two sources. Prefer the sources depending on the 'priority' parameters. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergePrioritizedMat[Mat2, Mat3]( + that: Graph[SourceShape[Out], Mat2], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean, + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, Out, Mat3] = + new Flow(delegate.mergePrioritizedMat(that, leftPriority, rightPriority, eagerComplete)(combinerToScala(matF))) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking always the smallest of the available elements (waiting for one element from each side diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 6cc1a5f531..0a9d6a9ecd 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1091,6 +1091,98 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ eagerComplete: Boolean): javadsl.Source[Out, M2] = new Source(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF))) + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * '''Emits when''' an element is available from some input and each input emits at least one element from stream start + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + */ + def mergeLatest[M]( + that: Graph[SourceShape[Out], M], + eagerComplete: Boolean): javadsl.Source[java.util.List[Out], Mat] = + new Source(delegate.mergeLatest(that, eagerComplete).map(_.asJava)) + + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * @see [[#mergeLatest]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergeLatestMat[Mat2, Mat3]( + that: Graph[SourceShape[Out], Mat2], + eagerComplete: Boolean, + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[java.util.List[Out], Mat3] = + new Source(delegate.mergeLatestMat(that, eagerComplete)(combinerToScala(matF))).map(_.asJava) + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'. + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePreferred[M]( + that: Graph[SourceShape[Out], M], + preferred: Boolean, + eagerComplete: Boolean): javadsl.Source[Out, Mat] = + new Source(delegate.mergePreferred(that, preferred, eagerComplete)) + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * @see [[#mergePreferred]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergePreferredMat[Mat2, Mat3]( + that: Graph[SourceShape[Out], Mat2], + preferred: Boolean, + eagerComplete: Boolean, + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[Out, Mat3] = + new Source(delegate.mergePreferredMat(that, preferred, eagerComplete)(combinerToScala(matF))) + + /** + * Merge two sources. Prefer the sources depending on the 'priority' parameters. + * + * '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePrioritized[M]( + that: Graph[SourceShape[Out], M], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean): javadsl.Source[Out, Mat] = + new Source(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete)) + + /** + * Merge multiple sources. Prefer the sources depending on the 'priority' parameters. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergePrioritizedMat[Mat2, Mat3]( + that: Graph[SourceShape[Out], Mat2], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean, + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[Out, Mat3] = + new Source(delegate.mergePrioritizedMat(that, leftPriority, rightPriority, eagerComplete)(combinerToScala(matF))) + /** * Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams, * picking always the smallest of the available elements (waiting for one element from each side diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index c4cad3c7f1..e107b92754 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1509,6 +1509,52 @@ class SubFlow[In, Out, Mat]( def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubFlow[In, Out, Mat] = new SubFlow(delegate.interleave(that, segmentSize)) + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * '''Emits when''' an element is available from some input and each input emits at least one element from stream start + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + */ + def mergeLatest[M]( + that: Graph[SourceShape[Out], M], + eagerComplete: Boolean): javadsl.SubFlow[In, java.util.List[Out], Mat] = + new SubFlow(delegate.mergeLatest(that, eagerComplete).map(_.asJava)) + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'. + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePreferred[M]( + that: Graph[SourceShape[Out], M], + preferred: Boolean, + eagerComplete: Boolean): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.mergePreferred(that, preferred, eagerComplete)) + + /** + * Merge two sources. Prefer the sources depending on the 'priority' parameters. + * + * '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePrioritized[M]( + that: Graph[SourceShape[Out], M], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete)) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking always the smallest of the available elements (waiting for one element from each side diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 30a2e33b86..4556421946 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1489,6 +1489,52 @@ class SubSource[Out, Mat]( def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat] = new SubSource(delegate.interleave(that, segmentSize)) + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * '''Emits when''' an element is available from some input and each input emits at least one element from stream start + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + */ + def mergeLatest[M]( + that: Graph[SourceShape[Out], M], + eagerComplete: Boolean): javadsl.SubSource[java.util.List[Out], Mat] = + new SubSource(delegate.mergeLatest(that, eagerComplete).map(_.asJava)) + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'. + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePreferred[M]( + that: Graph[SourceShape[Out], M], + preferred: Boolean, + eagerComplete: Boolean): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.mergePreferred(that, preferred, eagerComplete)) + + /** + * Merge two sources. Prefer the sources depending on the 'priority' parameters. + * + * '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePrioritized[M]( + that: Graph[SourceShape[Out], M], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.mergePrioritized(that, leftPriority, rightPriority, eagerComplete)) + /** * Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams, * picking always the smallest of the available elements (waiting for one element from each side diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 4969c8f867..3e1ba68f70 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2631,6 +2631,80 @@ trait FlowOps[+Out, +Mat] { FlowShape(merge.in(0), merge.out) } + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * '''Emits when''' an element is available from some input and each input emits at least one element from stream start + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + */ + def mergeLatest[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[immutable.Seq[U]] = + via(mergeLatestGraph(that, eagerComplete)) + + protected def mergeLatestGraph[U >: Out, M]( + that: Graph[SourceShape[U], M], + eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, immutable.Seq[U]], M] = + GraphDSL.create(that) { implicit b => r => + val merge = b.add(MergeLatest[U](2, eagerComplete)) + r ~> merge.in(1) + FlowShape(merge.in(0), merge.out) + } + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * '''emits''' when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'. + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePreferred[U >: Out, M]( + that: Graph[SourceShape[U], M], + priority: Boolean, + eagerComplete: Boolean = false): Repr[U] = + via(mergePreferredGraph(that, priority, eagerComplete)) + + protected def mergePreferredGraph[U >: Out, M]( + that: Graph[SourceShape[U], M], + priority: Boolean, + eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] = + GraphDSL.create(that) { implicit b => r => + val merge = b.add(MergePreferred[U](1, eagerComplete)) + r ~> merge.in(if (priority) 0 else 1) + FlowShape(merge.in(if (priority) 1 else 0), merge.out) + } + + /** + * Merge two sources. Prefer the sources depending on the 'priority' parameters. + * + * '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + */ + def mergePrioritized[U >: Out, M]( + that: Graph[SourceShape[U], M], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean = false): Repr[U] = + via(mergePrioritizedGraph(that, leftPriority, rightPriority, eagerComplete)) + + protected def mergePrioritizedGraph[U >: Out, M]( + that: Graph[SourceShape[U], M], + leftPriority: Int, + rightPriority: Int, + eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] = + GraphDSL.create(that) { implicit b => r => + val merge = b.add(MergePrioritized[U](Seq(leftPriority, rightPriority), eagerComplete)) + r ~> merge.in(1) + FlowShape(merge.in(0), merge.out) + } + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking always the smallest of the available elements (waiting for one element from each side @@ -3055,6 +3129,48 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] = viaMat(interleaveGraph(that, request, eagerClose))(matF) + /** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each input stream emitted at least one element. + * + * @see [[#mergeLatest]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergeLatestMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerClose: Boolean)( + matF: (Mat, Mat2) => Mat3): ReprMat[immutable.Seq[U], Mat3] = + viaMat(mergeLatestGraph(that, eagerClose))(matF) + + /** + * Merge two sources. Prefer one source if both sources have elements ready. + * + * @see [[#mergePreferred]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergePreferredMat[U >: Out, Mat2, Mat3]( + that: Graph[SourceShape[U], Mat2], + preferred: Boolean, + eagerClose: Boolean)(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] = + viaMat(mergePreferredGraph(that, preferred, eagerClose))(matF) + + /** + * Merge two sources. Prefer the sources depending on the 'priority' parameters. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def mergePrioritizedMat[U >: Out, Mat2, Mat3]( + that: Graph[SourceShape[U], Mat2], + leftPriority: Int, + rightPriority: Int, + eagerClose: Boolean)(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] = + viaMat(mergePrioritizedGraph(that, leftPriority, rightPriority, eagerClose))(matF) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking always the smallest of the available elements (waiting for one element from each side