#19145 fix FlowGroupBySpec

also fix concatSubstreams to have right semantics on groupBy and
introduce mergeSubstreamsWithLimit
This commit is contained in:
Roland Kuhn 2015-12-11 11:15:19 +01:00
parent 15cc65ce9d
commit 149e783363
6 changed files with 63 additions and 17 deletions

View file

@@ -23,7 +23,7 @@ object FlowGroupBySpec {
import language.higherKinds
implicit class Lift[M](val f: SubFlow[Int, M, Source[Int, M]#Repr, RunnableGraph[M]]) extends AnyVal {
def lift(key: Int ⇒ Int) = f.prefixAndTail(1).map(p ⇒ key(p._1.head) -> (Source.single(p._1.head) ++ p._2)).mergeSubstreams
def lift(key: Int ⇒ Int) = f.prefixAndTail(1).map(p ⇒ key(p._1.head) -> (Source.single(p._1.head) ++ p._2)).concatSubstreams
}
}
@@ -117,7 +117,8 @@ class FlowGroupBySpec extends AkkaSpec with ScalaFutures with ConversionCheckedT
.mergeSubstreams
.grouped(10)
.runWith(Sink.head)
.futureValue(Timeout(3.seconds)) should ===(List(List("Aaa", "Abb"), List("Bcc"), List("Cdd", "Cee")))
.futureValue(Timeout(3.seconds))
.sortBy(_.head) should ===(List(List("Aaa", "Abb"), List("Bcc"), List("Cdd", "Cee")))
}
"accept cancellation of substreams" in assertAllStagesStopped {

View file

@@ -28,9 +28,7 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit],
override def withAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] =
new SubFlowImpl[In, Out, Mat, F, C](subFlow.withAttributes(attr), mergeBackFunction, finishFunction)
override def mergeSubstreams: F[Out] = mergeBackFunction(subFlow, Int.MaxValue)
override def concatSubstreams: F[Out] = mergeBackFunction(subFlow, 1)
override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth)
def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink))

View file

@@ -27,17 +27,34 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[In, Out, Mat]#Repr, scaladsl.Sink[In, Mat]] @uncheckedVariance = delegate
/**
* Flatten the sub-flows back into the super-flow by performing a merge.
* Flatten the sub-flows back into the super-flow by performing a merge
* without parallelism limit (i.e. having an unbounded number of sub-flows
* active concurrently).
*
* This is identical in effect to `mergeSubstreamsWithParallelism(Integer.MAX_VALUE)`.
*/
def mergeSubstreams(): Flow[In, Out, Mat] =
new Flow(delegate.mergeSubstreams)
/**
* Flatten the sub-flows back into the super-flow by performing a merge
* with the given parallelism limit. This means that only up to `parallelism`
* substreams will be executed at any given time. Substreams that are not
* yet executed are also not materialized, meaning that back-pressure will
* be exerted at the operator that creates the substreams when the parallelism
* limit is reached.
*/
def mergeSubstreamsWithParallelism(parallelism: Int): Flow[In, Out, Mat] =
new Flow(delegate.mergeSubstreamsWithParallelism(parallelism))
/**
* Flatten the sub-flows back into the super-flow by concatenating them.
* This is usually a bad idea when combined with `groupBy` since it can
 * easily lead to deadlock — the concatenation does not consume from the second
* substream until the first has finished and the `groupBy` stage will get
* back-pressure from the second stream.
*
* This is identical in effect to `mergeSubstreamsWithParallelism(1)`.
*/
def concatSubstreams(): Flow[In, Out, Mat] =
new Flow(delegate.concatSubstreams)

View file

