diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index cdc5b0e3ac..45b6c42709 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -783,52 +783,82 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the Source is materialized together with this Flow and just kept + * Note that the [[Source]] is materialized together with this Flow and just kept * from producing elements by asserting back-pressure until its time comes. * - * If this [[Flow]] gets upstream error - no elements from the source will be pulled. + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels */ - def concat[T >: Out, M](source: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.concat(source)) + def concat[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.concat(that)) /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the Source is materialized together with this Flow and just kept + * Note that the [[Source]] is materialized together with this Flow and just kept * from producing elements by asserting back-pressure until its time comes. * - * If this [[Flow]] gets upstream error - no elements from the source will be pulled. + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * @see [[#concat]] */ - def concatMat[T >: Out, M, M2](source: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = - new Flow(delegate.concatMat(source)(combinerToScala(matF))) + def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.concatMat(that)(combinerToScala(matF))) /** - * Merge current [[Flow]] with the given [[Source]], taking elements as they arrive, + * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete + * + * '''Cancels when''' downstream cancels */ - def merge[T >: Out](source: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.merge(source)) + def merge[T >: Out](that: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.merge(that)) /** - * Merge current [[Flow]] with the given [[Source]], taking elements as they arrive, - * picking randomly when several elements readt. + * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * @see [[#merge]] */ - def mergeMat[T >: Out, M, M2](source: Graph[SourceShape[T], M], + def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = - new Flow(delegate.mergeMat(source)(combinerToScala(matF))) + new Flow(delegate.mergeMat(that)(combinerToScala(matF))) /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels */ def zip[T](source: Graph[SourceShape[T], _]): javadsl.Flow[In, Out @uncheckedVariance Pair T, Mat] = zipMat(source, Keep.left) /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. + * + * @see [[#zip]] */ - def zipMat[T, M, M2](source: Graph[SourceShape[T], M], + def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = { //we need this only to have Flow of javadsl.Pair def block(builder: FlowGraph.Builder[M], @@ -837,25 +867,35 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph builder.from(source).to(zip.in1) new Pair(zip.in0, zip.out) } - this.viaMat(Flow.factory.create(source, combinerToJava(block)), matF) + this.viaMat(Flow.factory.create(that, combinerToJava(block)), matF) } /** - * Put together elements of current [[Flow]] and the given [[Source]] + * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels */ - def zipWith[Out2, Out3](source: Graph[SourceShape[Out2], _], + def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _], combine: function.Function2[Out, Out2, Out3]): javadsl.Flow[In, Out3, Mat] = - new Flow(delegate.zipWith[Out2, Out3](source)(combinerToScala(combine))) + new Flow(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine))) /** - * Put together elements of current [[Flow]] and the given [[Source]] + * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. + * + * @see [[#zipWith]] */ - def zipWithMat[Out2, Out3, M, M2](source: Graph[SourceShape[Out2], M], + def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M], combine: function.Function2[Out, Out2, Out3], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] = - new Flow(delegate.zipWithMat[Out2, Out3, M, M2](source)(combinerToScala(combine))(combinerToScala(matF))) + new Flow(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 940f3c796f..8a775cd18b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -304,47 +304,87 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour runWith(Sink.fold(zero, f), materializer) /** - * Concatenate the second [[Source]] to current one, meaning that once current + * Concatenate this [[Source]] with the given one, meaning that once current * is exhausted and all result elements have been generated, - * the second Source’s elements will be produced. + * the given source elements will be produced. + * + * Note that given [[Source]] is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * '''Emits when''' element is available from current source or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels */ - def concat[T >: Out, M](second: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = - new Source(delegate.concat(second)) + def concat[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = + new Source(delegate.concat(that)) /** - * Concatenate the second [[Source]] to current one, meaning that once current + * Concatenate this [[Source]] with the given one, meaning that once current * is exhausted and all result elements have been generated, - * the second Source’s elements will be produced. + * the given source elements will be produced. + * + * Note that given [[Source]] is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * @see [[#concat]]. */ - def concatMat[T >: Out, M, M2](second: Graph[SourceShape[T], M], + def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = - new Source(delegate.concatMat(second)(combinerToScala(matF))) + new Source(delegate.concatMat(that)(combinerToScala(matF))) /** - * Merge current source with the second one, taking elements as they arrive, + * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete + * + * '''Cancels when''' downstream cancels */ - def merge[T >: Out](second: Graph[SourceShape[T], _]): javadsl.Source[T, Mat] = - new Source(delegate.merge(second)) + def merge[T >: Out](that: Graph[SourceShape[T], _]): javadsl.Source[T, Mat] = + new Source(delegate.merge(that)) /** - * Merge current source with the second one, taking elements as they arrive, + * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * picking randomly when several elements ready. + * + * @see [[#merge]]. */ - def mergeMat[T >: Out, M, M2](second: Graph[SourceShape[T], M], + def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = - new Source(delegate.mergeMat(second)(combinerToScala(matF))) + new Source(delegate.mergeMat(that)(combinerToScala(matF))) /** - * Combine the elements of current [[Source]] and the second one into a stream of tuples. + * Combine the elements of current [[Source]] and the given one into a stream of tuples. + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels */ - def zip[T](second: Graph[SourceShape[T], _]): javadsl.Source[Out @uncheckedVariance Pair T, Mat] = - zipMat(second, combinerToJava((a: Mat, b: Any) ⇒ a)) + def zip[T](that: Graph[SourceShape[T], _]): javadsl.Source[Out @uncheckedVariance Pair T, Mat] = + zipMat(that, combinerToJava((a: Mat, b: Any) ⇒ a)) /** - * Combine the elements of current [[Source]] and the second one into a stream of tuples. + * Combine the elements of current [[Source]] and the given one into a stream of tuples. + * + * @see [[#zip]]. */ - def zipMat[T, M, M2](second: Graph[SourceShape[T], M], + def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = { //we need this only to have Flow of javadsl.Pair def block(builder: FlowGraph.Builder[M], @@ -353,25 +393,35 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour builder.from(source).to(zip.in1) new Pair(zip.in0, zip.out) } - this.viaMat(Flow.factory.create(second, combinerToJava(block)), matF) + this.viaMat(Flow.factory.create(that, combinerToJava(block)), matF) } /** - * Put together elements of current [[Source]] and the second one + * Put together the elements of current [[Source]] and the given one * into a stream of combined elements using a combiner function. + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels */ - def zipWith[Out2, Out3](second: Graph[SourceShape[Out2], _], + def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _], combine: function.Function2[Out, Out2, Out3]): javadsl.Source[Out3, Mat] = - new Source(delegate.zipWith[Out2, Out3](second)(combinerToScala(combine))) + new Source(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine))) /** - * Put together elements of current [[Source]] and the second one + * Put together the elements of current [[Source]] and the given one * into a stream of combined elements using a combiner function. + * + * @see [[#zipWith]]. */ - def zipWithMat[Out2, Out3, M, M2](second: Graph[SourceShape[Out2], M], + def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M], combine: function.Function2[Out, Out2, Out3], matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] = - new Source(delegate.zipWithMat[Out2, Out3, M, M2](second)(combinerToScala(combine))(combinerToScala(matF))) + new Source(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c544f07332..8ac2d31770 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -956,7 +956,7 @@ trait FlowOps[+Out, +Mat] { andThen(Stages.Log(name, extract.asInstanceOf[Any ⇒ Any], Option(log))) /** - * Combine the elements of current flow and given [[Source]] into a stream of tuples. + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. * * '''Emits when''' all of the inputs has an element available * @@ -966,13 +966,15 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def zip[U](source: Graph[SourceShape[U], _]): Repr[(Out, U), Mat] = zipMat(source)(Keep.left) + def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U), Mat] = zipMat(that)(Keep.left) /** - * Combine the elements of current flow and given [[Source]] into a stream of tuples. + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * @see [[#zip]]. */ - def zipMat[U, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[(Out, U), Mat3] = - this.viaMat(Flow(source) { implicit b ⇒ + def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[(Out, U), Mat3] = + this.viaMat(Flow(that) { implicit b ⇒ r ⇒ import FlowGraph.Implicits._ val zip = b.add(Zip[Out, U]()) @@ -981,7 +983,7 @@ trait FlowOps[+Out, +Mat] { })(matF) /** - * Put together the elements of current flow and given [[Source]] + * Put together the elements of current flow and the given [[Source]] * into a stream of combined elements using a combiner function. * * '''Emits when''' all of the inputs has an element available @@ -992,15 +994,17 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def zipWith[Out2, Out3](source: Graph[SourceShape[Out2], _])(combine: (Out, Out2) ⇒ Out3): Repr[Out3, Mat] = - zipWithMat(source)(combine)(Keep.left) + def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) ⇒ Out3): Repr[Out3, Mat] = + zipWithMat(that)(combine)(Keep.left) /** - * Put together the elements of current flow and given [[Source]] + * Put together the elements of current flow and the given [[Source]] * into a stream of combined elements using a combiner function. + * + * @see [[#zipWith]]. */ - def zipWithMat[Out2, Out3, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) ⇒ Out3)(matF: (Mat, Mat2) ⇒ Mat3): Repr[Out3, Mat3] = - this.viaMat(Flow(source) { implicit b ⇒ + def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) ⇒ Out3)(matF: (Mat, Mat2) ⇒ Mat3): Repr[Out3, Mat3] = + this.viaMat(Flow(that) { implicit b ⇒ r ⇒ import FlowGraph.Implicits._ val zip = b.add(ZipWith[Out, Out2, Out3](combine)) @@ -1020,15 +1024,17 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def merge[U >: Out](source: Graph[SourceShape[U], _]): Repr[U, Mat] = - mergeMat(source)(Keep.left) + def merge[U >: Out](that: Graph[SourceShape[U], _]): Repr[U, Mat] = + mergeMat(that)(Keep.left) /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. + * + * @see [[#merge]]. */ - def mergeMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = - this.viaMat(Flow(source) { implicit b ⇒ + def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = + this.viaMat(Flow(that) { implicit b ⇒ r ⇒ import FlowGraph.Implicits._ val merge = b.add(Merge[U](2)) @@ -1041,34 +1047,36 @@ trait FlowOps[+Out, +Mat] { * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the Source is materialized together with this Flow and just kept + * Note that the [[Source]] is materialized together with this Flow and just kept * from producing elements by asserting back-pressure until its time comes. * - * If this [[Flow]] gets upstream error - no elements from the source will be pulled. + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. * - * '''Emits when''' element is available from current stream or from second stream when current is completed + * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' second stream completes + * '''Completes when''' given [[Source]] completes * * '''Cancels when''' downstream cancels */ - def concat[U >: Out, Mat2](source: Graph[SourceShape[U], Mat2]): Repr[U, Mat] = - concatMat(source)(Keep.left) + def concat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U, Mat] = + concatMat(that)(Keep.left) /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated, * the Source’s elements will be produced. * - * Note that the Source is materialized together with this Flow and just kept + * Note that the [[Source]] is materialized together with this Flow and just kept * from producing elements by asserting back-pressure until its time comes. * - * If this [[Flow]] gets upstream error - no elements from the source will be pulled. + * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. + * + * @see [[#concat]]. */ - def concatMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = - this.viaMat(Flow(source) { implicit b ⇒ + def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = + this.viaMat(Flow(that) { implicit b ⇒ r ⇒ import FlowGraph.Implicits._ val merge = b.add(Concat[U]()) @@ -1076,6 +1084,15 @@ trait FlowOps[+Out, +Mat] { (merge.in(0), merge.out) })(matF) + /** + * Concatenates this [[Flow]] with the given [[Source]] so the first element + * emitted by that source is emitted after the last element of this + * flow. + * + * This is a shorthand for [[concat]] + */ + def ++[U >: Out, M](that: Graph[SourceShape[U], M]): Repr[U, Mat] = concat(that) + def withAttributes(attr: Attributes): Repr[Out, Mat] /** INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 2b7f5c4eda..f50964f817 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -104,15 +104,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) */ def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f)) - /** - * Concatenates a second source so that the first element - * emitted by that source is emitted after the last element of this - * source. - * - * This is a shorthand for [[concat]] - */ - def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, Mat] = concat(second) - /** * Nests the current Source and returns a Source with the given Attributes * @param attr the attributes to add