From 737e7b8dfc038c4472dd72b513d192333580139a Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Wed, 9 Dec 2015 21:52:53 -0500 Subject: [PATCH 1/5] +str #19041 deterministic `interleave` operation --- .../stream/scaladsl/FlowInterleaveSpec.scala | 121 ++++++++++++++++++ .../akka/stream/scaladsl/FlowMergeSpec.scala | 4 +- .../main/scala/akka/stream/javadsl/Flow.scala | 42 ++++++ .../scala/akka/stream/javadsl/Source.scala | 41 ++++++ .../scala/akka/stream/scaladsl/Flow.scala | 45 +++++++ .../scala/akka/stream/scaladsl/Graph.scala | 81 ++++++++++++ 6 files changed, 332 insertions(+), 2 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala new file mode 100644 index 0000000000..d750282a19 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import org.reactivestreams.Publisher + +class FlowInterleaveSpec extends BaseTwoStreamsSetup { + + override type Outputs = Int + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source(p1).interleave(Source(p2), 2).runWith(Sink(subscriber)) + subscriber + } + + "An Interleave for Flow " must { + + "work in the happy case" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + Source(0 to 3).interleave(Source(List[Int]()), 2).interleave(Source(4 to 9), 2).runWith(Sink(probe)) + + val subscription = probe.expectSubscription() + + var collected = Set.empty[Int] + for (_ ← 1 to 10) { + subscription.request(1) + collected += probe.expectNext() + } + + collected should be(Set(0, 1, 4, 5, 2, 3, 6, 7, 8, 9)) + probe.expectComplete() + } + + "work when bucket is not equal elements in stream" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + + Source(0 to 2).interleave(Source(3 to 5), 2).runWith(Sink(probe)) + probe.expectSubscription().request(10) + probe.expectNext(0, 1, 3, 4, 2, 5) + probe.expectComplete() + } + + "work with bucket = 1" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + + Source(0 to 2).interleave(Source(3 to 5), 1).runWith(Sink(probe)) + probe.expectSubscription().request(10) + probe.expectNext(0, 3, 1, 4, 2, 5) + probe.expectComplete() + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one immediately failed and one nonempty publisher" in { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + subscriber1.expectError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + subscriber2.expectError(TestException) + + } + + "work with one delayed failed and one nonempty publisher" in { + // This is nondeterministic, multiple scenarios can happen + pending + } + + "pass along early cancellation" in assertAllStagesStopped { + val up1 = TestPublisher.manualProbe[Int]() + val up2 = TestPublisher.manualProbe[Int]() + val down = TestSubscriber.manualProbe[Int]() + + val (graphSubscriber1, graphSubscriber2) = Source.subscriber[Int] + .interleaveMat(Source.subscriber[Int], 2)((_, _)).toMat(Sink(down))(Keep.left).run + + val downstream = down.expectSubscription() + downstream.cancel() + up1.subscribe(graphSubscriber1) + up2.subscribe(graphSubscriber2) + up1.expectSubscription().expectCancellation() + up2.expectSubscription().expectCancellation() + } + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index 4941e05222..68272655ad 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ import akka.stream.testkit._ -import org.reactivestreams.{ Subscriber, Publisher } +import org.reactivestreams.Publisher class FlowMergeSpec extends BaseTwoStreamsSetup { @@ -26,7 +26,7 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { val source3 = Source(List[Int]()) val probe = TestSubscriber.manualProbe[Int]() - Source(0 to 3).merge(Source(List[Int]())).merge(Source(4 to 9)) + source1.merge(source3).merge(source2) .map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe)) val subscription = probe.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f1913fe19a..07f3830a4e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1004,6 +1004,48 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, + * then repeat process from the beginning. + * + * Example: + * {{{ + * Source src = Source.from(Arrays.asList(1, 2, 3)) + * Flow flow = flow.interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2) + * src.via(flow) // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * + * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the [[Flow]] and given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Flow[In, T, Mat] = + new Flow(delegate.interleave(that, result)) + + /** + * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, + * then repeat process from the beginning. + * + * After [[Flow]] or [[Source]] is compete than all the rest elements will be emitted from the second one + * + * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int, + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.interleaveMat(that, result)(combinerToScala(matF))) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1f583f5e01..17d0e20808 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -498,6 +498,47 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = new Source(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It takes `request` number of elements from this source to emit downstream, then - same amount for given one, + * then repeat process from the beginning. + * + * Example: + * {{{ + * Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2) + * // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of sources is complete than all the rest elements will be emitted from the second one + * + * If one of sources gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Source]] and given one completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Source[T, Mat] = + new Source(delegate.interleave(that, result)) + + /** + * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It takes `request` number of elements from this source to emit downstream, then - same amount for given source, + * then repeat process from the beginning. + * + * After one of sources is complete than all the rest elements will be emitted from the second one + * + * If one of sources gets upstream error - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int, + matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.interleaveMat(that, result)(combinerToScala(matF))) + /** * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 15e0939782..31626cde40 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1300,6 +1300,51 @@ trait FlowOps[+Out, +Mat] { FlowShape(zip.in0, zip.out) } + /** + * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, + * then repeat process from the beginning. + * + * Example: + * {{{ + * Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * + * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the [[Flow]] and given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], request: Int): Repr[U, Mat] = + interleaveMat(that, request)(Keep.left) + + /** + * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, + * then repeat process from the beginning. + * + * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * + * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = + this.viaMat(GraphDSL.create(that) { implicit b ⇒ + r ⇒ + import GraphDSL.Implicits._ + val interleave = b.add(Interleave[U](2, request)) + r ~> interleave.in(1) + FlowShape(interleave.in(0), interleave.out) + })(matF) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index d9a545e895..0aef5f544e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -227,6 +227,87 @@ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: } } +object Interleave { + /** + * Create a new `Interleave` with the specified number of input ports and given size of elements + * to take from each input. + * + * @param inputPorts number of input ports + * @param request number of elements to send downstream before switching to next input port + * @param eagerClose if true, interleave completes upstream if any of its upstream completes. + */ + def apply[T](inputPorts: Int, request: Int, eagerClose: Boolean = false): Interleave[T] = + new Interleave(inputPorts, request, eagerClose) +} + +/** + * Interleave represents deterministic merge which takes N elements per input stream, + * in-order of inputs, emits them downstream and then cycles/"wraps-around" the inputs. + * + * '''Emits when''' element is available from current input (depending on phase) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * + * '''Cancels when''' downstream cancels + * + */ +class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Interleave.in" + i)) + val out: Outlet[T] = Outlet[T]("Interleave.out") + override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + private var counter = 0 + private var currentUpstreamIndex = 0 + private var runningUpstreams = inputPorts + private def upstreamsClosed = runningUpstreams == 0 + private def currentUpstream = in(currentUpstreamIndex) + + private def switchToNextInput(): Unit = { + @tailrec + def nextInletIndex(index: Int): Int = { + val reduced = (index + 1) % inputPorts + if (!isClosed(in(reduced))) reduced else nextInletIndex(index + 1) + } + counter = 0 + currentUpstreamIndex = nextInletIndex(currentUpstreamIndex) + } + + in.foreach { i ⇒ + setHandler(i, new InHandler { + override def onPush(): Unit = { + push(out, grab(i)) + counter += 1 + if (counter == request) switchToNextInput() + } + + override def onUpstreamFinish() = { + if (eagerClose) { + in.foreach(cancel) + completeStage() + } else { + runningUpstreams -= 1 + if (!upstreamsClosed) { + if (i == currentUpstream) { + switchToNextInput() + if (isAvailable(out)) pull(currentUpstream) + } + } else completeStage() + } + } + }) + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = if (!hasBeenPulled(currentUpstream)) pull(currentUpstream) + }) + } + + override def toString = "Interleave" +} + object Broadcast { /** * Create a new `Broadcast` with the specified number of output ports. From fe3c34ed004ef41f7d6b781b540fcfbccbd53f31 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 10 Dec 2015 21:44:02 -0500 Subject: [PATCH 2/5] +str #19041 deterministic `interleave` operation --- .../akka/stream/scaladsl/FlowMergeSpec.scala | 6 ++-- .../main/scala/akka/stream/javadsl/Flow.scala | 29 ++++++++-------- .../scala/akka/stream/javadsl/Source.scala | 25 +++++++------- .../scala/akka/stream/scaladsl/Flow.scala | 33 ++++++++++--------- .../scala/akka/stream/scaladsl/Graph.scala | 13 +++++--- 5 files changed, 56 insertions(+), 50 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index 68272655ad..41f3259c7a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -22,11 +22,11 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { "work in the happy case" in assertAllStagesStopped { // Different input sizes (4 and 6) val source1 = Source(0 to 3) - val source2 = Source(4 to 9) - val source3 = Source(List[Int]()) + val source2 = Source(List[Int]()) + val source3 = Source(4 to 9) val probe = TestSubscriber.manualProbe[Int]() - source1.merge(source3).merge(source2) + source1.merge(source2).merge(source3) .map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe)) val subscription = probe.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 07f3830a4e..0bb70c8f32 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1005,9 +1005,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * * Example: * {{{ @@ -1016,35 +1016,36 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * src.via(flow) // 1, 2, 4, 5, 3, 6, 7 * }}} * - * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * After one of upstreams is complete than all the rest elements will be emitted from the second one * * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. * - * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * '''Emits when''' element is available from the currently consumed upstream * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements * * '''Completes when''' the [[Flow]] and given [[Source]] completes * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Flow[In, T, Mat] = - new Flow(delegate.interleave(that, result)) + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Flow[In, T, Mat] = + new Flow(delegate.interleave(that, segmentSize)) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * - * After [[Flow]] or [[Source]] is compete than all the rest elements will be emitted from the second one + * After one of upstreams is compete than all the rest elements will be emitted from the second one * * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. * * @see [[#interleave]]. */ - def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int, + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = - new Flow(delegate.interleaveMat(that, result)(combinerToScala(matF))) + new Flow(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF))) /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 17d0e20808..a6eac1a10e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -499,9 +499,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.alsoToMat(that)(combinerToScala(matF))) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]]. - * It takes `request` number of elements from this source to emit downstream, then - same amount for given one, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * * Example: * {{{ @@ -513,21 +513,22 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * If one of sources gets upstream error - stream completes with failure. * - * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * '''Emits when''' element is available from the currently consumed upstream * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements * * '''Completes when''' this [[Source]] and given one completes * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Source[T, Mat] = - new Source(delegate.interleave(that, result)) + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Source[T, Mat] = + new Source(delegate.interleave(that, segmentSize)) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]]. - * It takes `request` number of elements from this source to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * * After one of sources is complete than all the rest elements will be emitted from the second one * @@ -535,9 +536,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#interleave]]. */ - def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int, + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = - new Source(delegate.interleaveMat(that, result)(combinerToScala(matF))) + new Source(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF))) /** * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 31626cde40..7e4805cfd3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1301,46 +1301,47 @@ trait FlowOps[+Out, +Mat] { } /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` + * source, then repeat process. * * Example: * {{{ * Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7 * }}} * - * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * After one of upstreams is complete than all the rest elements will be emitted from the second one * - * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * If it gets error from one of upstreams - stream completes with failure. * - * '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled + * '''Emits when''' element is available from the currently consumed upstream * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements * * '''Completes when''' the [[Flow]] and given [[Source]] completes * * '''Cancels when''' downstream cancels */ - def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], request: Int): Repr[U, Mat] = - interleaveMat(that, request)(Keep.left) + def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], segmentSize: Int): Repr[U, Mat] = + interleaveMat(that, segmentSize)(Keep.left) /** - * Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It takes `request` number of elements from this flow to emit downstream, then - same amount for given source, - * then repeat process from the beginning. + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. * - * After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one + * After one of upstreams is complete than all the rest elements will be emitted from the second one * - * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * If it gets error from one of upstreams - stream completes with failure. * * @see [[#interleave]]. */ - def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = + def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], segmentSize: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = this.viaMat(GraphDSL.create(that) { implicit b ⇒ r ⇒ import GraphDSL.Implicits._ - val interleave = b.add(Interleave[U](2, request)) + val interleave = b.add(Interleave[U](2, segmentSize)) r ~> interleave.in(1) FlowShape(interleave.in(0), interleave.out) })(matF) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 0aef5f544e..7cbdaa7281 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -233,11 +233,11 @@ object Interleave { * to take from each input. * * @param inputPorts number of input ports - * @param request number of elements to send downstream before switching to next input port + * @param segmentSize number of elements to send downstream before switching to next input port * @param eagerClose if true, interleave completes upstream if any of its upstream completes. */ - def apply[T](inputPorts: Int, request: Int, eagerClose: Boolean = false): Interleave[T] = - new Interleave(inputPorts, request, eagerClose) + def apply[T](inputPorts: Int, segmentSize: Int, eagerClose: Boolean = false): Interleave[T] = + new Interleave(inputPorts, segmentSize, eagerClose) } /** @@ -253,7 +253,10 @@ object Interleave { * '''Cancels when''' downstream cancels * */ -class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { +final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { + require(inputPorts > 1, "input ports must be > 1") + require(segmentSize > 0, "segmentSize must be > 0") + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Interleave.in" + i)) val out: Outlet[T] = Outlet[T]("Interleave.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) @@ -280,7 +283,7 @@ class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClo override def onPush(): Unit = { push(out, grab(i)) counter += 1 - if (counter == request) switchToNextInput() + if (counter == segmentSize) switchToNextInput() } override def onUpstreamFinish() = { From b9faf9d628f4ab08273b6c825d177599bd55f05c Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 10 Dec 2015 22:20:42 -0500 Subject: [PATCH 3/5] +str #19041 deterministic `interleave` operation --- .../akka/stream/DslConsistencySpec.scala | 2 +- .../scala/akka/stream/javadsl/SubFlow.scala | 26 ++++++++++++++ .../scala/akka/stream/javadsl/SubSource.scala | 27 ++++++++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 36 ++++++++++--------- 4 files changed, 74 insertions(+), 17 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index b929528e5d..d55d2bb76f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "concatGraph", "alsoToGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "interleaveGraph", "concatGraph", "alsoToGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass -> graphHelpers, jSourceClass -> graphHelpers, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 70f7c43cc3..e6781edab2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -698,6 +698,32 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def merge[T >: Out](that: Graph[SourceShape[T], _]): SubFlow[In, T, Mat] = new SubFlow(delegate.merge(that)) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` + * source, then repeat process. + * + * Example: + * {{{ + * Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If it gets error from one of upstreams - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' the [[Flow]] and given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubFlow[In, T, Mat] = + new SubFlow(delegate.interleave(that, segmentSize)) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 3200688900..6d7171d888 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -696,6 +696,33 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] = new SubSource(delegate.merge(that)) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * Example: + * {{{ + * Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2) + * // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of sources is complete than all the rest elements will be emitted from the second one + * + * If one of sources gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' this [[Source]] and given one completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubSource[T, Mat] = + new SubSource(delegate.interleave(that, segmentSize)) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7e4805cfd3..14b36a4329 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1323,28 +1323,18 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], segmentSize: Int): Repr[U, Mat] = - interleaveMat(that, segmentSize)(Keep.left) + def interleave[U >: Out](that: Graph[SourceShape[U], _], segmentSize: Int): Repr[U] = + via(interleaveGraph(that, segmentSize)) - /** - * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. - * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, - * then repeat process. - * - * After one of upstreams is complete than all the rest elements will be emitted from the second one - * - * If it gets error from one of upstreams - stream completes with failure. - * - * @see [[#interleave]]. - */ - def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], segmentSize: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = - this.viaMat(GraphDSL.create(that) { implicit b ⇒ + protected def interleaveGraph[U >: Out, M](that: Graph[SourceShape[U], M], + segmentSize: Int): Graph[FlowShape[Out @uncheckedVariance, U], M] = + GraphDSL.create(that) { implicit b ⇒ r ⇒ import GraphDSL.Implicits._ val interleave = b.add(Interleave[U](2, segmentSize)) r ~> interleave.in(1) FlowShape(interleave.in(0), interleave.out) - })(matF) + } /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, @@ -1532,6 +1522,20 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(mergeGraph(that))(matF) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If it gets error from one of upstreams - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = + viaMat(interleaveGraph(that, request))(matF) + /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated, From 926e0cb8b40273073fb16310943e7d4bb48867b2 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Fri, 11 Dec 2015 21:58:26 -0500 Subject: [PATCH 4/5] +str #19041 deterministic `interleave` operation --- .../stream/scaladsl/FlowInterleaveSpec.scala | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala index d750282a19..1cb6264654 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala @@ -21,7 +21,7 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup { "work in the happy case" in assertAllStagesStopped { val probe = TestSubscriber.manualProbe[Int]() - Source(0 to 3).interleave(Source(List[Int]()), 2).interleave(Source(4 to 9), 2).runWith(Sink(probe)) + Source(0 to 3).interleave(Source(List[Int]()), 2).interleave(Source(4 to 9), 3).runWith(Sink(probe)) val subscription = probe.expectSubscription() @@ -31,11 +31,11 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup { collected += probe.expectNext() } - collected should be(Set(0, 1, 4, 5, 2, 3, 6, 7, 8, 9)) + collected should be(Set(0, 1, 4, 5, 6, 2, 3, 7, 8, 9)) probe.expectComplete() } - "work when bucket is not equal elements in stream" in assertAllStagesStopped { + "work when segmentSize is not equal elements in stream" in assertAllStagesStopped { val probe = TestSubscriber.manualProbe[Int]() Source(0 to 2).interleave(Source(3 to 5), 2).runWith(Sink(probe)) @@ -44,7 +44,7 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup { probe.expectComplete() } - "work with bucket = 1" in assertAllStagesStopped { + "work with segmentSize = 1" in assertAllStagesStopped { val probe = TestSubscriber.manualProbe[Int]() Source(0 to 2).interleave(Source(3 to 5), 1).runWith(Sink(probe)) @@ -53,6 +53,26 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup { probe.expectComplete() } + "not work with segmentSize = 0" in assertAllStagesStopped { + an[IllegalArgumentException] mustBe thrownBy(Source(0 to 2).interleave(Source(3 to 5), 0).runWith(Sink.head)) + } + + "not work when segmentSize > than stream elements" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + Source(0 to 2).interleave(Source(3 to 15), 10).runWith(Sink(probe)) + probe.expectSubscription().request(25) + (0 to 15).foreach(probe.expectNext) + probe.expectComplete() + + val probe2 = TestSubscriber.manualProbe[Int]() + Source(1 to 20).interleave(Source(21 to 25), 10).runWith(Sink(probe2)) + probe2.expectSubscription().request(100) + (1 to 10).foreach(probe2.expectNext) + (21 to 25).foreach(probe2.expectNext) + (11 to 20).foreach(probe2.expectNext) + probe2.expectComplete() + } + commonTests() "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { From dace84c91b0ad903103e77c8402e1b1ec71c4f67 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Sun, 13 Dec 2015 23:22:23 -0500 Subject: [PATCH 5/5] +str #19041 deterministic `interleave` operation --- .../stream/scaladsl/FlowInterleaveSpec.scala | 10 ++++---- .../scala/akka/stream/scaladsl/Graph.scala | 25 ++++++++++++------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala index 1cb6264654..80d43c7ad4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala @@ -21,17 +21,17 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup { "work in the happy case" in assertAllStagesStopped { val probe = TestSubscriber.manualProbe[Int]() - Source(0 to 3).interleave(Source(List[Int]()), 2).interleave(Source(4 to 9), 3).runWith(Sink(probe)) + Source(0 to 3).interleave(Source(4 to 6), 2).interleave(Source(7 to 11), 3).runWith(Sink(probe)) val subscription = probe.expectSubscription() - var collected = Set.empty[Int] - for (_ ← 1 to 10) { + var collected = Seq.empty[Int] + for (_ ← 1 to 12) { subscription.request(1) - collected += probe.expectNext() + collected :+= probe.expectNext() } - collected should be(Set(0, 1, 4, 5, 6, 2, 3, 7, 8, 9)) + collected should be(Seq(0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6)) probe.expectComplete() } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 7cbdaa7281..f55d8af5ce 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -271,8 +271,18 @@ final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, va private def switchToNextInput(): Unit = { @tailrec def nextInletIndex(index: Int): Int = { - val reduced = (index + 1) % inputPorts - if (!isClosed(in(reduced))) reduced else nextInletIndex(index + 1) + val successor = index + 1 match { + case `inputPorts` ⇒ 0 + case x ⇒ x + } + if (!isClosed(in(successor))) successor + else { + if (successor != currentUpstreamIndex) nextInletIndex(successor) + else { + completeStage() + 0 // return dummy/min value to exit stage logic gracefully + } + } } counter = 0 currentUpstreamIndex = nextInletIndex(currentUpstreamIndex) @@ -286,11 +296,8 @@ final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, va if (counter == segmentSize) switchToNextInput() } - override def onUpstreamFinish() = { - if (eagerClose) { - in.foreach(cancel) - completeStage() - } else { + override def onUpstreamFinish(): Unit = { + if (!eagerClose) { runningUpstreams -= 1 if (!upstreamsClosed) { if (i == currentUpstream) { @@ -298,13 +305,13 @@ final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, va if (isAvailable(out)) pull(currentUpstream) } } else completeStage() - } + } else completeStage() } }) } setHandler(out, new OutHandler { - override def onPull(): Unit = if (!hasBeenPulled(currentUpstream)) pull(currentUpstream) + override def onPull(): Unit = if (!hasBeenPulled(currentUpstream)) tryPull(currentUpstream) }) }