diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala index 2e7b991d5a..f7e1bbe37e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala @@ -170,6 +170,58 @@ class GraphZipLatestSpec } } + "complete when either source completes and requesting element" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("either source completes") + select((bools, ints)).sendComplete() + + And("request for one element") + probe.request(1) + + Then("subscribes and completes") + probe.expectComplete() + } + } + + "complete when either source completes with some pending element" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("one element pushed on each source") + bools.sendNext(true) + ints.sendNext(1) + + And("either source completes") + select((bools, ints)).sendComplete() + + Then("should emit first element then complete") + probe.requestNext((true, 1)) + probe.expectComplete() + } + } + + "complete if no pending demand" in { + forAll(Gen.oneOf(first, second)) { select ⇒ + val (probe, bools, ints) = testGraph[Boolean, Int] + + Given("request for one element") + probe.request(1) + + Given("one element pushed on each source and tuple emitted") + bools.sendNext(true) + ints.sendNext(1) + probe.expectNext((true, 1)) + + And("either source completes") + select((bools, ints)).sendComplete() + + Then("should complete") + probe.expectComplete() + } + } + "fail when either source has error" in { forAll(Gen.oneOf(first, second)) { select ⇒ val (probe, bools, ints) = testGraph[Boolean, Int] diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template index 9828c41cde..25e481e1e7 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template @@ -37,7 +37,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ [#val inlet0 = new ZipLatestInlet(in0)# ] var waitingForTuple = false - var waitingForNextTuple = false + var staleTupleValues = true override def preStart(): Unit = { [#pull(in0)# @@ -49,14 +49,12 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ new OutHandler { override def onPull(): Unit = { if (hasAllValues) { - if (waitingForNextTuple) { + if (staleTupleValues) { waitingForTuple = true } else { pushOutput() } - } - else - { + } else { waitingForTuple = true } pullAllIfNeeded() @@ -72,12 +70,12 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ private def pushOutput(): Unit = { push(out, zipper([#inlet0.value#,])) if (willShutDown) completeStage() - waitingForNextTuple = true + staleTupleValues = true } private def pullAllIfNeeded(): Unit = { [#if (!hasBeenPulled(in0)) { - pull(in0) + tryPull(in0) }# ] } @@ -89,7 +87,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ override def onPush() = { value = outer.grab(in) hasValue = true - outer.waitingForNextTuple = false + outer.staleTupleValues = false if (outer.waitingForTuple && outer.hasAllValues) { outer.pushOutput() outer.waitingForTuple = false @@ -98,7 +96,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ } override def onUpstreamFinish(): Unit = { - if (!outer.isAvailable(in)) outer.completeStage() + if (outer.staleTupleValues) completeStage() outer.willShutDown = true } }