diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index aa381192af..47652cc5ce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -23,7 +23,7 @@ object FlowGroupBySpec { import language.higherKinds implicit class Lift[M](val f: SubFlow[Int, M, Source[Int, M]#Repr, RunnableGraph[M]]) extends AnyVal { - def lift(key: Int ⇒ Int) = f.prefixAndTail(1).map(p ⇒ key(p._1.head) -> (Source.single(p._1.head) ++ p._2)).mergeSubstreams + def lift(key: Int ⇒ Int) = f.prefixAndTail(1).map(p ⇒ key(p._1.head) -> (Source.single(p._1.head) ++ p._2)).concatSubstreams } } @@ -117,7 +117,8 @@ class FlowGroupBySpec extends AkkaSpec with ScalaFutures with ConversionCheckedT .mergeSubstreams .grouped(10) .runWith(Sink.head) - .futureValue(Timeout(3.seconds)) should ===(List(List("Aaa", "Abb"), List("Bcc"), List("Cdd", "Cee"))) + .futureValue(Timeout(3.seconds)) + .sortBy(_.head) should ===(List(List("Aaa", "Abb"), List("Bcc"), List("Cdd", "Cee"))) } "accept cancellation of substreams" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index 5a257b743d..cb8951606f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -28,9 +28,7 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit], override def withAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.withAttributes(attr), mergeBackFunction, finishFunction) - override def mergeSubstreams: F[Out] = mergeBackFunction(subFlow, Int.MaxValue) - - override def concatSubstreams: F[Out] = mergeBackFunction(subFlow, 1) + override def 
mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth) def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 70f7c43cc3..9962413f3a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -27,17 +27,34 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[In, Out, Mat]#Repr, scaladsl.Sink[In, Mat]] @uncheckedVariance = delegate /** - * Flatten the sub-flows back into the super-flow by performing a merge. + * Flatten the sub-flows back into the super-flow by performing a merge + * without parallelism limit (i.e. having an unbounded number of sub-flows + * active concurrently). + * + * This is identical in effect to `mergeSubstreamsWithParallelism(Integer.MAX_VALUE)`. */ def mergeSubstreams(): Flow[In, Out, Mat] = new Flow(delegate.mergeSubstreams) + /** + * Flatten the sub-flows back into the super-flow by performing a merge + * with the given parallelism limit. This means that only up to `parallelism` + * substreams will be executed at any given time. Substreams that are not + * yet executed are also not materialized, meaning that back-pressure will + * be exerted at the operator that creates the substreams when the parallelism + * limit is reached. + */ + def mergeSubstreamsWithParallelism(parallelism: Int): Flow[In, Out, Mat] = + new Flow(delegate.mergeSubstreamsWithParallelism(parallelism)) + /** * Flatten the sub-flows back into the super-flow by concatenating them. 
* This is usually a bad idea when combined with `groupBy` since it can * easily lead to deadlock—the concatenation does not consume from the second * substream until the first has finished and the `groupBy` stage will get * back-pressure from the second stream. + * + * This is identical in effect to `mergeSubstreamsWithParallelism(1)`. */ def concatSubstreams(): Flow[In, Out, Mat] = new Flow(delegate.concatSubstreams) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 3200688900..c56c767a99 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -27,17 +27,34 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]] @uncheckedVariance = delegate /** - * Flatten the sub-flows back into the super-flow by performing a merge. + * Flatten the sub-flows back into the super-source by performing a merge + * without parallelism limit (i.e. having an unbounded number of sub-flows + * active concurrently). + * + * This is identical in effect to `mergeSubstreamsWithParallelism(Integer.MAX_VALUE)`. */ def mergeSubstreams(): Source[Out, Mat] = new Source(delegate.mergeSubstreams) /** - * Flatten the sub-flows back into the super-flow by concatenating them. + * Flatten the sub-flows back into the super-source by performing a merge + * with the given parallelism limit. This means that only up to `parallelism` + * substreams will be executed at any given time. Substreams that are not + * yet executed are also not materialized, meaning that back-pressure will + * be exerted at the operator that creates the substreams when the parallelism + * limit is reached. 
+ */ + def mergeSubstreamsWithParallelism(parallelism: Int): Source[Out, Mat] = + new Source(delegate.mergeSubstreamsWithParallelism(parallelism)) + + /** + * Flatten the sub-flows back into the super-source by concatenating them. * This is usually a bad idea when combined with `groupBy` since it can * easily lead to deadlock—the concatenation does not consume from the second * substream until the first has finished and the `groupBy` stage will get * back-pressure from the second stream. + * + * This is identical in effect to `mergeSubstreamsWithParallelism(1)`. */ def concatSubstreams(): Source[Out, Mat] = new Source(delegate.concatSubstreams) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 15e0939782..697436402d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -931,10 +931,7 @@ trait FlowOps[+Out, +Mat] { override def apply[T](flow: Flow[Out, T, Unit], breadth: Int): Repr[T] = deprecatedAndThen[Source[Out, Unit]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any])) .map(_.via(flow)) - .flatMapMerge(maxSubstreams, conforms) - /* - * FIXME remove all those commented workarounds above by implementing flatMapMerge(breadth) - */ + .via(new FlattenMerge(breadth)) } val finish: (Sink[Out, Unit]) ⇒ Closed = s ⇒ deprecatedAndThen[Source[Out, Unit]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any])) @@ -1000,7 +997,7 @@ trait FlowOps[+Out, +Mat] { override def apply[T](flow: Flow[Out, T, Unit], breadth: Int): Repr[T] = deprecatedAndThen[Source[Out, Unit]](Split.when(p.asInstanceOf[Any ⇒ Boolean])) .map(_.via(flow)) - .flatMapConcat(conforms) + .via(new FlattenMerge(breadth)) } val finish: (Sink[Out, Unit]) ⇒ Closed = s ⇒ deprecatedAndThen[Source[Out, Unit]](Split.when(p.asInstanceOf[Any ⇒ Boolean])) @@ -1057,7 +1054,7 @@ trait FlowOps[+Out, +Mat] { override def apply[T](flow: Flow[Out, T, Unit], 
breadth: Int): Repr[T] = deprecatedAndThen[Source[Out, Unit]](Split.after(p.asInstanceOf[Any ⇒ Boolean])) .map(_.via(flow)) - .flatMapConcat(conforms) + .via(new FlattenMerge(breadth)) } val finish: (Sink[Out, Unit]) ⇒ Closed = s ⇒ deprecatedAndThen[Source[Out, Unit]](Split.after(p.asInstanceOf[Any ⇒ Boolean])) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala index 083af1b901..7ae0a29ced 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala @@ -25,9 +25,23 @@ trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] { def to[M](sink: Graph[SinkShape[Out], M]): C /** - * Flatten the sub-flows back into the super-flow by performing a merge. + * Flatten the sub-flows back into the super-flow by performing a merge + * without parallelism limit (i.e. having an unbounded number of sub-flows + * active concurrently). + * + * This is identical in effect to `mergeSubstreamsWithParallelism(Int.MaxValue)`. */ - def mergeSubstreams: F[Out] + def mergeSubstreams: F[Out] = mergeSubstreamsWithParallelism(Int.MaxValue) + + /** + * Flatten the sub-flows back into the super-flow by performing a merge + * with the given parallelism limit. This means that only up to `parallelism` + * substreams will be executed at any given time. Substreams that are not + * yet executed are also not materialized, meaning that back-pressure will + * be exerted at the operator that creates the substreams when the parallelism + * limit is reached. + */ + def mergeSubstreamsWithParallelism(parallelism: Int): F[Out] /** * Flatten the sub-flows back into the super-flow by concatenating them. 
@@ -35,6 +49,8 @@ trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] { * easily lead to deadlock—the concatenation does not consume from the second * substream until the first has finished and the `groupBy` stage will get * back-pressure from the second stream. + * + * This is identical in effect to `mergeSubstreamsWithParallelism(1)`. */ - def concatSubstreams: F[Out] + def concatSubstreams: F[Out] = mergeSubstreamsWithParallelism(1) }