diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index b929528e5d..d55d2bb76f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "concatGraph", "alsoToGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "interleaveGraph", "concatGraph", "alsoToGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass -> graphHelpers, jSourceClass -> graphHelpers, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala new file mode 100644 index 0000000000..80d43c7ad4 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import org.reactivestreams.Publisher + +class FlowInterleaveSpec extends BaseTwoStreamsSetup { + + override type Outputs = Int + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source(p1).interleave(Source(p2), 2).runWith(Sink(subscriber)) + subscriber + } + + "An Interleave for Flow " must { + + "work in the happy case" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + Source(0 to 3).interleave(Source(4 to 6), 2).interleave(Source(7 to 11), 3).runWith(Sink(probe)) + + val subscription = probe.expectSubscription() + + var collected = Seq.empty[Int] + for (_ ← 1 to 12) { + subscription.request(1) + collected :+= probe.expectNext() + } + + collected should be(Seq(0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6)) + probe.expectComplete() + } + + "work when segmentSize is not equal elements in stream" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + + Source(0 to 2).interleave(Source(3 to 5), 2).runWith(Sink(probe)) + probe.expectSubscription().request(10) + probe.expectNext(0, 1, 3, 4, 2, 5) + probe.expectComplete() + } + + "work with segmentSize = 1" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + + Source(0 to 2).interleave(Source(3 to 5), 1).runWith(Sink(probe)) + probe.expectSubscription().request(10) + probe.expectNext(0, 3, 1, 4, 2, 5) + probe.expectComplete() + } + + "not work with segmentSize = 0" in assertAllStagesStopped { + an[IllegalArgumentException] mustBe thrownBy(Source(0 to 2).interleave(Source(3 to 5), 0).runWith(Sink.head)) + } + + "not work when segmentSize > than stream elements" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + Source(0 to 2).interleave(Source(3 to 15), 10).runWith(Sink(probe)) + probe.expectSubscription().request(25) + (0 to 15).foreach(probe.expectNext) + probe.expectComplete() + + val probe2 = TestSubscriber.manualProbe[Int]() + Source(1 to 20).interleave(Source(21 to 25), 10).runWith(Sink(probe2)) + probe2.expectSubscription().request(100) + (1 to 10).foreach(probe2.expectNext) + (21 to 25).foreach(probe2.expectNext) + (11 to 20).foreach(probe2.expectNext) + probe2.expectComplete() + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one immediately failed and one nonempty publisher" in { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + subscriber1.expectError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + subscriber2.expectError(TestException) + + } + + "work with one delayed failed and one nonempty publisher" in { + // This is nondeterministic, multiple scenarios can happen + pending + } + + "pass along early cancellation" in assertAllStagesStopped { + val up1 = TestPublisher.manualProbe[Int]() + val up2 = TestPublisher.manualProbe[Int]() + val down = TestSubscriber.manualProbe[Int]() + + val (graphSubscriber1, graphSubscriber2) = Source.subscriber[Int] + .interleaveMat(Source.subscriber[Int], 2)((_, _)).toMat(Sink(down))(Keep.left).run + + val downstream = down.expectSubscription() + downstream.cancel() + up1.subscribe(graphSubscriber1) + up2.subscribe(graphSubscriber2) + up1.expectSubscription().expectCancellation() + up2.expectSubscription().expectCancellation() + } + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index 4941e05222..41f3259c7a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import akka.stream.testkit.Utils._ import akka.stream.testkit._ -import org.reactivestreams.{ Subscriber, Publisher } +import org.reactivestreams.Publisher class FlowMergeSpec extends BaseTwoStreamsSetup { @@ -22,11 +22,11 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { "work in the happy case" in assertAllStagesStopped { // Different input sizes (4 and 6) val source1 = Source(0 to 3) - val source2 = Source(4 to 9) - val source3 = Source(List[Int]()) + val source2 = Source(List[Int]()) + val source3 = Source(4 to 9) val probe = TestSubscriber.manualProbe[Int]() - Source(0 to 3).merge(Source(List[Int]())).merge(Source(4 to 9)) + source1.merge(source2).merge(source3) .map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe)) val subscription = probe.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index a4bda1c594..d39db47d6a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1032,6 +1032,49 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * Example: + * {{{ + * Source src = Source.from(Arrays.asList(1, 2, 3)) + * Flow flow = flow.interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2) + * src.via(flow) // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' the [[Flow]] and given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Flow[In, T, Mat] = + new Flow(delegate.interleave(that, segmentSize)) + + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * After one of upstreams is compete than all the rest elements will be emitted from the second one + * + * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF))) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index b53db067c8..47d62eb512 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -528,6 +528,48 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = new Source(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * Example: + * {{{ + * Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2) + * // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of sources is complete than all the rest elements will be emitted from the second one + * + * If one of sources gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' this [[Source]] and given one completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Source[T, Mat] = + new Source(delegate.interleave(that, segmentSize)) + + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * After one of sources is complete than all the rest elements will be emitted from the second one + * + * If one of sources gets upstream error - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, + matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF))) + /** * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 2cd9905e42..660fc339f5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -743,6 +743,32 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def merge[T >: Out](that: Graph[SourceShape[T], _]): SubFlow[In, T, Mat] = new SubFlow(delegate.merge(that)) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` + * source, then repeat process. + * + * Example: + * {{{ + * Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If it gets error from one of upstreams - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' the [[Flow]] and given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubFlow[In, T, Mat] = + new SubFlow(delegate.interleave(that, segmentSize)) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 0b6d58d2e5..4a3b7d093b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -741,6 +741,33 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] = new SubSource(delegate.merge(that)) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * Example: + * {{{ + * Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2) + * // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of sources is complete than all the rest elements will be emitted from the second one + * + * If one of sources gets upstream error - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' this [[Source]] and given one completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubSource[T, Mat] = + new SubSource(delegate.interleave(that, segmentSize)) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 64a7143ddd..8ec28bb836 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1324,6 +1324,42 @@ trait FlowOps[+Out, +Mat] { FlowShape(zip.in0, zip.out) } + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` + * source, then repeat process. + * + * Example: + * {{{ + * Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7 + * }}} + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If it gets error from one of upstreams - stream completes with failure. + * + * '''Emits when''' element is available from the currently consumed upstream + * + * '''Backpressures when''' downstream backpressures. Signal to current + * upstream, switch to next upstream when received `segmentSize` elements + * + * '''Completes when''' the [[Flow]] and given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def interleave[U >: Out](that: Graph[SourceShape[U], _], segmentSize: Int): Repr[U] = + via(interleaveGraph(that, segmentSize)) + + protected def interleaveGraph[U >: Out, M](that: Graph[SourceShape[U], M], + segmentSize: Int): Graph[FlowShape[Out @uncheckedVariance, U], M] = + GraphDSL.create(that) { implicit b ⇒ + r ⇒ + import GraphDSL.Implicits._ + val interleave = b.add(Interleave[U](2, segmentSize)) + r ~> interleave.in(1) + FlowShape(interleave.in(0), interleave.out) + } + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. @@ -1510,6 +1546,20 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(mergeGraph(that))(matF) + /** + * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. + * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, + * then repeat process. + * + * After one of upstreams is complete than all the rest elements will be emitted from the second one + * + * If it gets error from one of upstreams - stream completes with failure. + * + * @see [[#interleave]]. + */ + def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = + viaMat(interleaveGraph(that, request))(matF) + /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index d9a545e895..f55d8af5ce 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -227,6 +227,97 @@ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: } } +object Interleave { + /** + * Create a new `Interleave` with the specified number of input ports and given size of elements + * to take from each input. + * + * @param inputPorts number of input ports + * @param segmentSize number of elements to send downstream before switching to next input port + * @param eagerClose if true, interleave completes upstream if any of its upstream completes. + */ + def apply[T](inputPorts: Int, segmentSize: Int, eagerClose: Boolean = false): Interleave[T] = + new Interleave(inputPorts, segmentSize, eagerClose) +} + +/** + * Interleave represents deterministic merge which takes N elements per input stream, + * in-order of inputs, emits them downstream and then cycles/"wraps-around" the inputs. + * + * '''Emits when''' element is available from current input (depending on phase) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * + * '''Cancels when''' downstream cancels + * + */ +final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { + require(inputPorts > 1, "input ports must be > 1") + require(segmentSize > 0, "segmentSize must be > 0") + + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Interleave.in" + i)) + val out: Outlet[T] = Outlet[T]("Interleave.out") + override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + private var counter = 0 + private var currentUpstreamIndex = 0 + private var runningUpstreams = inputPorts + private def upstreamsClosed = runningUpstreams == 0 + private def currentUpstream = in(currentUpstreamIndex) + + private def switchToNextInput(): Unit = { + @tailrec + def nextInletIndex(index: Int): Int = { + val successor = index + 1 match { + case `inputPorts` ⇒ 0 + case x ⇒ x + } + if (!isClosed(in(successor))) successor + else { + if (successor != currentUpstreamIndex) nextInletIndex(successor) + else { + completeStage() + 0 // return dummy/min value to exit stage logic gracefully + } + } + } + counter = 0 + currentUpstreamIndex = nextInletIndex(currentUpstreamIndex) + } + + in.foreach { i ⇒ + setHandler(i, new InHandler { + override def onPush(): Unit = { + push(out, grab(i)) + counter += 1 + if (counter == segmentSize) switchToNextInput() + } + + override def onUpstreamFinish(): Unit = { + if (!eagerClose) { + runningUpstreams -= 1 + if (!upstreamsClosed) { + if (i == currentUpstream) { + switchToNextInput() + if (isAvailable(out)) pull(currentUpstream) + } + } else completeStage() + } else completeStage() + } + }) + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = if (!hasBeenPulled(currentUpstream)) tryPull(currentUpstream) + }) + } + + override def toString = "Interleave" +} + object Broadcast { /** * Create a new `Broadcast` with the specified number of output ports.