=str #18756: Fix Zip completion

This commit is contained in:
Endre Sándor Varga 2015-11-03 16:46:32 +01:00
parent 1378fedad0
commit 10e4c04e10
2 changed files with 146 additions and 1 deletions

View file

@ -7,6 +7,9 @@ import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream._
import scala.concurrent.Await
import scala.concurrent.duration._
class GraphZipSpec extends TwoStreamsSetup {
import FlowGraph.Implicits._
@ -50,6 +53,148 @@ class GraphZipSpec extends TwoStreamsSetup {
probe.expectComplete()
}
"complete if one side is available but other already completed" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[String]()
val completed = RunnableGraph.fromGraph(FlowGraph.create(Sink.ignore) { implicit b
out
val zip = b.add(Zip[Int, String]())
Source(upstream1) ~> zip.in0
Source(upstream2) ~> zip.in1
zip.out ~> out
ClosedShape
}).run()
upstream1.sendNext(1)
upstream1.sendNext(2)
upstream2.sendNext("A")
upstream2.sendComplete()
Await.ready(completed, 3.seconds)
upstream1.expectCancellation()
}
"complete even if no pending demand" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[String]()
val downstream = TestSubscriber.probe[(Int, String)]()
RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b
out
val zip = b.add(Zip[Int, String]())
Source(upstream1) ~> zip.in0
Source(upstream2) ~> zip.in1
zip.out ~> out
ClosedShape
}).run()
downstream.request(1)
upstream1.sendNext(1)
upstream2.sendNext("A")
downstream.expectNext((1, "A"))
upstream2.sendComplete()
downstream.expectComplete()
upstream1.expectCancellation()
}
"complete if both sides complete before requested with elements pending" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[String]()
val downstream = TestSubscriber.probe[(Int, String)]()
RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b
out
val zip = b.add(Zip[Int, String]())
Source(upstream1) ~> zip.in0
Source(upstream2) ~> zip.in1
zip.out ~> out
ClosedShape
}).run()
upstream1.sendNext(1)
upstream2.sendNext("A")
upstream1.sendComplete()
upstream2.sendComplete()
downstream.requestNext((1, "A"))
downstream.expectComplete()
upstream1.expectNoMsg(500.millis)
upstream2.expectNoMsg(500.millis)
}
"complete if one side complete before requested with elements pending" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[String]()
val downstream = TestSubscriber.probe[(Int, String)]()
RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b
out
val zip = b.add(Zip[Int, String]())
Source(upstream1) ~> zip.in0
Source(upstream2) ~> zip.in1
zip.out ~> out
ClosedShape
}).run()
upstream1.sendNext(1)
upstream1.sendNext(2)
upstream2.sendNext("A")
upstream1.sendComplete()
upstream2.sendComplete()
downstream.requestNext((1, "A"))
downstream.expectComplete()
upstream1.expectNoMsg(500.millis)
upstream2.expectNoMsg(500.millis)
}
"complete if one side complete before requested with elements pending 2" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[String]()
val downstream = TestSubscriber.probe[(Int, String)]()
RunnableGraph.fromGraph(FlowGraph.create(Sink(downstream)) { implicit b
out
val zip = b.add(Zip[Int, String]())
Source(upstream1) ~> zip.in0
Source(upstream2) ~> zip.in1
zip.out ~> out
ClosedShape
}).run()
downstream.ensureSubscription()
upstream1.sendNext(1)
upstream1.sendComplete()
downstream.expectNoMsg(500.millis)
upstream2.sendNext("A")
upstream2.sendComplete()
downstream.requestNext((1, "A"))
downstream.expectComplete()
upstream1.expectNoMsg(500.millis)
upstream2.expectNoMsg(500.millis)
}
commonTests()
"work with one immediately completed and one nonempty publisher" in assertAllStagesStopped {

View file

@ -42,7 +42,7 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape
}
override def onUpstreamFinish(): Unit = {
if (!isAvailable(out) || !isAvailable(in0)) completeStage()
if (!isAvailable(in0)) completeStage()
}
})#