Merge pull request #25538 from jchapuis/zipLatest

zipLatest and zipLatestWith operators (refs #25392)
This commit is contained in:
Johan Andrén 2018-10-03 11:28:43 +02:00 committed by GitHub
commit 1d65159923
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 1180 additions and 1 deletions

View file

@ -0,0 +1,33 @@
# zipLatest
Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.
@ref[Fan-in operators](../index.md#fan-in-operators)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #zipLatest }
@@@
## Description
Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.
No element is emitted until at least one element from each Source becomes available.
@@@div { .callout }
**emits** when all of the inputs have at least an element available, and then each time an element becomes
available on either of the inputs
**backpressures** when downstream backpressures
**completes** when any upstream completes
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,34 @@
# zipLatestWith
Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each.
@ref[Fan-in operators](../index.md#fan-in-operators)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #zipLatestWith }
@@@
## Description
Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.
No element is emitted until at least one element from each Source becomes available. Whenever a new
element appears, the zipping function is invoked with a tuple containing the new element and the last seen element of the other stream.
@@@div { .callout }
**emits** all of the inputs have at least an element available, and then each time an element becomes
available on either of the inputs
**backpressures** when downstream backpressures
**completes** when any upstream completes
**cancels** when downstream cancels
@@@

View file

@ -245,6 +245,8 @@ the inputs in different ways.
|Source/Flow|<a name="orelse"></a>@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.|
|Source/Flow|<a name="prepend"></a>@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|Source/Flow|<a name="zip"></a>@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.|
|Source/Flow|<a name="ziplatest"></a>@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.|
|Source/Flow|<a name="ziplatestwith"></a>@ref[zipLatestWith](Source-or-Flow/zipLatestWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each.|
|Source/Flow|<a name="zipwith"></a>@ref[zipWith](Source-or-Flow/zipWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream.|
|Source/Flow|<a name="zipwithindex"></a>@ref[zipWithIndex](Source-or-Flow/zipWithIndex.md)|Zips elements of current flow with its indices.|
@ -306,7 +308,9 @@ Operators meant for inter-operating between Akka Streams and Actors:
* [merge](Source-or-Flow/merge.md)
* [mergeSorted](Source-or-Flow/mergeSorted.md)
* [zip](Source-or-Flow/zip.md)
* [zipLatest](Source-or-Flow/zipLatest.md)
* [zipWith](Source-or-Flow/zipWith.md)
* [zipLatestWith](Source-or-Flow/zipLatestWith.md)
* [zipWithIndex](Source-or-Flow/zipWithIndex.md)
* [map](Source-or-Flow/map.md)
* [recover](Source-or-Flow/recover.md)

View file

@ -40,7 +40,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph")
val graphHelpers = Set("zipGraph", "zipWithGraph", "zipLatestGraph", "zipLatestWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph")
val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass graphHelpers,

View file

@ -0,0 +1,331 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.testkit.TestPublisher.Probe
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.{ ActorMaterializer, ClosedShape }
import akka.testkit.TestKit
import org.scalacheck.Gen
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.PropertyChecks
import org.scalatest.{ BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike }
import scala.concurrent.duration._
import scala.language.postfixOps
class GraphZipLatestSpec
extends TestKit(ActorSystem("ZipLatestSpec"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with PropertyChecks
with GivenWhenThen
with ScalaFutures {
implicit val materializer = ActorMaterializer()
override def afterAll = TestKit.shutdownActorSystem(system)
implicit val patience = PatienceConfig(5 seconds)
"ZipLatest" must {
"only emit when at least one pair is available" in {
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for one element")
probe.request(1)
And("an element pushed on one of the sources")
bools.sendNext(true)
Then("does not emit yet")
probe.expectNoMessage(0 seconds)
And("an element pushed on the other source")
ints.sendNext(1)
Then("emits a single pair")
probe.expectNext((true, 1))
}
"emits as soon as one source is available" in {
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for 3 elements")
probe.request(3)
And("a first element pushed on either source")
bools.sendNext(true)
ints.sendNext(1)
And("then 2 elements pushed only on one source")
ints.sendNext(1)
ints.sendNext(1)
Then("3 elements are emitted")
probe.expectNext((true, 1))
probe.expectNext((true, 1))
probe.expectNext((true, 1))
}
"does not emit the same pair upon two pulls with value types" in {
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for one element")
probe.request(1)
And("one element pushed on each source")
bools.sendNext(true)
ints.sendNext(1)
Then("emits a single pair")
probe.expectNext((true, 1))
And("another request")
probe.request(1)
Then("does not emit a duplicate")
probe.expectNoMessage(0 seconds)
And("sending complete")
bools.sendComplete()
Then("completes the stream")
probe.expectComplete()
}
"does not emit the same pair upon two pulls with reference types" in new Fixture {
val a = A(someString)
val b = B(someInt)
val (probe, as, bs) = testGraph[A, B]
Given("request for one element")
probe.request(1)
And("one element pushed on each source")
as.sendNext(a)
bs.sendNext(b)
Then("emits a single pair")
probe.expectNext((a, b))
And("another request")
probe.request(1)
Then("does not emit a duplicate")
probe.expectNoMessage(0 seconds)
And("sending complete")
as.sendComplete()
Then("completes the stream")
probe.expectComplete()
}
"does not de-duplicate instances based on value" in new Fixture {
Given("""
|S1 -> A1 A2 A3 --\
| > -- ZipLatest
|S2 -> B1 B2 --/
""".stripMargin)
val a1 = A(someString)
val a2 = A(someString)
val a3 = A(someString)
val b1 = B(someInt)
val b2 = B(someInt)
val (probe, as, bs) = testGraph[A, B]
Then("""
|O -> (A1, B1), (A2, B1), (A3, B1), (A3, B2)
""".stripMargin)
probe.request(4)
as.sendNext(a1)
bs.sendNext(b1)
probe.expectNext((a1, b1))
as.sendNext(a2)
probe.expectNext((a2, b1))
as.sendNext(a3)
probe.expectNext((a3, b1))
bs.sendNext(b2)
probe.expectNext((a3, b2))
}
val first = (t: (Probe[Boolean], Probe[Int])) t._1
val second = (t: (Probe[Boolean], Probe[Int])) t._2
"complete when either source completes" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("either source completes")
select((bools, ints)).sendComplete()
Then("subscribes and completes")
probe.expectSubscriptionAndComplete()
}
}
"complete when either source completes and requesting element" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("either source completes")
select((bools, ints)).sendComplete()
And("request for one element")
probe.request(1)
Then("subscribes and completes")
probe.expectComplete()
}
}
"complete when either source completes with some pending element" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("one element pushed on each source")
bools.sendNext(true)
ints.sendNext(1)
And("either source completes")
select((bools, ints)).sendComplete()
Then("should emit first element then complete")
probe.requestNext((true, 1))
probe.expectComplete()
}
}
"complete when one source completes and the other continues pushing" in {
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("one element pushed on each source")
bools.sendNext(true)
ints.sendNext(1)
And("either source completes")
bools.sendComplete()
ints.sendNext(10)
ints.sendNext(10)
Then("should emit first element then complete")
probe.requestNext((true, 1))
probe.expectComplete()
}
"complete if no pending demand" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for one element")
probe.request(1)
Given("one element pushed on each source and tuple emitted")
bools.sendNext(true)
ints.sendNext(1)
probe.expectNext((true, 1))
And("either source completes")
select((bools, ints)).sendComplete()
Then("should complete")
probe.expectComplete()
}
}
"fail when either source has error" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
val error = new RuntimeException
Given("either source errors")
select((bools, ints)).sendError(error)
Then("subscribes and error")
probe.expectSubscriptionAndError(error)
}
}
"emit even if pair is the same" in {
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for two elements")
probe.request(2)
And("one element pushed on each source")
bools.sendNext(true)
ints.sendNext(1)
And("once again the same element on one source")
ints.sendNext(1)
And("followed by complete")
bools.sendComplete()
ints.sendComplete()
Then("emits two equal pairs")
probe.expectNext((true, 1))
probe.expectNext((true, 1))
And("then complete")
probe.expectComplete()
}
"emit combined elements in proper order" in {
val (probe, firstDigits, secondDigits) = testGraph[Int, Int]
Given(s"numbers up to 99 in tuples")
val allNumbers = for {
firstDigit 0 to 9
secondDigit 0 to 9
} yield (firstDigit, secondDigit)
allNumbers.groupBy(_._1).toList.sortBy(_._1).foreach {
case (firstDigit, pairs) {
When(s"sending first digit $firstDigit")
firstDigits.sendNext(firstDigit)
pairs.map { case (_, digits) digits }.foreach { secondDigit
And(s"sending second digit $secondDigit")
secondDigits.sendNext(secondDigit)
probe.request(1)
Then(s"should receive tuple ($firstDigit,$secondDigit)")
probe.expectNext((firstDigit, secondDigit))
}
}
}
}
}
private class Fixture {
val someString = "someString"
val someInt = 1
case class A(value: String)
case class B(value: Int)
}
private def testGraph[A, B] =
RunnableGraph
.fromGraph(
GraphDSL
.create(
TestSink.probe[(A, B)],
TestSource.probe[A],
TestSource.probe[B])(Tuple3.apply) { implicit b (ts, as, bs)
import GraphDSL.Implicits._
val zipLatest = b.add(new ZipLatest[A, B]())
as ~> zipLatest.in0
bs ~> zipLatest.in1
zipLatest.out ~> ts
ClosedShape
}
)
.run()
}

