From b9faf9d628f4ab08273b6c825d177599bd55f05c Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 10 Dec 2015 22:20:42 -0500 Subject: [PATCH] +str #19041 deterministic `interleave` operation --- .../akka/stream/DslConsistencySpec.scala | 2 +- .../scala/akka/stream/javadsl/SubFlow.scala | 26 ++++++++++++++ .../scala/akka/stream/javadsl/SubSource.scala | 27 ++++++++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 36 ++++++++++--------- 4 files changed, 74 insertions(+), 17 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index b929528e5d..d55d2bb76f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "concatGraph", "alsoToGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "interleaveGraph", "concatGraph", "alsoToGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass -> graphHelpers, jSourceClass -> graphHelpers, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 70f7c43cc3..e6781edab2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -698,6 +698,32 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def merge[T >: Out](that: Graph[SourceShape[T], _]): SubFlow[In, T, Mat] = new SubFlow(delegate.merge(that)) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` + * source, then repeat process. + * + * Example: + * {{{ + * Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If it gets error from one of upstreams - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' the [[Flow]] and given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubFlow[In, T, Mat] = + new SubFlow(delegate.interleave(that, segmentSize)) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 3200688900..6d7171d888 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -696,6 +696,33 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] = new SubSource(delegate.merge(that)) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * Example: + * {{{ + * Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2) + * // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of sources is complete than all the rest elements will be emitted from the second one + * + * If one of sources gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' this [[Source]] and given one completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubSource[T, Mat] = + new SubSource(delegate.interleave(that, segmentSize)) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7e4805cfd3..14b36a4329 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1323,28 +1323,18 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], segmentSize: Int): Repr[U, Mat] = - interleaveMat(that, segmentSize)(Keep.left) + def interleave[U >: Out](that: Graph[SourceShape[U], _], segmentSize: Int): Repr[U] = + via(interleaveGraph(that, segmentSize)) - /** - * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, - * then repeat process. - * - * After one of upstreams is complete than all the rest elements will be emitted from the second one - * - * If it gets error from one of upstreams - stream completes with failure. - * - * @see [[#interleave]]. - */ - def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], segmentSize: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = - this.viaMat(GraphDSL.create(that) { implicit b ⇒ + protected def interleaveGraph[U >: Out, M](that: Graph[SourceShape[U], M], + segmentSize: Int): Graph[FlowShape[Out @uncheckedVariance, U], M] = + GraphDSL.create(that) { implicit b ⇒ r ⇒ import GraphDSL.Implicits._ val interleave = b.add(Interleave[U](2, segmentSize)) r ~> interleave.in(1) FlowShape(interleave.in(0), interleave.out) - })(matF) + } /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, @@ -1532,6 +1522,20 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(mergeGraph(that))(matF) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If it gets error from one of upstreams - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = + viaMat(interleaveGraph(that, request))(matF) + /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated,