diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipLatest.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipLatest.md new file mode 100644 index 0000000000..c7e5c16577 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipLatest.md @@ -0,0 +1,33 @@ +# zipLatest + +Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #zipLatest } + +@@@ + +## Description + +Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each. + +No element is emitted until at least one element from each Source becomes available. + +@@@div { .callout } + +**emits** when all of the inputs have at least an element available, and then each time an element becomes + available on either of the inputs + +**backpressures** when downstream backpressures + +**completes** when any upstream completes + +**cancels** when downstream cancels + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipLatestWith.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipLatestWith.md new file mode 100644 index 0000000000..799bbaf6d3 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipLatestWith.md @@ -0,0 +1,34 @@ +# zipLatestWith + +Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #zipLatestWith } + +@@@ + +## Description + +Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each. + +No element is emitted until at least one element from each Source becomes available. Whenever a new +element appears, the zipping function is invoked with a tuple containing the new element and the last seen element of the other stream. + +@@@div { .callout } + +**emits** all of the inputs have at least an element available, and then each time an element becomes + available on either of the inputs + +**backpressures** when downstream backpressures + +**completes** when any upstream completes + +**cancels** when downstream cancels + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index c67034c23e..365fe3a2d3 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -245,6 +245,8 @@ the inputs in different ways. |Source/Flow|@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.| |Source/Flow|@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.| |Source/Flow|@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.| +|Source/Flow|@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.| +|Source/Flow|@ref[zipLatestWith](Source-or-Flow/zipLatestWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each.| |Source/Flow|@ref[zipWith](Source-or-Flow/zipWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream.| |Source/Flow|@ref[zipWithIndex](Source-or-Flow/zipWithIndex.md)|Zips elements of current flow with its indices.| @@ -306,7 +308,9 @@ Operators meant for inter-operating between Akka Streams and Actors: * [merge](Source-or-Flow/merge.md) * [mergeSorted](Source-or-Flow/mergeSorted.md) * [zip](Source-or-Flow/zip.md) +* [zipLatest](Source-or-Flow/zipLatest.md) * [zipWith](Source-or-Flow/zipWith.md) +* [zipLatestWith](Source-or-Flow/zipLatestWith.md) * [zipWithIndex](Source-or-Flow/zipWithIndex.md) * [map](Source-or-Flow/map.md) * [recover](Source-or-Flow/recover.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 6507fcbbc0..9eee038437 100755 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -40,7 +40,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "zipLatestGraph", "zipLatestWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass → graphHelpers, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala new file mode 100644 index 0000000000..bc4bd05f4a --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala @@ -0,0 +1,331 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.actor.ActorSystem +import akka.stream.testkit.TestPublisher.Probe +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.{ ActorMaterializer, ClosedShape } +import akka.testkit.TestKit +import org.scalacheck.Gen +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.prop.PropertyChecks +import org.scalatest.{ BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike } + +import scala.concurrent.duration._ +import scala.language.postfixOps + +class GraphZipLatestSpec + extends TestKit(ActorSystem("ZipLatestSpec")) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with PropertyChecks + with GivenWhenThen + with ScalaFutures { + implicit val materializer = ActorMaterializer() + override def afterAll = TestKit.shutdownActorSystem(system) + implicit val patience = PatienceConfig(5 seconds) + + "ZipLatest" must { + "only emit when at least one pair is available" in { + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("request for one element") + probe.request(1) + + And("an element pushed on one of the sources") + bools.sendNext(true) + + Then("does not emit yet") + probe.expectNoMessage(0 seconds) + + And("an element pushed on the other source") + ints.sendNext(1) + + Then("emits a single pair") + probe.expectNext((true, 1)) + } + + "emits as soon as one source is available" in { + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("request for 3 elements") + probe.request(3) + + And("a first element pushed on either source") + bools.sendNext(true) + ints.sendNext(1) + + And("then 2 elements pushed only on one source") + ints.sendNext(1) + ints.sendNext(1) + + Then("3 elements are emitted") + probe.expectNext((true, 1)) + probe.expectNext((true, 1)) + probe.expectNext((true, 1)) + } + + "does not emit the same pair upon two pulls with value types" in { + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("request for one element") + probe.request(1) + + And("one element pushed on each source") + bools.sendNext(true) + ints.sendNext(1) + + Then("emits a single pair") + probe.expectNext((true, 1)) + + And("another request") + probe.request(1) + + Then("does not emit a duplicate") + probe.expectNoMessage(0 seconds) + + And("sending complete") + bools.sendComplete() + + Then("completes the stream") + probe.expectComplete() + } + + "does not emit the same pair upon two pulls with reference types" in new Fixture { + val a = A(someString) + val b = B(someInt) + val (probe, as, bs) = testGraph[A, B] + + Given("request for one element") + probe.request(1) + + And("one element pushed on each source") + as.sendNext(a) + bs.sendNext(b) + + Then("emits a single pair") + probe.expectNext((a, b)) + + And("another request") + probe.request(1) + + Then("does not emit a duplicate") + probe.expectNoMessage(0 seconds) + + And("sending complete") + as.sendComplete() + + Then("completes the stream") + probe.expectComplete() + } + + "does not de-duplicate instances based on value" in new Fixture { + Given(""" + |S1 -> A1 A2 A3 --\ + | > -- ZipLatest + |S2 -> B1 B2 --/ + """.stripMargin) + val a1 = A(someString) + val a2 = A(someString) + val a3 = A(someString) + val b1 = B(someInt) + val b2 = B(someInt) + val (probe, as, bs) = testGraph[A, B] + + Then(""" + |O -> (A1, B1), (A2, B1), (A3, B1), (A3, B2) + """.stripMargin) + probe.request(4) + + as.sendNext(a1) + bs.sendNext(b1) + probe.expectNext((a1, b1)) + + as.sendNext(a2) + probe.expectNext((a2, b1)) + + as.sendNext(a3) + probe.expectNext((a3, b1)) + + bs.sendNext(b2) + probe.expectNext((a3, b2)) + } + + val first = (t: (Probe[Boolean], Probe[Int])) ⇒ t._1 + val second = (t: (Probe[Boolean], Probe[Int])) ⇒ t._2 + + "complete when either source completes" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("either source completes") + select((bools, ints)).sendComplete() + + Then("subscribes and completes") + probe.expectSubscriptionAndComplete() + } + } + + "complete when either source completes and requesting element" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("either source completes") + select((bools, ints)).sendComplete() + + And("request for one element") + probe.request(1) + + Then("subscribes and completes") + probe.expectComplete() + } + } + + "complete when either source completes with some pending element" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("one element pushed on each source") + bools.sendNext(true) + ints.sendNext(1) + + And("either source completes") + select((bools, ints)).sendComplete() + + Then("should emit first element then complete") + probe.requestNext((true, 1)) + probe.expectComplete() + } + } + + "complete when one source completes and the other continues pushing" in { + + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("one element pushed on each source") + bools.sendNext(true) + ints.sendNext(1) + + And("either source completes") + bools.sendComplete() + ints.sendNext(10) + ints.sendNext(10) + + Then("should emit first element then complete") + probe.requestNext((true, 1)) + probe.expectComplete() + } + + "complete if no pending demand" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("request for one element") + probe.request(1) + + Given("one element pushed on each source and tuple emitted") + bools.sendNext(true) + ints.sendNext(1) + probe.expectNext((true, 1)) + + And("either source completes") + select((bools, ints)).sendComplete() + + Then("should complete") + probe.expectComplete() + } + } + + "fail when either source has error" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + val error = new RuntimeException + + Given("either source errors") + select((bools, ints)).sendError(error) + + Then("subscribes and error") + probe.expectSubscriptionAndError(error) + } + } + + "emit even if pair is the same" in { + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("request for two elements") + probe.request(2) + + And("one element pushed on each source") + bools.sendNext(true) + ints.sendNext(1) + And("once again the same element on one source") + ints.sendNext(1) + + And("followed by complete") + bools.sendComplete() + ints.sendComplete() + + Then("emits two equal pairs") + probe.expectNext((true, 1)) + probe.expectNext((true, 1)) + + And("then complete") + probe.expectComplete() + } + + "emit combined elements in proper order" in { + val (probe, firstDigits, secondDigits) = testGraph[Int, Int] + + Given(s"numbers up to 99 in tuples") + val allNumbers = for { + firstDigit ← 0 to 9 + secondDigit ← 0 to 9 + } yield (firstDigit, secondDigit) + + allNumbers.groupBy(_._1).toList.sortBy(_._1).foreach { + case (firstDigit, pairs) ⇒ { + When(s"sending first digit $firstDigit") + firstDigits.sendNext(firstDigit) + pairs.map { case (_, digits) ⇒ digits }.foreach { secondDigit ⇒ + And(s"sending second digit $secondDigit") + secondDigits.sendNext(secondDigit) + probe.request(1) + + Then(s"should receive tuple ($firstDigit,$secondDigit)") + probe.expectNext((firstDigit, secondDigit)) + } + } + } + } + } + + private class Fixture { + val someString = "someString" + val someInt = 1 + case class A(value: String) + case class B(value: Int) + } + + private def testGraph[A, B] = + RunnableGraph + .fromGraph( + GraphDSL + .create( + TestSink.probe[(A, B)], + TestSource.probe[A], + TestSource.probe[B])(Tuple3.apply) { implicit b ⇒ (ts, as, bs) ⇒ + import GraphDSL.Implicits._ + val zipLatest = b.add(new ZipLatest[A, B]()) + as ~> zipLatest.in0 + bs ~> zipLatest.in1 + zipLatest.out ~> ts + ClosedShape + } + ) + .run() + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestWithSpec.scala new file mode 100644 index 0000000000..17422c9b02 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestWithSpec.scala @@ -0,0 +1,247 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream._ +import akka.stream.testkit._ +import akka.testkit.EventFilter +import org.reactivestreams.Publisher + +import scala.concurrent.duration._ +import scala.language.postfixOps + +class GraphZipLatestWithSpec extends TwoStreamsSetup { + import GraphDSL.Implicits._ + + override type Outputs = Int + + override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) { + val zip = b.add(ZipWith((_: Int) + (_: Int))) + override def left: Inlet[Int] = zip.in0 + override def right: Inlet[Int] = zip.in1 + override def out: Outlet[Int] = zip.out + } + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source + .fromPublisher(p1) + .zipLatestWith(Source.fromPublisher(p2))(_ + _) + .runWith(Sink.fromSubscriber(subscriber)) + subscriber + } + + "ZipLatestWith" must { + + "work in the happy case" in { + val upstreamProbe = TestPublisher.manualProbe[Int]() + val downstreamProbe = TestSubscriber.manualProbe[Outputs]() + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b ⇒ + val zipLatest = b.add(ZipLatestWith((_: Int) + (_: Int))) + val never = Source.single(3).initialDelay(1 day) + Source(1 to 2).concat(never) ~> zipLatest.in0 + Source.fromPublisher(upstreamProbe) ~> zipLatest.in1 + zipLatest.out ~> Sink.fromSubscriber(downstreamProbe) + ClosedShape + }) + .run() + + val upstreamSubscription = upstreamProbe.expectSubscription() + val downstreamSubscription = downstreamProbe.expectSubscription() + + upstreamSubscription.sendNext(10) + downstreamSubscription.request(2) + downstreamProbe.expectNext(11) + downstreamProbe.expectNext(12) + + upstreamSubscription.sendNext(20) + downstreamSubscription.request(1) + downstreamProbe.expectNext(22) + + upstreamSubscription.sendNext(30) + downstreamSubscription.request(1) + downstreamProbe.expectNext(32) + + upstreamSubscription.sendComplete() + + downstreamProbe.expectComplete() + } + + "work in the sad case" in { + val probe = TestSubscriber.manualProbe[Outputs]() + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b ⇒ + val zip = b.add(ZipLatestWith[Int, Int, Int]((_: Int) / (_: Int))) + val never = Source.single(2).initialDelay(1 day) + Source.single(1).concat(never) ~> zip.in0 + Source(-2 to 2) ~> zip.in1 + + zip.out ~> Sink.fromSubscriber(probe) + + ClosedShape + }) + .run() + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(1 / -2) + probe.expectNext(1 / -1) + + EventFilter[ArithmeticException](occurrences = 1).intercept { + subscription.request(2) + } + probe.expectError() match { + case a: java.lang.ArithmeticException ⇒ + a.getMessage should be("/ by zero") + } + probe.expectNoMsg(200.millis) + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one delayed completed and one nonempty publisher" in { + val subscriber1 = + setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = + setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one immediately failed and one nonempty publisher" in { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + + "work with one delayed failed and one nonempty publisher" in { + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) + val subscription2 = subscriber2.expectSubscriptionAndError(TestException) + } + + "zipLatestWith a ETA expanded Person.apply (3 inputs)" in { + val upstreamProbe = TestPublisher.manualProbe[Int]() + val downstreamProbe = TestSubscriber.manualProbe[Person]() + + case class Person(name: String, surname: String, int: Int) + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b ⇒ + val zip = b.add(ZipLatestWith(Person.apply _)) + + Source.single("Caplin") ~> zip.in0 + Source.single("Capybara") ~> zip.in1 + Source.fromPublisher(upstreamProbe).take(1) ~> zip.in2 + + zip.out ~> Sink.fromSubscriber(downstreamProbe) + + ClosedShape + }) + .run() + + val downstreamSubscription = downstreamProbe.expectSubscription() + val upstreamSubscription = upstreamProbe.expectSubscription() + + downstreamSubscription.request(1) + upstreamSubscription.sendNext(3) + downstreamProbe.expectNext(Person("Caplin", "Capybara", 3)) + downstreamProbe.expectComplete() + } + + "work with up to 22 inputs" in { + val downstreamProbe = TestSubscriber.manualProbe[String]() + val upstreamProbe = TestPublisher.manualProbe[Int]() + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b ⇒ + val sum22 = (v1: Int, + v2: String, + v3: Int, + v4: String, + v5: Int, + v6: String, + v7: Int, + v8: String, + v9: Int, + v10: String, + v11: Int, + v12: String, + v13: Int, + v14: String, + v15: Int, + v16: String, + v17: Int, + v18: String, + v19: Int, + v20: String, + v21: Int, + v22: String) ⇒ + v1 + v2 + v3 + v4 + v5 + v6 + v7 + v8 + v9 + v10 + + v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18 + v19 + v20 + v21 + v22 + + // odd input ports will be Int, even input ports will be String + val zip = b.add(ZipLatestWith(sum22)) + + Source.single(1) ~> zip.in0 + Source.single(2).map(_.toString) ~> zip.in1 + Source.single(3) ~> zip.in2 + Source.single(4).map(_.toString) ~> zip.in3 + Source.single(5) ~> zip.in4 + Source.single(6).map(_.toString) ~> zip.in5 + Source.single(7) ~> zip.in6 + Source.single(8).map(_.toString) ~> zip.in7 + Source.single(9) ~> zip.in8 + Source.single(10).map(_.toString) ~> zip.in9 + Source.single(11) ~> zip.in10 + Source.single(12).map(_.toString) ~> zip.in11 + Source.single(13) ~> zip.in12 + Source.single(14).map(_.toString) ~> zip.in13 + Source.single(15) ~> zip.in14 + Source.single(16).map(_.toString) ~> zip.in15 + Source.single(17) ~> zip.in16 + Source.single(18).map(_.toString) ~> zip.in17 + Source.single(19) ~> zip.in18 + Source.single(20).map(_.toString) ~> zip.in19 + Source.single(21) ~> zip.in20 + Source.fromPublisher(upstreamProbe).map(_.toString) ~> zip.in21 + + zip.out ~> Sink.fromSubscriber(downstreamProbe) + + ClosedShape + }) + .run() + + val downstreamSubscription = downstreamProbe.expectSubscription() + val upstreamSubscription = upstreamProbe.expectSubscription() + + downstreamSubscription.request(1) + upstreamSubscription.sendNext(22) + upstreamSubscription.sendComplete() + downstreamProbe.expectNext((1 to 22).mkString("")) + downstreamProbe.expectComplete() + + } + + } + +} diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipLatestWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipLatestWith.scala.template new file mode 100644 index 0000000000..a1363a6084 --- /dev/null +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipLatestWith.scala.template @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ +package akka.stream.javadsl + +import akka.stream._ +import akka.stream.scaladsl +import akka.japi.function +import akka.NotUsed + +/** + * Combine the elements of multiple streams into a stream of combined elements using a combiner function, + * picking always the latest of the elements of each source. + * + * No element is emitted until at least one element from each Source becomes available. Whenever a new + * element appears, the zipping function is invoked with a tuple containing the new element + * and the other last seen elements. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any of the upstreams completes + * + * '''Cancels when''' downstream cancels + */ +object ZipLatestWith { + + /** + * Create a new `ZipLatestWith` vertex with the specified input types and zipping-function `f`. + * + * @param f zipping-function from the input values to the output value + * @param attributes optional attributes for this vertex + */ + def create[A, B, Out](f: function.Function2[A, B, Out]): Graph[FanInShape2[A, B, Out], NotUsed] = + scaladsl.ZipLatestWith(f.apply _) + + [3..22#/** Create a new `ZipLatestWith` specialized for 1 inputs. + * + * @param f zipping-function from the input values to the output value + * @param attributes optional attributes for this vertex + */ + def create1[[#T1#], Out](f: function.Function1[[#T1#], Out]): Graph[FanInShape1[[#T1#], Out], NotUsed] = + scaladsl.ZipLatestWith(f.apply _)# + + ] + +} diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template new file mode 100644 index 0000000000..34c0d18d0c --- /dev/null +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream._ +import akka.stream.impl.StreamLayout +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } + +trait ZipLatestWithApply { + + [2..22#/** + * Create a new `ZipLatestWith` specialized for 1 inputs. + * + * @param zipper zipping-function from the input values to the output value + */ + def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipLatestWith1[[#A1#], O] = + new ZipLatestWith1(zipper) + # + + ] + +} + +[2..22#/** `ZipLatestWith` specialized for 1 inputs */ +class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape1[[#A1#], O]] { + override def initialAttributes = Attributes.name("ZipLatestWith1") + override val shape: FanInShape1[[#A1#], O] = new FanInShape1[[#A1#], O]("ZipLatestWith1") + def out: Outlet[O] = shape.out + [#val in0: Inlet[A1] = shape.in0# + ] + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { outer => + // Without this field the completion signalling would take one extra pull + var willShutDown = false + + [#val inlet0 = new ZipLatestInlet(in0)# + ] + var waitingForTuple = false + var staleTupleValues = true + + override def preStart(): Unit = { + [#pull(in0)# + ] + } + + setHandler( + out, + new OutHandler { + override def onPull(): Unit = { + if (hasAllValues) { + if (staleTupleValues) { + waitingForTuple = true + } else { + pushOutput() + } + } else { + waitingForTuple = true + } + tryPullAllIfNeeded() + } + } + ) + + [#setHandler(in0, inlet0)# + ] + + private def hasAllValues = [#inlet0.hasValue#&&] + + private def pushOutput(): Unit = { + push(out, zipper([#inlet0.value#,])) + if (willShutDown) completeStage() + staleTupleValues = true + } + + private def tryPullAllIfNeeded(): Unit = { + [#if (!hasBeenPulled(in0)) { + tryPull(in0) + }# + ] + } + + private class ZipLatestInlet[T](in: Inlet[T]) extends InHandler { + var value: T = _ + var hasValue = false + + override def onPush() = { + value = outer.grab(in) + hasValue = true + outer.staleTupleValues = false + if (outer.waitingForTuple && outer.hasAllValues) { + outer.pushOutput() + outer.waitingForTuple = false + outer.tryPullAllIfNeeded() + } + } + + override def onUpstreamFinish(): Unit = { + if (outer.staleTupleValues) completeStage() + outer.willShutDown = true + } + } + } + + override def toString = "ZipLatestWith1" +} +# +] \ No newline at end of file diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index eaa5b71f9a..5698fa3a3d 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -53,3 +53,11 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTapMat") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTapMat") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTapMat") + +# zipLatest +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.zipLatestMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.zipLatestWithMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestGraph") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatest") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index ec745f864e..8de10338e2 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -79,6 +79,7 @@ import akka.stream._ val wireTap = name("wireTap") val balance = name("balance") val zip = name("zip") + val zipLatest = name("zipLatest") val zipN = name("zipN") val zipWithN = name("zipWithN") val zipWithIndex = name("zipWithIndex") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index c87996b332..38bb553134 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2442,6 +2442,46 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr } })), matF) + /** + * Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each. + * + * A `ZipLatest` has a `left` and a `right` input port and one `out` port. + * + * No element is emitted until at least one element from each Source becomes available. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatest[T](source: Graph[SourceShape[T], _]): javadsl.Flow[In, Out Pair T, Mat] = + zipLatestMat(source, Keep.left) + + /** + * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#zipLatest]] + */ + def zipLatestMat[T, M, M2]( + that: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out Pair T, M2] = + this.viaMat(Flow.fromGraph(GraphDSL.create( + that, + new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out Pair T]] { + def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out Pair T] = { + val zip: FanInShape2[Out, T, Out Pair T] = b.add(ZipLatest.create[Out, T]) + b.from(s).toInlet(zip.in1) + FlowShape(zip.in0, zip.out) + } + })), matF) + /** * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. @@ -2474,6 +2514,43 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] = new Flow(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) + /** + * Combine the elements of multiple streams into a stream of combined elements using a combiner function, + * picking always the latest of the elements of each source. + * + * No element is emitted until at least one element from each Source becomes available. Whenever a new + * element appears, the zipping function is invoked with a tuple containing the new element + * and the other last seen elements. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any of the upstreams completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatestWith[Out2, Out3]( + that: Graph[SourceShape[Out2], _], + combine: function.Function2[Out, Out2, Out3]): javadsl.Flow[In, Out3, Mat] = + new Flow(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine))) + + /** + * Put together the elements of current [[Flow]] and the given [[Source]] + * into a stream of combined elements using a combiner function, picking always the latest element of each. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#zipLatestWith]] + */ + def zipLatestWithMat[Out2, Out3, M, M2]( + that: Graph[SourceShape[Out2], M], + combine: function.Function2[Out, Out2, Out3], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] = + new Flow(delegate.zipLatestWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) + /** * Combine the elements of current flow into a stream of tuples consisting * of all elements paired with their index. Indices start at 0. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 5eec2f66d7..526b140b8d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -349,6 +349,35 @@ object Zip { new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } } +/** + * Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each. + * + * A `Zip` has a `left` and a `right` input port and one `out` port + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ +object ZipLatest { + import akka.japi.function.Function2 + import akka.japi.Pair + + /** + * Create a new `ZipLatest` operator with the specified input types and zipping-function + * which creates `akka.japi.Pair`s. + */ + def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] = + ZipLatestWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]]) + + private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = + new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } +} + /** * Combine the elements of multiple streams into a stream of lists. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 09d74df339..b2503b2501 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1077,6 +1077,38 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = this.viaMat(Flow.create[Out].zipMat(that, Keep.right[NotUsed, M]), matF) + /** + * Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each. + * + * A `ZipLatest` has a `left` and a `right` input port and one `out` port. + * + * No element is emitted until at least one element from each Source becomes available. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatest[T](that: Graph[SourceShape[T], _]): javadsl.Source[Out @uncheckedVariance Pair T, Mat] = + zipLatestMat(that, Keep.left) + + /** + * Combine the elements of current [[Source]] and the given one into a stream of tuples, picking always the latest element of each. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#zipLatest]]. + */ + def zipLatestMat[T, M, M2]( + that: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = + this.viaMat(Flow.create[Out].zipLatestMat(that, Keep.right[NotUsed, M]), matF) + /** * Put together the elements of current [[Source]] and the given one * into a stream of combined elements using a combiner function. @@ -1109,6 +1141,44 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] = new Source(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) + /** + * Combine the elements of multiple streams into a stream of combined elements using a combiner function, + * picking always the latest of the elements of each source. + * + * No element is emitted until at least one element from each Source becomes available. Whenever a new + * element appears, the zipping function is invoked with a tuple containing the new element + * and the other last seen elements. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any of the upstreams completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatestWith[Out2, Out3]( + that: Graph[SourceShape[Out2], _], + combine: function.Function2[Out, Out2, Out3]): javadsl.Source[Out3, Mat] = + new Source(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine))) + + /** + * Put together the elements of current [[Source]] and the given one + * into a stream of combined elements using a combiner function, + * picking always the latest of the elements of each source. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#zipLatestWith]]. + */ + def zipLatestWithMat[Out2, Out3, M, M2]( + that: Graph[SourceShape[Out2], M], + combine: function.Function2[Out, Out2, Out3], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] = + new Source(delegate.zipLatestWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) + /** * Combine the elements of current [[Source]] into a stream of tuples consisting * of all elements paired with their index. Indices start at 0. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 25f50d7b79..e3918f82ce 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1497,6 +1497,21 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I def zip[T](source: Graph[SourceShape[T], _]): SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, T], Mat] = new SubFlow(delegate.zip(source).map { case (o, t) ⇒ akka.japi.Pair.create(o, t) }) + /** + * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatest[T](source: Graph[SourceShape[T], _]): SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, T], Mat] = + new SubFlow(delegate.zipLatest(source).map { case (o, t) ⇒ akka.japi.Pair.create(o, t) }) + /** * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. @@ -1514,6 +1529,24 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I combine: function.Function2[Out, Out2, Out3]): SubFlow[In, Out3, Mat] = new SubFlow(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine))) + /** + * Put together the elements of current [[Flow]] and the given [[Source]] + * into a stream of combined elements using a combiner function, picking always the latest element of each. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatestWith[Out2, Out3]( + that: Graph[SourceShape[Out2], _], + combine: function.Function2[Out, Out2, Out3]): SubFlow[In, Out3, Mat] = + new SubFlow(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine))) + /** * Combine the elements of current [[Flow]] into a stream of tuples consisting * of all elements paired with their index. Indices start at 0. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 1ed87d45a9..511b6fb02f 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1478,6 +1478,21 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O def zip[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] = new SubSource(delegate.zip(source).map { case (o, t) ⇒ akka.japi.Pair.create(o, t) }) + /** + * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatest[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] = + new SubSource(delegate.zipLatest(source).map { case (o, t) ⇒ akka.japi.Pair.create(o, t) }) + /** * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. @@ -1495,6 +1510,24 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O combine: function.Function2[Out, Out2, Out3]): SubSource[Out3, Mat] = new SubSource(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine))) + /** + * Put together the elements of current [[Flow]] and the given [[Source]] + * into a stream of combined elements using a combiner function, picking always the latest element of each. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatestWith[Out2, Out3]( + that: Graph[SourceShape[Out2], _], + combine: function.Function2[Out, Out2, Out3]): SubSource[Out3, Mat] = + new SubSource(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine))) + /** * Combine the elements of current [[Source]] into a stream of tuples consisting * of all elements paired with their index. Indices start at 0. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index e791ee660c..daa1012536 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2354,6 +2354,31 @@ trait FlowOps[+Out, +Mat] { FlowShape(zip.in0, zip.out) } + /** + * Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each. + * + * A `ZipLatest` has a `left` and a `right` input port and one `out` port. + * + * No element is emitted until at least one element from each Source becomes available. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatest[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)] = via(zipLatestGraph(that)) + + protected def zipLatestGraph[U, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] = + GraphDSL.create(that) { implicit b ⇒ r ⇒ + val zip = b.add(ZipLatest[Out, U]()) + r ~> zip.in1 + FlowShape(zip.in0, zip.out) + } + /** * Put together the elements of current flow and the given [[Source]] * into a stream of combined elements using a combiner function. @@ -2376,6 +2401,33 @@ trait FlowOps[+Out, +Mat] { FlowShape(zip.in0, zip.out) } + /** + * Combine the elements of multiple streams into a stream of combined elements using a combiner function, + * picking always the latest of the elements of each source. + * + * No element is emitted until at least one element from each Source becomes available. Whenever a new + * element appears, the zipping function is invoked with a tuple containing the new element + * and the other last seen elements. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any of the upstreams completes + * + * '''Cancels when''' downstream cancels + */ + def zipLatestWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) ⇒ Out3): Repr[Out3] = + via(zipLatestWithGraph(that)(combine)) + + protected def zipLatestWithGraph[Out2, Out3, M](that: Graph[SourceShape[Out2], M])(combine: (Out, Out2) ⇒ Out3): Graph[FlowShape[Out @uncheckedVariance, Out3], M] = + GraphDSL.create(that) { implicit b ⇒ r ⇒ + val zip = b.add(ZipLatestWith[Out, Out2, Out3](combine)) + r ~> zip.in1 + FlowShape(zip.in0, zip.out) + } + /** * Combine the elements of current flow into a stream of tuples consisting * of all elements paired with their index. Indices start at 0. @@ -2798,6 +2850,30 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) ⇒ Out3)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out3, Mat3] = viaMat(zipWithGraph(that)(combine))(matF) + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples, + * picking always the latest of the elements of each source. + * + * @see [[#zipLatest]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def zipLatestMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[(Out, U), Mat3] = + viaMat(zipLatestGraph(that))(matF) + + /** + * Put together the elements of current flow and the given [[Source]] + * into a stream of combined elements using a combiner function, picking always the latest of the elements of each source. + * + * @see [[#zipLatestWith]]. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def zipLatestWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) ⇒ Out3)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out3, Mat3] = + viaMat(zipLatestWithGraph(that)(combine))(matF) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 1416e72c88..3ce78dce39 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -957,6 +957,33 @@ final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply) { override def toString = "Zip" } +object ZipLatest { + /** + * Create a new `ZipLatest`. + */ + def apply[A, B](): ZipLatest[A, B] = new ZipLatest() +} + +/** + * Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each. + * + * A `ZipLatest` has a `left` and a `right` input port and one `out` port. + * + * No element is emitted until at least one element from each Source becomes available. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ +final class ZipLatest[A, B] extends ZipLatestWith2[A, B, (A, B)](Tuple2.apply) { + override def toString = "ZipLatest" +} + /** * Combine the elements of multiple streams into a stream of combined elements using a combiner function. * @@ -970,6 +997,25 @@ final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply) { */ object ZipWith extends ZipWithApply +/** + * Combine the elements of multiple streams into a stream of combined elements using a combiner function, + * picking always the latest of the elements of each source. + * + * No element is emitted until at least one element from each Source becomes available. Whenever a new + * element appears, the zipping function is invoked with a tuple containing the new element + * and the other last seen elements. + * + * '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes + * available on either of the inputs + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any of the upstreams completes + * + * '''Cancels when''' downstream cancels + */ +object ZipLatestWith extends ZipLatestWithApply + /** * Takes a stream of pair elements and splits each pair to two output streams. *