+str #17039 add concatMat to Flow and Source
This commit is contained in:
parent
d92fcf211a
commit
1c112b935d
2 changed files with 31 additions and 5 deletions
|
|
@ -221,15 +221,24 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
* The resulting Flow’s materialized value is a Tuple2 containing both materialized
|
||||
* values (of this Flow and that Source).
|
||||
*/
|
||||
def concat[Out2 >: Out, Mat2](source: Source[Out2, Mat2]): Flow[In, Out2, (Mat, Mat2)] = {
|
||||
def concat[Out2 >: Out, Mat2](source: Source[Out2, Mat2]): Flow[In, Out2, (Mat, Mat2)] =
|
||||
concatMat[Out2, Mat2, (Mat, Mat2)](source, Keep.both)
|
||||
|
||||
/**
|
||||
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
|
||||
* Flow’s input is exhausted and all result elements have been generated,
|
||||
* the Source’s elements will be produced. Note that the Source is materialized
|
||||
* together with this Flow and just kept from producing elements by asserting
|
||||
* back-pressure until its time comes.
|
||||
*/
|
||||
def concatMat[Out2 >: Out, Mat2, Mat3](source: Source[Out2, Mat2], combine: (Mat, Mat2) ⇒ Mat3): Flow[In, Out2, Mat3] =
|
||||
this.viaMat(Flow(source) { implicit builder ⇒
|
||||
s ⇒
|
||||
import FlowGraph.Implicits._
|
||||
val concat = builder.add(Concat[Out2]())
|
||||
s.outlet ~> concat.in(1)
|
||||
(concat.in(0), concat.out)
|
||||
})(Keep.both)
|
||||
}
|
||||
})(combine)
|
||||
|
||||
/** INTERNAL API */
|
||||
override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = {
|
||||
|
|
|
|||
|
|
@ -125,7 +125,15 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
* emitted by that source is emitted after the last element of this
|
||||
* source.
|
||||
*/
|
||||
def concat[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = Source.concat(this, second)
|
||||
def concat[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both)
|
||||
|
||||
/**
|
||||
* Concatenates a second source so that the first element
|
||||
* emitted by that source is emitted after the last element of this
|
||||
* source.
|
||||
*/
|
||||
def concatMat[Out2 >: Out, Mat2, Mat3](second: Source[Out2, Mat2])(
|
||||
combine: (Mat, Mat2) ⇒ Mat3): Source[Out2, Mat3] = Source.concatMat(this, second)(combine)
|
||||
|
||||
/**
|
||||
* Concatenates a second source so that the first element
|
||||
|
|
@ -304,7 +312,16 @@ object Source extends SourceApply {
|
|||
* source.
|
||||
*/
|
||||
def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] =
|
||||
wrap(FlowGraph.partial(source1, source2)(Keep.both) { implicit b ⇒
|
||||
concatMat(source1, source2)(Keep.both)
|
||||
|
||||
/**
|
||||
* Concatenates two sources so that the first element
|
||||
* emitted by the second source is emitted after the last element of the first
|
||||
* source.
|
||||
*/
|
||||
def concatMat[T, Mat1, Mat2, Mat3](source1: Source[T, Mat1], source2: Source[T, Mat2])(
|
||||
combine: (Mat1, Mat2) ⇒ Mat3): Source[T, Mat3] =
|
||||
wrap(FlowGraph.partial(source1, source2)(combine) { implicit b ⇒
|
||||
(s1, s2) ⇒
|
||||
import FlowGraph.Implicits._
|
||||
val c = b.add(Concat[T]())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue