diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 5deebf282f..1f59f6a48f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -221,15 +221,24 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * The resulting Flow’s materialized value is a Tuple2 containing both materialized * values (of this Flow and that Source). */ - def concat[Out2 >: Out, Mat2](source: Source[Out2, Mat2]): Flow[In, Out2, (Mat, Mat2)] = { + def concat[Out2 >: Out, Mat2](source: Source[Out2, Mat2]): Flow[In, Out2, (Mat, Mat2)] = + concatMat[Out2, Mat2, (Mat, Mat2)](source, Keep.both) + + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. Note that the Source is materialized + * together with this Flow and just kept from producing elements by asserting + * back-pressure until its time comes. + */ + def concatMat[Out2 >: Out, Mat2, Mat3](source: Source[Out2, Mat2], combine: (Mat, Mat2) ⇒ Mat3): Flow[In, Out2, Mat3] = this.viaMat(Flow(source) { implicit builder ⇒ s ⇒ import FlowGraph.Implicits._ val concat = builder.add(Concat[Out2]()) s.outlet ~> concat.in(1) (concat.in(0), concat.out) - })(Keep.both) - } + })(combine) /** INTERNAL API */ override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 54fb7ca5b2..917bca52b5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -125,7 +125,15 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * emitted by that source is emitted after the last element of this * source. */ - def concat[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = Source.concat(this, second) + def concat[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both) + + /** + * Concatenates a second source so that the first element + * emitted by that source is emitted after the last element of this + * source. + */ + def concatMat[Out2 >: Out, Mat2, Mat3](second: Source[Out2, Mat2])( + combine: (Mat, Mat2) ⇒ Mat3): Source[Out2, Mat3] = Source.concatMat(this, second)(combine) /** * Concatenates a second source so that the first element @@ -304,7 +312,16 @@ object Source extends SourceApply { * source. */ def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] = - wrap(FlowGraph.partial(source1, source2)(Keep.both) { implicit b ⇒ + concatMat(source1, source2)(Keep.both) + + /** + * Concatenates two sources so that the first element + * emitted by the second source is emitted after the last element of the first + * source. + */ + def concatMat[T, Mat1, Mat2, Mat3](source1: Source[T, Mat1], source2: Source[T, Mat2])( + combine: (Mat1, Mat2) ⇒ Mat3): Source[T, Mat3] = + wrap(FlowGraph.partial(source1, source2)(combine) { implicit b ⇒ (s1, s2) ⇒ import FlowGraph.Implicits._ val c = b.add(Concat[T]())