From 10e4c04e101f5a4edfdd1d44700b1bef600fd8a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 3 Nov 2015 16:46:32 +0100 Subject: [PATCH] =str #18756: Fix Zip completion --- .../akka/stream/scaladsl/GraphZipSpec.scala | 145 ++++++++++++++++++ .../scaladsl/ZipWithApply.scala.template | 2 +- 2 files changed, 146 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala index 3e5443b37f..ec1da3f1de 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala @@ -7,6 +7,9 @@ import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream._ +import scala.concurrent.Await +import scala.concurrent.duration._ + class GraphZipSpec extends TwoStreamsSetup { import FlowGraph.Implicits._ @@ -50,6 +53,148 @@ class GraphZipSpec extends TwoStreamsSetup { probe.expectComplete() } + "complete if one side is available but other already completed" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[String]() + + val completed = RunnableGraph.fromGraph(FlowGraph.create(Sink.ignore) { implicit b ⇒ + out ⇒ + val zip = b.add(Zip[Int, String]()) + + Source(upstream1) ~> zip.in0 + Source(upstream2) ~> zip.in1 + zip.out ~> out + + ClosedShape + }).run() + + upstream1.sendNext(1) + upstream1.sendNext(2) + upstream2.sendNext("A") + upstream2.sendComplete() + + Await.ready(completed, 3.seconds) + upstream1.expectCancellation() + + } + + "complete even if no pending demand" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[String]() + val downstream = TestSubscriber.probe[(Int, String)]() + + RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b ⇒ + out ⇒ + val zip = b.add(Zip[Int, String]()) + + Source(upstream1) ~> zip.in0 + Source(upstream2) ~> zip.in1 + zip.out ~> out + + ClosedShape + }).run() + + downstream.request(1) + + upstream1.sendNext(1) + upstream2.sendNext("A") + downstream.expectNext((1, "A")) + + upstream2.sendComplete() + downstream.expectComplete() + upstream1.expectCancellation() + } + + "complete if both sides complete before requested with elements pending" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[String]() + val downstream = TestSubscriber.probe[(Int, String)]() + + RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b ⇒ + out ⇒ + val zip = b.add(Zip[Int, String]()) + + Source(upstream1) ~> zip.in0 + Source(upstream2) ~> zip.in1 + zip.out ~> out + + ClosedShape + }).run() + + upstream1.sendNext(1) + upstream2.sendNext("A") + + upstream1.sendComplete() + upstream2.sendComplete() + + downstream.requestNext((1, "A")) + downstream.expectComplete() + + upstream1.expectNoMsg(500.millis) + upstream2.expectNoMsg(500.millis) + } + + "complete if one side complete before requested with elements pending" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[String]() + val downstream = TestSubscriber.probe[(Int, String)]() + + RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b ⇒ + out ⇒ + val zip = b.add(Zip[Int, String]()) + + Source(upstream1) ~> zip.in0 + Source(upstream2) ~> zip.in1 + zip.out ~> out + + ClosedShape + }).run() + + upstream1.sendNext(1) + upstream1.sendNext(2) + upstream2.sendNext("A") + + upstream1.sendComplete() + upstream2.sendComplete() + + downstream.requestNext((1, "A")) + downstream.expectComplete() + + upstream1.expectNoMsg(500.millis) + upstream2.expectNoMsg(500.millis) + } + + "complete if one side complete before requested with elements pending 2" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[String]() + val downstream = TestSubscriber.probe[(Int, String)]() + + RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b ⇒ + out ⇒ + val zip = b.add(Zip[Int, String]()) + + Source(upstream1) ~> zip.in0 + Source(upstream2) ~> zip.in1 + zip.out ~> out + + ClosedShape + }).run() + + downstream.ensureSubscription() + + upstream1.sendNext(1) + upstream1.sendComplete() + downstream.expectNoMsg(500.millis) + + upstream2.sendNext("A") + upstream2.sendComplete() + downstream.requestNext((1, "A")) + downstream.expectComplete() + + upstream1.expectNoMsg(500.millis) + upstream2.expectNoMsg(500.millis) + } + commonTests() "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index 03af15507f..fa8b30d93f 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -42,7 +42,7 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape } override def onUpstreamFinish(): Unit = { - if (!isAvailable(out) || !isAvailable(in0)) completeStage() + if (!isAvailable(in0)) completeStage() } })#