View file

@ -0,0 +1,247 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream._
import akka.stream.testkit._
import akka.testkit.EventFilter
import org.reactivestreams.Publisher
import scala.concurrent.duration._
import scala.language.postfixOps
class GraphZipLatestWithSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._
override type Outputs = Int
override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) {
val zip = b.add(ZipWith((_: Int) + (_: Int)))
override def left: Inlet[Int] = zip.in0
override def right: Inlet[Int] = zip.in1
override def out: Outlet[Int] = zip.out
}
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]()
Source
.fromPublisher(p1)
.zipLatestWith(Source.fromPublisher(p2))(_ + _)
.runWith(Sink.fromSubscriber(subscriber))
subscriber
}
"ZipLatestWith" must {
"work in the happy case" in {
val upstreamProbe = TestPublisher.manualProbe[Int]()
val downstreamProbe = TestSubscriber.manualProbe[Outputs]()
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b
val zipLatest = b.add(ZipLatestWith((_: Int) + (_: Int)))
val never = Source.single(3).initialDelay(1 day)
Source(1 to 2).concat(never) ~> zipLatest.in0
Source.fromPublisher(upstreamProbe) ~> zipLatest.in1
zipLatest.out ~> Sink.fromSubscriber(downstreamProbe)
ClosedShape
})
.run()
val upstreamSubscription = upstreamProbe.expectSubscription()
val downstreamSubscription = downstreamProbe.expectSubscription()
upstreamSubscription.sendNext(10)
downstreamSubscription.request(2)
downstreamProbe.expectNext(11)
downstreamProbe.expectNext(12)
upstreamSubscription.sendNext(20)
downstreamSubscription.request(1)
downstreamProbe.expectNext(22)
upstreamSubscription.sendNext(30)
downstreamSubscription.request(1)
downstreamProbe.expectNext(32)
upstreamSubscription.sendComplete()
downstreamProbe.expectComplete()
}
"work in the sad case" in {
val probe = TestSubscriber.manualProbe[Outputs]()
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b
val zip = b.add(ZipLatestWith[Int, Int, Int]((_: Int) / (_: Int)))
val never = Source.single(2).initialDelay(1 day)
Source.single(1).concat(never) ~> zip.in0
Source(-2 to 2) ~> zip.in1
zip.out ~> Sink.fromSubscriber(probe)
ClosedShape
})
.run()
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext(1 / -2)
probe.expectNext(1 / -1)
EventFilter[ArithmeticException](occurrences = 1).intercept {
subscription.request(2)
}
probe.expectError() match {
case a: java.lang.ArithmeticException
a.getMessage should be("/ by zero")
}
probe.expectNoMsg(200.millis)
}
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 =
setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 =
setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
val subscription2 = subscriber2.expectSubscriptionAndError(TestException)
}
"zipLatestWith a ETA expanded Person.apply (3 inputs)" in {
val upstreamProbe = TestPublisher.manualProbe[Int]()
val downstreamProbe = TestSubscriber.manualProbe[Person]()
case class Person(name: String, surname: String, int: Int)
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b
val zip = b.add(ZipLatestWith(Person.apply _))
Source.single("Caplin") ~> zip.in0
Source.single("Capybara") ~> zip.in1
Source.fromPublisher(upstreamProbe).take(1) ~> zip.in2
zip.out ~> Sink.fromSubscriber(downstreamProbe)
ClosedShape
})
.run()
val downstreamSubscription = downstreamProbe.expectSubscription()
val upstreamSubscription = upstreamProbe.expectSubscription()
downstreamSubscription.request(1)
upstreamSubscription.sendNext(3)
downstreamProbe.expectNext(Person("Caplin", "Capybara", 3))
downstreamProbe.expectComplete()
}
"work with up to 22 inputs" in {
val downstreamProbe = TestSubscriber.manualProbe[String]()
val upstreamProbe = TestPublisher.manualProbe[Int]()
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b
val sum22 = (v1: Int,
v2: String,
v3: Int,
v4: String,
v5: Int,
v6: String,
v7: Int,
v8: String,
v9: Int,
v10: String,
v11: Int,
v12: String,
v13: Int,
v14: String,
v15: Int,
v16: String,
v17: Int,
v18: String,
v19: Int,
v20: String,
v21: Int,
v22: String)
v1 + v2 + v3 + v4 + v5 + v6 + v7 + v8 + v9 + v10 +
v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18 + v19 + v20 + v21 + v22
// odd input ports will be Int, even input ports will be String
val zip = b.add(ZipLatestWith(sum22))
Source.single(1) ~> zip.in0
Source.single(2).map(_.toString) ~> zip.in1
Source.single(3) ~> zip.in2
Source.single(4).map(_.toString) ~> zip.in3
Source.single(5) ~> zip.in4
Source.single(6).map(_.toString) ~> zip.in5
Source.single(7) ~> zip.in6
Source.single(8).map(_.toString) ~> zip.in7
Source.single(9) ~> zip.in8
Source.single(10).map(_.toString) ~> zip.in9
Source.single(11) ~> zip.in10
Source.single(12).map(_.toString) ~> zip.in11
Source.single(13) ~> zip.in12
Source.single(14).map(_.toString) ~> zip.in13
Source.single(15) ~> zip.in14
Source.single(16).map(_.toString) ~> zip.in15
Source.single(17) ~> zip.in16
Source.single(18).map(_.toString) ~> zip.in17
Source.single(19) ~> zip.in18
Source.single(20).map(_.toString) ~> zip.in19
Source.single(21) ~> zip.in20
Source.fromPublisher(upstreamProbe).map(_.toString) ~> zip.in21
zip.out ~> Sink.fromSubscriber(downstreamProbe)
ClosedShape
})
.run()
val downstreamSubscription = downstreamProbe.expectSubscription()
val upstreamSubscription = upstreamProbe.expectSubscription()
downstreamSubscription.request(1)
upstreamSubscription.sendNext(22)
upstreamSubscription.sendComplete()
downstreamProbe.expectNext((1 to 22).mkString(""))
downstreamProbe.expectComplete()
}
}
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.javadsl
import akka.stream._
import akka.stream.scaladsl
import akka.japi.function
import akka.NotUsed
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function,
* picking always the latest of the elements of each source.
*
* No element is emitted until at least one element from each Source becomes available. Whenever a new
* element appears, the zipping function is invoked with a tuple containing the new element
* and the other last seen elements.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any of the upstreams completes
*
* '''Cancels when''' downstream cancels
*/
object ZipLatestWith {
/**
* Create a new `ZipLatestWith` vertex with the specified input types and zipping-function `f`.
*
* @param f zipping-function from the input values to the output value
* @param attributes optional attributes for this vertex
*/
def create[A, B, Out](f: function.Function2[A, B, Out]): Graph[FanInShape2[A, B, Out], NotUsed] =
scaladsl.ZipLatestWith(f.apply _)
[3..22#/** Create a new `ZipLatestWith` specialized for 1 inputs.
*
* @param f zipping-function from the input values to the output value
* @param attributes optional attributes for this vertex
*/
def create1[[#T1#], Out](f: function.Function1[[#T1#], Out]): Graph[FanInShape1[[#T1#], Out], NotUsed] =
scaladsl.ZipLatestWith(f.apply _)#
]
}

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream._
import akka.stream.impl.StreamLayout
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
trait ZipLatestWithApply {
[2..22#/**
* Create a new `ZipLatestWith` specialized for 1 inputs.
*
* @param zipper zipping-function from the input values to the output value
*/
def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipLatestWith1[[#A1#], O] =
new ZipLatestWith1(zipper)
#
]
}
[2..22#/** `ZipLatestWith` specialized for 1 inputs */
class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape1[[#A1#], O]] {
override def initialAttributes = Attributes.name("ZipLatestWith1")
override val shape: FanInShape1[[#A1#], O] = new FanInShape1[[#A1#], O]("ZipLatestWith1")
def out: Outlet[O] = shape.out
[#val in0: Inlet[A1] = shape.in0#
]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { outer =>
// Without this field the completion signalling would take one extra pull
var willShutDown = false
[#val inlet0 = new ZipLatestInlet(in0)#
]
var waitingForTuple = false
var staleTupleValues = true
override def preStart(): Unit = {
[#pull(in0)#
]
}
setHandler(
out,
new OutHandler {
override def onPull(): Unit = {
if (hasAllValues) {
if (staleTupleValues) {
waitingForTuple = true
} else {
pushOutput()
}
} else {
waitingForTuple = true
}
tryPullAllIfNeeded()
}
}
)
[#setHandler(in0, inlet0)#
]
private def hasAllValues = [#inlet0.hasValue#&&]
private def pushOutput(): Unit = {
push(out, zipper([#inlet0.value#,]))
if (willShutDown) completeStage()
staleTupleValues = true
}
private def tryPullAllIfNeeded(): Unit = {
[#if (!hasBeenPulled(in0)) {
tryPull(in0)
}#
]
}
private class ZipLatestInlet[T](in: Inlet[T]) extends InHandler {
var value: T = _
var hasValue = false
override def onPush() = {
value = outer.grab(in)
hasValue = true
outer.staleTupleValues = false
if (outer.waitingForTuple && outer.hasAllValues) {
outer.pushOutput()
outer.waitingForTuple = false
outer.tryPullAllIfNeeded()
}
}
override def onUpstreamFinish(): Unit = {
if (outer.staleTupleValues) completeStage()
outer.willShutDown = true
}
}
}
override def toString = "ZipLatestWith1"
}
#
]

View file

@ -53,3 +53,11 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTapMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTapMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTapMat")
# zipLatest
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.zipLatestMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.zipLatestWithMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestGraph")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatest")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith")

View file

@ -79,6 +79,7 @@ import akka.stream._
val wireTap = name("wireTap")
val balance = name("balance")
val zip = name("zip")
val zipLatest = name("zipLatest")
val zipN = name("zipN")
val zipWithN = name("zipWithN")
val zipWithIndex = name("zipWithIndex")

View file

@ -2442,6 +2442,46 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
}
})), matF)
/**
* Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each.
*
* A `ZipLatest` has a `left` and a `right` input port and one `out` port.
*
* No element is emitted until at least one element from each Source becomes available.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* * available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatest[T](source: Graph[SourceShape[T], _]): javadsl.Flow[In, Out Pair T, Mat] =
zipLatestMat(source, Keep.left)
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipLatest]]
*/
def zipLatestMat[T, M, M2](
that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out Pair T, M2] =
this.viaMat(Flow.fromGraph(GraphDSL.create(
that,
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out Pair T]] {
def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out Pair T] = {
val zip: FanInShape2[Out, T, Out Pair T] = b.add(ZipLatest.create[Out, T])
b.from(s).toInlet(zip.in1)
FlowShape(zip.in0, zip.out)
}
})), matF)
/**
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
@ -2474,6 +2514,43 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] =
new Flow(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF)))
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function,
* picking always the latest of the elements of each source.
*
* No element is emitted until at least one element from each Source becomes available. Whenever a new
* element appears, the zipping function is invoked with a tuple containing the new element
* and the other last seen elements.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any of the upstreams completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatestWith[Out2, Out3](
that: Graph[SourceShape[Out2], _],
combine: function.Function2[Out, Out2, Out3]): javadsl.Flow[In, Out3, Mat] =
new Flow(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine)))
/**
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function, picking always the latest element of each.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipLatestWith]]
*/
def zipLatestWithMat[Out2, Out3, M, M2](
that: Graph[SourceShape[Out2], M],
combine: function.Function2[Out, Out2, Out3],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] =
new Flow(delegate.zipLatestWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF)))
/**
* Combine the elements of current flow into a stream of tuples consisting
* of all elements paired with their index. Indices start at 0.

View file

@ -349,6 +349,35 @@ object Zip {
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
}
/**
* Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each.
*
* A `Zip` has a `left` and a `right` input port and one `out` port
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
object ZipLatest {
import akka.japi.function.Function2
import akka.japi.Pair
/**
* Create a new `ZipLatest` operator with the specified input types and zipping-function
* which creates `akka.japi.Pair`s.
*/
def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] =
ZipLatestWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]])
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] =
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
}
/**
* Combine the elements of multiple streams into a stream of lists.
*

View file

@ -1077,6 +1077,38 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] =
this.viaMat(Flow.create[Out].zipMat(that, Keep.right[NotUsed, M]), matF)
/**
* Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each.
*
* A `ZipLatest` has a `left` and a `right` input port and one `out` port.
*
* No element is emitted until at least one element from each Source becomes available.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* * available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatest[T](that: Graph[SourceShape[T], _]): javadsl.Source[Out @uncheckedVariance Pair T, Mat] =
zipLatestMat(that, Keep.left)
/**
* Combine the elements of current [[Source]] and the given one into a stream of tuples, picking always the latest element of each.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipLatest]].
*/
def zipLatestMat[T, M, M2](
that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] =
this.viaMat(Flow.create[Out].zipLatestMat(that, Keep.right[NotUsed, M]), matF)
/**
* Put together the elements of current [[Source]] and the given one
* into a stream of combined elements using a combiner function.
@ -1109,6 +1141,44 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] =
new Source(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF)))
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function,
* picking always the latest of the elements of each source.
*
* No element is emitted until at least one element from each Source becomes available. Whenever a new
* element appears, the zipping function is invoked with a tuple containing the new element
* and the other last seen elements.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any of the upstreams completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatestWith[Out2, Out3](
that: Graph[SourceShape[Out2], _],
combine: function.Function2[Out, Out2, Out3]): javadsl.Source[Out3, Mat] =
new Source(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine)))
/**
* Put together the elements of current [[Source]] and the given one
* into a stream of combined elements using a combiner function,
* picking always the latest of the elements of each source.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipLatestWith]].
*/
def zipLatestWithMat[Out2, Out3, M, M2](
that: Graph[SourceShape[Out2], M],
combine: function.Function2[Out, Out2, Out3],
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] =
new Source(delegate.zipLatestWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF)))
/**
* Combine the elements of current [[Source]] into a stream of tuples consisting
* of all elements paired with their index. Indices start at 0.

View file

@ -1497,6 +1497,21 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
def zip[T](source: Graph[SourceShape[T], _]): SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, T], Mat] =
new SubFlow(delegate.zip(source).map { case (o, t) akka.japi.Pair.create(o, t) })
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatest[T](source: Graph[SourceShape[T], _]): SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, T], Mat] =
new SubFlow(delegate.zipLatest(source).map { case (o, t) akka.japi.Pair.create(o, t) })
/**
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
@ -1514,6 +1529,24 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
combine: function.Function2[Out, Out2, Out3]): SubFlow[In, Out3, Mat] =
new SubFlow(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine)))
/**
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function, picking always the latest element of each.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatestWith[Out2, Out3](
that: Graph[SourceShape[Out2], _],
combine: function.Function2[Out, Out2, Out3]): SubFlow[In, Out3, Mat] =
new SubFlow(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine)))
/**
* Combine the elements of current [[Flow]] into a stream of tuples consisting
* of all elements paired with their index. Indices start at 0.

View file

@ -1478,6 +1478,21 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
def zip[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] =
new SubSource(delegate.zip(source).map { case (o, t) akka.japi.Pair.create(o, t) })
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatest[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] =
new SubSource(delegate.zipLatest(source).map { case (o, t) akka.japi.Pair.create(o, t) })
/**
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
@ -1495,6 +1510,24 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
combine: function.Function2[Out, Out2, Out3]): SubSource[Out3, Mat] =
new SubSource(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine)))
/**
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function, picking always the latest element of each.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatestWith[Out2, Out3](
that: Graph[SourceShape[Out2], _],
combine: function.Function2[Out, Out2, Out3]): SubSource[Out3, Mat] =
new SubSource(delegate.zipLatestWith[Out2, Out3](that)(combinerToScala(combine)))
/**
* Combine the elements of current [[Source]] into a stream of tuples consisting
* of all elements paired with their index. Indices start at 0.

View file

@ -2354,6 +2354,31 @@ trait FlowOps[+Out, +Mat] {
FlowShape(zip.in0, zip.out)
}
/**
* Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each.
*
* A `ZipLatest` has a `left` and a `right` input port and one `out` port.
*
* No element is emitted until at least one element from each Source becomes available.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatest[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)] = via(zipLatestGraph(that))
protected def zipLatestGraph[U, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] =
GraphDSL.create(that) { implicit b r
val zip = b.add(ZipLatest[Out, U]())
r ~> zip.in1
FlowShape(zip.in0, zip.out)
}
/**
* Put together the elements of current flow and the given [[Source]]
* into a stream of combined elements using a combiner function.
@ -2376,6 +2401,33 @@ trait FlowOps[+Out, +Mat] {
FlowShape(zip.in0, zip.out)
}
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function,
* picking always the latest of the elements of each source.
*
* No element is emitted until at least one element from each Source becomes available. Whenever a new
* element appears, the zipping function is invoked with a tuple containing the new element
* and the other last seen elements.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any of the upstreams completes
*
* '''Cancels when''' downstream cancels
*/
def zipLatestWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) Out3): Repr[Out3] =
via(zipLatestWithGraph(that)(combine))
protected def zipLatestWithGraph[Out2, Out3, M](that: Graph[SourceShape[Out2], M])(combine: (Out, Out2) Out3): Graph[FlowShape[Out @uncheckedVariance, Out3], M] =
GraphDSL.create(that) { implicit b r
val zip = b.add(ZipLatestWith[Out, Out2, Out3](combine))
r ~> zip.in1
FlowShape(zip.in0, zip.out)
}
/**
* Combine the elements of current flow into a stream of tuples consisting
* of all elements paired with their index. Indices start at 0.
@ -2798,6 +2850,30 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): ReprMat[Out3, Mat3] =
viaMat(zipWithGraph(that)(combine))(matF)
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples,
* picking always the latest of the elements of each source.
*
* @see [[#zipLatest]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def zipLatestMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[(Out, U), Mat3] =
viaMat(zipLatestGraph(that))(matF)
/**
* Put together the elements of current flow and the given [[Source]]
* into a stream of combined elements using a combiner function, picking always the latest of the elements of each source.
*
* @see [[#zipLatestWith]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def zipLatestWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): ReprMat[Out3, Mat3] =
viaMat(zipLatestWithGraph(that)(combine))(matF)
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.

View file

@ -957,6 +957,33 @@ final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply) {
override def toString = "Zip"
}
object ZipLatest {
/**
* Create a new `ZipLatest`.
*/
def apply[A, B](): ZipLatest[A, B] = new ZipLatest()
}
/**
* Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each.
*
* A `ZipLatest` has a `left` and a `right` input port and one `out` port.
*
* No element is emitted until at least one element from each Source becomes available.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* * available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
final class ZipLatest[A, B] extends ZipLatestWith2[A, B, (A, B)](Tuple2.apply) {
override def toString = "ZipLatest"
}
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function.
*
@ -970,6 +997,25 @@ final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply) {
*/
object ZipWith extends ZipWithApply
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function,
* picking always the latest of the elements of each source.
*
* No element is emitted until at least one element from each Source becomes available. Whenever a new
* element appears, the zipping function is invoked with a tuple containing the new element
* and the other last seen elements.
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
* available on either of the inputs
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any of the upstreams completes
*
* '''Cancels when''' downstream cancels
*/
object ZipLatestWith extends ZipLatestWithApply
/**
* Takes a stream of pair elements and splits each pair to two output streams.
*