+str #18411 add FlowOps.zip/zipWith/merge/concat operators

This commit is contained in:
Alexander Golubev 2015-10-03 23:26:06 -04:00
parent 993e545e99
commit c8428a1bc3
4 changed files with 181 additions and 83 deletions

View file

@ -956,7 +956,7 @@ trait FlowOps[+Out, +Mat] {
andThen(Stages.Log(name, extract.asInstanceOf[Any Any], Option(log)))
/**
* Combine the elements of current flow and given [[Source]] into a stream of tuples.
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* '''Emits when''' all of the inputs has an element available
*
@ -966,13 +966,15 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def zip[U](source: Graph[SourceShape[U], _]): Repr[(Out, U), Mat] = zipMat(source)(Keep.left)
def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U), Mat] = zipMat(that)(Keep.left)
/**
* Combine the elements of current flow and given [[Source]] into a stream of tuples.
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* @see [[#zip]].
*/
def zipMat[U, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[(Out, U), Mat3] =
this.viaMat(Flow(source) { implicit b
def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[(Out, U), Mat3] =
this.viaMat(Flow(that) { implicit b
r
import FlowGraph.Implicits._
val zip = b.add(Zip[Out, U]())
@ -981,7 +983,7 @@ trait FlowOps[+Out, +Mat] {
})(matF)
/**
* Put together the elements of current flow and given [[Source]]
* Put together the elements of current flow and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* '''Emits when''' all of the inputs has an element available
@ -992,15 +994,17 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def zipWith[Out2, Out3](source: Graph[SourceShape[Out2], _])(combine: (Out, Out2) Out3): Repr[Out3, Mat] =
zipWithMat(source)(combine)(Keep.left)
def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) Out3): Repr[Out3, Mat] =
zipWithMat(that)(combine)(Keep.left)
/**
* Put together the elements of current flow and given [[Source]]
* Put together the elements of current flow and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* @see [[#zipWith]].
*/
def zipWithMat[Out2, Out3, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): Repr[Out3, Mat3] =
this.viaMat(Flow(source) { implicit b
def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): Repr[Out3, Mat3] =
this.viaMat(Flow(that) { implicit b
r
import FlowGraph.Implicits._
val zip = b.add(ZipWith[Out, Out2, Out3](combine))
@ -1020,15 +1024,17 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def merge[U >: Out](source: Graph[SourceShape[U], _]): Repr[U, Mat] =
mergeMat(source)(Keep.left)
def merge[U >: Out](that: Graph[SourceShape[U], _]): Repr[U, Mat] =
mergeMat(that)(Keep.left)
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* @see [[#merge]].
*/
def mergeMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[U, Mat3] =
this.viaMat(Flow(source) { implicit b
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[U, Mat3] =
this.viaMat(Flow(that) { implicit b
r
import FlowGraph.Implicits._
val merge = b.add(Merge[U](2))
@ -1041,34 +1047,36 @@ trait FlowOps[+Out, +Mat] {
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the Source is materialized together with this Flow and just kept
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If this [[Flow]] gets upstream error - no elements from the source will be pulled.
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from second stream when current is completed
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' second stream completes
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concat[U >: Out, Mat2](source: Graph[SourceShape[U], Mat2]): Repr[U, Mat] =
concatMat(source)(Keep.left)
def concat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U, Mat] =
concatMat(that)(Keep.left)
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the Source is materialized together with this Flow and just kept
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If this [[Flow]] gets upstream error - no elements from the source will be pulled.
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* @see [[#concat]].
*/
def concatMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[U, Mat3] =
this.viaMat(Flow(source) { implicit b
def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[U, Mat3] =
this.viaMat(Flow(that) { implicit b
r
import FlowGraph.Implicits._
val merge = b.add(Concat[U]())
@ -1076,6 +1084,15 @@ trait FlowOps[+Out, +Mat] {
(merge.in(0), merge.out)
})(matF)
/**
* Concatenates this [[Flow]] with the given [[Source]] so the first element
* emitted by that source is emitted after the last element of this
* flow.
*
* This is a shorthand for [[concat]]
*/
def ++[U >: Out, M](that: Graph[SourceShape[U], M]): Repr[U, Mat] = concat(that)
def withAttributes(attr: Attributes): Repr[Out, Mat]
/** INTERNAL API */