change semantics of ZipLatest slightly to account for pending first tuple with subsequent completion

This commit is contained in:
Jonas Chapuis 2018-10-02 18:44:01 +02:00
parent a098e0b743
commit 41c59fdf48
2 changed files with 59 additions and 9 deletions

View file

@ -170,6 +170,58 @@ class GraphZipLatestSpec
} }
} }
"complete when either source completes and requesting element" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("either source completes")
select((bools, ints)).sendComplete()
And("request for one element")
probe.request(1)
Then("subscribes and completes")
probe.expectComplete()
}
}
"complete when either source completes with some pending element" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("one element pushed on each source")
bools.sendNext(true)
ints.sendNext(1)
And("either source completes")
select((bools, ints)).sendComplete()
Then("should emit first element then complete")
probe.requestNext((true, 1))
probe.expectComplete()
}
}
"complete if no pending demand" in {
forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for one element")
probe.request(1)
Given("one element pushed on each source and tuple emitted")
bools.sendNext(true)
ints.sendNext(1)
probe.expectNext((true, 1))
And("either source completes")
select((bools, ints)).sendComplete()
Then("should complete")
probe.expectComplete()
}
}
"fail when either source has error" in { "fail when either source has error" in {
forAll(Gen.oneOf(first, second)) { select forAll(Gen.oneOf(first, second)) { select
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]

View file

@ -37,7 +37,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[
[#val inlet0 = new ZipLatestInlet(in0)# [#val inlet0 = new ZipLatestInlet(in0)#
] ]
var waitingForTuple = false var waitingForTuple = false
var waitingForNextTuple = false var staleTupleValues = true
override def preStart(): Unit = { override def preStart(): Unit = {
[#pull(in0)# [#pull(in0)#
@ -49,14 +49,12 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[
new OutHandler { new OutHandler {
override def onPull(): Unit = { override def onPull(): Unit = {
if (hasAllValues) { if (hasAllValues) {
if (waitingForNextTuple) { if (staleTupleValues) {
waitingForTuple = true waitingForTuple = true
} else { } else {
pushOutput() pushOutput()
} }
} } else {
else
{
waitingForTuple = true waitingForTuple = true
} }
pullAllIfNeeded() pullAllIfNeeded()
@ -72,12 +70,12 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[
private def pushOutput(): Unit = { private def pushOutput(): Unit = {
push(out, zipper([#inlet0.value#,])) push(out, zipper([#inlet0.value#,]))
if (willShutDown) completeStage() if (willShutDown) completeStage()
waitingForNextTuple = true staleTupleValues = true
} }
private def pullAllIfNeeded(): Unit = { private def pullAllIfNeeded(): Unit = {
[#if (!hasBeenPulled(in0)) { [#if (!hasBeenPulled(in0)) {
pull(in0) tryPull(in0)
}# }#
] ]
} }
@ -89,7 +87,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[
override def onPush() = { override def onPush() = {
value = outer.grab(in) value = outer.grab(in)
hasValue = true hasValue = true
outer.waitingForNextTuple = false outer.staleTupleValues = false
if (outer.waitingForTuple && outer.hasAllValues) { if (outer.waitingForTuple && outer.hasAllValues) {
outer.pushOutput() outer.pushOutput()
outer.waitingForTuple = false outer.waitingForTuple = false
@ -98,7 +96,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[
} }
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
if (!outer.isAvailable(in)) outer.completeStage() if (outer.staleTupleValues) completeStage()
outer.willShutDown = true outer.willShutDown = true
} }
} }