From fe3c34ed004ef41f7d6b781b540fcfbccbd53f31 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 10 Dec 2015 21:44:02 -0500 Subject: [PATCH] +str #19041 deterministic `interleave` operation --- .../akka/stream/scaladsl/FlowMergeSpec.scala | 6 ++-- .../main/scala/akka/stream/javadsl/Flow.scala | 29 ++++++++-------- .../scala/akka/stream/javadsl/Source.scala | 25 +++++++------- .../scala/akka/stream/scaladsl/Flow.scala | 33 ++++++++++--------- .../scala/akka/stream/scaladsl/Graph.scala | 13 +++++--- 5 files changed, 56 insertions(+), 50 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index 68272655ad..41f3259c7a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -22,11 +22,11 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { "work in the happy case" in assertAllStagesStopped { // Different input sizes (4 and 6) val source1 = Source(0 to 3) - val source2 = Source(4 to 9) - val source3 = Source(List[Int]()) + val source2 = Source(List[Int]()) + val source3 = Source(4 to 9) val probe = TestSubscriber.manualProbe[Int]() - source1.merge(source3).merge(source2) + source1.merge(source2).merge(source3) .map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe)) val subscription = probe.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 07f3830a4e..0bb70c8f32 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1005,9 +1005,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * * Example: * {{{ @@ -1016,35 +1016,36 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * src.via(flow) // 1, 2, 4, 5, 3, 6, 7 * }}} * - * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * After one of upstreams is complete than all the rest elements will be emitted from the second one * * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. * - * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * '''Emits when''' element is available from the currently consumed upstream * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements * * '''Completes when''' the [[Flow]] and given [[Source]] completes * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Flow[In, T, Mat] = - new Flow(delegate.interleave(that, result)) + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Flow[In, T, Mat] = + new Flow(delegate.interleave(that, segmentSize)) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * - * After [[Flow]] or [[Source]] is compete than all the rest elements will be emitted from the second one + * After one of upstreams is compete than all the rest elements will be emitted from the second one * * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. * * @see [[#interleave]]. */ - def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int, + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = - new Flow(delegate.interleaveMat(that, result)(combinerToScala(matF))) + new Flow(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF))) /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 17d0e20808..a6eac1a10e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -499,9 +499,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.alsoToMat(that)(combinerToScala(matF))) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]]. - * It takes `request` number of elements from this source to emit downstream, then - same amount for given one, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * * Example: * {{{ @@ -513,21 +513,22 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * If one of sources gets upstream error - stream completes with failure. * - * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * '''Emits when''' element is available from the currently consumed upstream * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements * * '''Completes when''' this [[Source]] and given one completes * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Source[T, Mat] = - new Source(delegate.interleave(that, result)) + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Source[T, Mat] = + new Source(delegate.interleave(that, segmentSize)) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]]. - * It takes `request` number of elements from this source to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * * After one of sources is complete than all the rest elements will be emitted from the second one * @@ -535,9 +536,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#interleave]]. */ - def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int, + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = - new Source(delegate.interleaveMat(that, result)(combinerToScala(matF))) + new Source(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF))) /** * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 31626cde40..7e4805cfd3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1301,46 +1301,47 @@ trait FlowOps[+Out, +Mat] { } /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` + * source, then repeat process. * * Example: * {{{ * Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7 * }}} * - * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * After one of upstreams is complete than all the rest elements will be emitted from the second one * - * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * If it gets error from one of upstreams - stream completes with failure. * - * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * '''Emits when''' element is available from the currently consumed upstream * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements * * '''Completes when''' the [[Flow]] and given [[Source]] completes * * '''Cancels when''' downstream cancels */ - def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], request: Int): Repr[U, Mat] = - interleaveMat(that, request)(Keep.left) + def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], segmentSize: Int): Repr[U, Mat] = + interleaveMat(that, segmentSize)(Keep.left) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * - * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * After one of upstreams is complete than all the rest elements will be emitted from the second one * - * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * If it gets error from one of upstreams - stream completes with failure. * * @see [[#interleave]]. */ - def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = + def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], segmentSize: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = this.viaMat(GraphDSL.create(that) { implicit b ⇒ r ⇒ import GraphDSL.Implicits._ - val interleave = b.add(Interleave[U](2, request)) + val interleave = b.add(Interleave[U](2, segmentSize)) r ~> interleave.in(1) FlowShape(interleave.in(0), interleave.out) })(matF) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 0aef5f544e..7cbdaa7281 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -233,11 +233,11 @@ object Interleave { * to take from each input. * * @param inputPorts number of input ports - * @param request number of elements to send downstream before switching to next input port + * @param segmentSize number of elements to send downstream before switching to next input port * @param eagerClose if true, interleave completes upstream if any of its upstream completes. */ - def apply[T](inputPorts: Int, request: Int, eagerClose: Boolean = false): Interleave[T] = - new Interleave(inputPorts, request, eagerClose) + def apply[T](inputPorts: Int, segmentSize: Int, eagerClose: Boolean = false): Interleave[T] = + new Interleave(inputPorts, segmentSize, eagerClose) } /** @@ -253,7 +253,10 @@ object Interleave { * '''Cancels when''' downstream cancels * */ -class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { +final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { + require(inputPorts > 1, "input ports must be > 1") + require(segmentSize > 0, "segmentSize must be > 0") + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Interleave.in" + i)) val out: Outlet[T] = Outlet[T]("Interleave.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) @@ -280,7 +283,7 @@ class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClo override def onPush(): Unit = { push(out, grab(i)) counter += 1 - if (counter == request) switchToNextInput() + if (counter == segmentSize) switchToNextInput() } override def onUpstreamFinish() = {