@@ -27,17 +27,34 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]] @uncheckedVariance = delegate
/**
* Flatten the sub-flows back into the super-flow by performing a merge.
* Flatten the sub-flows back into the super-source by performing a merge
* without parallelism limit (i.e. having an unbounded number of sub-flows
* active concurrently).
*
* This is identical in effect to `mergeSubstreamsWithParallelism(Integer.MAX_VALUE)`.
*/
def mergeSubstreams(): Source[Out, Mat] =
new Source(delegate.mergeSubstreams)
/**
* Flatten the sub-flows back into the super-flow by concatenating them.
* Flatten the sub-flows back into the super-source by performing a merge
* with the given parallelism limit. This means that only up to `parallelism`
* substreams will be executed at any given time. Substreams that are not
* yet executed are also not materialized, meaning that back-pressure will
* be exerted at the operator that creates the substreams when the parallelism
* limit is reached.
*/
def mergeSubstreamsWithParallelism(parallelism: Int): Source[Out, Mat] =
new Source(delegate.mergeSubstreamsWithParallelism(parallelism))
/**
* Flatten the sub-flows back into the super-source by concatenating them.
* This is usually a bad idea when combined with `groupBy` since it can
 * easily lead to deadlock — the concatenation does not consume from the second
* substream until the first has finished and the `groupBy` stage will get
* back-pressure from the second stream.
*
* This is identical in effect to `mergeSubstreamsWithParallelism(1)`.
*/
def concatSubstreams(): Source[Out, Mat] =
new Source(delegate.concatSubstreams)

View file

@@ -931,10 +931,7 @@ trait FlowOps[+Out, +Mat] {
override def apply[T](flow: Flow[Out, T, Unit], breadth: Int): Repr[T] =
deprecatedAndThen[Source[Out, Unit]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any]))
.map(_.via(flow))
.flatMapMerge(maxSubstreams, conforms)
/*
* FIXME remove all those commented workarounds above by implementing flatMapMerge(breadth)
*/
.via(new FlattenMerge(breadth))
}
val finish: (Sink[Out, Unit]) ⇒ Closed = s ⇒
deprecatedAndThen[Source[Out, Unit]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any]))
@@ -1000,7 +997,7 @@ trait FlowOps[+Out, +Mat] {
override def apply[T](flow: Flow[Out, T, Unit], breadth: Int): Repr[T] =
deprecatedAndThen[Source[Out, Unit]](Split.when(p.asInstanceOf[Any ⇒ Boolean]))
.map(_.via(flow))
.flatMapConcat(conforms)
.via(new FlattenMerge(breadth))
}
val finish: (Sink[Out, Unit]) ⇒ Closed = s ⇒
deprecatedAndThen[Source[Out, Unit]](Split.when(p.asInstanceOf[Any ⇒ Boolean]))
@@ -1057,7 +1054,7 @@ trait FlowOps[+Out, +Mat] {
override def apply[T](flow: Flow[Out, T, Unit], breadth: Int): Repr[T] =
deprecatedAndThen[Source[Out, Unit]](Split.after(p.asInstanceOf[Any ⇒ Boolean]))
.map(_.via(flow))
.flatMapConcat(conforms)
.via(new FlattenMerge(breadth))
}
val finish: (Sink[Out, Unit]) ⇒ Closed = s ⇒
deprecatedAndThen[Source[Out, Unit]](Split.after(p.asInstanceOf[Any ⇒ Boolean]))

View file

@@ -25,9 +25,23 @@ trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] {
def to[M](sink: Graph[SinkShape[Out], M]): C
/**
* Flatten the sub-flows back into the super-flow by performing a merge.
* Flatten the sub-flows back into the super-flow by performing a merge
* without parallelism limit (i.e. having an unbounded number of sub-flows
* active concurrently).
*
* This is identical in effect to `mergeSubstreamsWithParallelism(Integer.MAX_VALUE)`.
*/
def mergeSubstreams: F[Out]
def mergeSubstreams: F[Out] = mergeSubstreamsWithParallelism(Int.MaxValue)
/**
* Flatten the sub-flows back into the super-flow by performing a merge
* with the given parallelism limit. This means that only up to `parallelism`
* substreams will be executed at any given time. Substreams that are not
* yet executed are also not materialized, meaning that back-pressure will
* be exerted at the operator that creates the substreams when the parallelism
* limit is reached.
*/
def mergeSubstreamsWithParallelism(parallelism: Int): F[Out]
/**
* Flatten the sub-flows back into the super-flow by concatenating them.
@@ -35,6 +49,8 @@ trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] {
 * easily lead to deadlock — the concatenation does not consume from the second
* substream until the first has finished and the `groupBy` stage will get
* back-pressure from the second stream.
*
* This is identical in effect to `mergeSubstreamsWithParallelism(1)`.
*/
def concatSubstreams: F[Out]
def concatSubstreams: F[Out] = mergeSubstreamsWithParallelism(1)
}