From 19f23514f6300e334216162e622eed2329c569ea Mon Sep 17 00:00:00 2001 From: Filip Matusak Date: Tue, 15 May 2018 15:03:52 +0200 Subject: [PATCH] #24778 MergeLatest operator --- .../operators/Source-or-Flow/mergeLatest.md | 23 +++ .../src/main/paradox/stream/stream-graphs.md | 1 + .../scaladsl/GraphMergeLatestSpec.scala | 149 ++++++++++++++++++ .../akka/stream/javadsl/MergeLatest.scala | 42 +++++ .../akka/stream/scaladsl/MergeLatest.scala | 85 ++++++++++ 5 files changed, 300 insertions(+) create mode 100644 akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/MergeLatest.scala create mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl/MergeLatest.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md new file mode 100644 index 0000000000..48634393e7 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md @@ -0,0 +1,23 @@ +# mergeLatest + +Merge multiple sources. + +@ref[Fan-in stages](../index.md#fan-in-stages) + +## Signature + +## Description + +MergeLatest joins elements from N input streams into stream of lists of size N. +i-th element in list is the latest emitted element from i-th input stream. +MergeLatest emits list for each element emitted from some input stream, +but only after each input stream emitted at least one element + + +@@@div { .callout } + +**emits** when element is available from some input and each input emits at least one element from stream start + +**completes** all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) +@@@ + diff --git a/akka-docs/src/main/paradox/stream/stream-graphs.md b/akka-docs/src/main/paradox/stream/stream-graphs.md index 9210169413..db52d95129 100644 --- a/akka-docs/src/main/paradox/stream/stream-graphs.md +++ b/akka-docs/src/main/paradox/stream/stream-graphs.md @@ -44,6 +44,7 @@ Akka Streams currently provide these junctions (for a detailed list see the @ref * @scala[`Merge[In]`]@java[`Merge`] – *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output * @scala[`MergePreferred[In]`]@java[`MergePreferred`] – like `Merge` but if elements are available on `preferred` port, it picks from it, otherwise randomly from `others` * @scala[`MergePrioritized[In]`]@java[`MergePrioritized`] – like `Merge` but if elements are available on all input ports, it picks from them randomly based on their `priority` + * @scala[`MergeLatest[In]`]@java[`MergeLatest`] – *(N inputs, 1 output)* emits `List[In]`, when i-th input stream emits element, then i-th element in emitted list is updated * @scala[`ZipWith[A,B,...,Out]`]@java[`ZipWith`] – *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element * @scala[`Zip[A,B]`]@java[`Zip`] – *(2 inputs, 1 output)* is a `ZipWith` specialised to zipping input streams of `A` and `B` into a @scala[`(A,B)`]@java[`Pair(A,B)`] tuple stream * @scala[`Concat[A]`]@java[`Concat`] – *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala new file mode 100644 index 0000000000..6dc9428d28 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeLatestSpec.scala @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream._ +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSource + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class GraphMergeLatestSpec extends TwoStreamsSetup { + import GraphDSL.Implicits._ + + override type Outputs = List[Int] + + override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) { + val merge = b add MergeLatest[Int](2) + + override def left: Inlet[Int] = merge.in(0) + override def right: Inlet[Int] = merge.in(1) + override def out: Outlet[Outputs] = merge.out + + } + + "mergeLatest" must { + + "start emit values only after each input stream emitted value" in assertAllStagesStopped { + val up1 = TestSource.probe[Int] + val up2 = TestSource.probe[Int] + val up3 = TestSource.probe[Int] + val probe = TestSubscriber.manualProbe[List[Int]]() + + val (in1, in2, in3) = RunnableGraph.fromGraph(GraphDSL.create(up1, up2, up3)((_, _, _)) { implicit b ⇒ (s1, s2, s3) ⇒ + val m = b.add(MergeLatest[Int](3)) + + s1 ~> m + s2 ~> m + s3 ~> m + m.out ~> Sink.fromSubscriber(probe) + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + subscription.request(1) + probe.expectNoMessage(10.millis) + in1.sendNext(1) + probe.expectNoMessage(10.millis) + in2.sendNext(2) + probe.expectNoMessage(10.millis) + in3.sendNext(3) + probe.expectNext(List(1, 2, 3)) + in1.sendComplete() + in2.sendComplete() + in3.sendComplete() + probe.expectComplete() + } + + "update values after message from one stream" in assertAllStagesStopped { + val up1 = TestSource.probe[Int] + val up2 = TestSource.probe[Int] + val up3 = TestSource.probe[Int] + val probe = TestSubscriber.manualProbe[List[Int]]() + + val (in1, in2, in3) = RunnableGraph.fromGraph(GraphDSL.create(up1, up2, up3)((_, _, _)) { implicit b ⇒ (s1, s2, s3) ⇒ + val m = b.add(MergeLatest[Int](3)) + + s1 ~> m + s2 ~> m + s3 ~> m + m.out ~> Sink.fromSubscriber(probe) + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + in1.sendNext(1) + in2.sendNext(2) + in3.sendNext(3) + subscription.request(1) + probe.expectNext() should be(List(1, 2, 3)) + + in1.sendNext(2) + subscription.request(1) + probe.expectNext() should be(List(2, 2, 3)) + + in2.sendNext(4) + subscription.request(1) + probe.expectNext() should be(List(2, 4, 3)) + + in3.sendNext(6) + subscription.request(1) + probe.expectNext() should be(List(2, 4, 6)) + + in3.sendNext(9) + subscription.request(1) + probe.expectNext() should be(List(2, 4, 9)) + + in1.sendNext(4) + subscription.request(1) + probe.expectNext() should be(List(4, 4, 9)) + + in1.sendComplete() + in2.sendComplete() + in3.sendComplete() + probe.expectComplete() + } + + "work with one-way merge" in { + val result = Source.fromGraph(GraphDSL.create() { implicit b ⇒ + val merge = b.add(MergeLatest[Int](1)) + val source = b.add(Source(1 to 3)) + + source ~> merge + SourceShape(merge.out) + }).runFold(Seq[List[Int]]())(_ :+ _) + + Await.result(result, 3.seconds) should ===(Seq(List(1), List(2), List(3))) + } + + "complete stage if eagerComplete is set and one of input stream finished" in assertAllStagesStopped { + val up1 = TestSource.probe[Int] + val up2 = TestSource.probe[Int] + val probe = TestSubscriber.manualProbe[List[Int]]() + + val (in1, in2) = RunnableGraph.fromGraph(GraphDSL.create(up1, up2)((_, _)) { implicit b ⇒ (s1, s2) ⇒ + val m = b.add(MergeLatest[Int](2, true)) + + s1 ~> m + s2 ~> m + m.out ~> Sink.fromSubscriber(probe) + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + in1.sendComplete() + probe.expectComplete() + } + + commonTests() + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/MergeLatest.scala b/akka-stream/src/main/scala/akka/stream/javadsl/MergeLatest.scala new file mode 100644 index 0000000000..8dfdad5978 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/MergeLatest.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.stream.stage.GraphStage +import akka.stream.{ UniformFanInShape, scaladsl } + +import scala.collection.JavaConverters._ + +/** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each stream emitted at least one element + * + * '''Emits when''' element is available from some input and each input emits at least one element from stream start + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * + * '''Cancels when''' downstream cancels + * + */ +object MergeLatest { + /** + * Create a new `MergeLatest` with the specified number of input ports. + * + * @param inputPorts number of input ports + * @param eagerComplete if true, the merge latest will complete as soon as one of its inputs completes. + */ + def create[T](inputPorts: Int, eagerComplete: Boolean): GraphStage[UniformFanInShape[T, java.util.List[T]]] = + new scaladsl.MergeLatest[T, java.util.List[T]](inputPorts, eagerComplete)(x ⇒ x.toList.asJava) + + /** + * Create a new `MergeLatest` with the specified number of input ports. + * + * @param inputPorts number of input ports + */ + def create[T](inputPorts: Int): GraphStage[UniformFanInShape[T, java.util.List[T]]] = create(inputPorts, false) +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/MergeLatest.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/MergeLatest.scala new file mode 100644 index 0000000000..d18dc56cdc --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/MergeLatest.scala @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape } + +import scala.collection.immutable +import scala.language.higherKinds + +/** + * MergeLatest joins elements from N input streams into stream of lists of size N. + * i-th element in list is the latest emitted element from i-th input stream. + * MergeLatest emits list for each element emitted from some input stream, + * but only after each stream emitted at least one element + * + * '''Emits when''' element is available from some input and each input emits at least one element from stream start + * + * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * + * '''Cancels when''' downstream cancels + * + */ +object MergeLatest { + /** + * Create a new `MergeLatest` with the specified number of input ports. + * + * @param inputPorts number of input ports + * @param eagerComplete if true, the merge latest will complete as soon as one of its inputs completes. + */ + def apply[T](inputPorts: Int, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] = + new MergeLatest[T, List[T]](inputPorts, eagerComplete)(_.toList) + +} + +final class MergeLatest[T, M](val inputPorts: Int, val eagerClose: Boolean)(buildElem: Array[T] ⇒ M) extends GraphStage[UniformFanInShape[T, M]] { + require(inputPorts >= 1, "input ports must be >= 1") + + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("MergeLatest.in" + i)) + val out: Outlet[M] = Outlet[M]("MergeLatest.out") + override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]() + private var runningUpstreams: Int = inputPorts + private def upstreamsClosed: Boolean = runningUpstreams == 0 + private def allMessagesReady: Boolean = activeStreams.size == inputPorts + private val messages: Array[Any] = new Array[Any](inputPorts) + + override def preStart(): Unit = in.foreach(tryPull) + + in.zipWithIndex.foreach { + case (input, index) ⇒ + setHandler(input, new InHandler { + override def onPush(): Unit = { + messages.update(index, grab(input)) + activeStreams.add(index) + if (allMessagesReady) emit(out, buildElem(messages.asInstanceOf[Array[T]])) + tryPull(input) + } + + override def onUpstreamFinish(): Unit = { + if (!eagerClose) { + runningUpstreams -= 1 + if (upstreamsClosed) completeStage() + } else completeStage() + } + }) + } + + override def onPull(): Unit = { + var i = 0 + while (i < inputPorts) { + if (!hasBeenPulled(in(i))) tryPull(in(i)) + i += 1 + } + } + + setHandler(out, this) + } + + override def toString = "MergeLatest" +}