Complete ZipLatest when upstream completes without emitting #26711

This commit is contained in:
Johan Andrén 2019-05-29 08:43:09 +02:00 committed by GitHub
parent e6cdb01aa1
commit fd45fcf58e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 90 additions and 118 deletions

View file

@ -4,141 +4,127 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.testkit.TestPublisher.Probe import akka.stream.testkit.TestPublisher.Probe
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.{ ActorMaterializer, ClosedShape } import akka.stream.{ ActorMaterializer, ClosedShape }
import akka.testkit.TestKit import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.scalacheck.Gen import org.scalacheck.Gen
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.PropertyChecks import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
import org.scalatest.{ BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.language.postfixOps import scala.language.postfixOps
class GraphZipLatestSpec object GraphZipLatestSpec {
extends TestKit(ActorSystem("ZipLatestSpec")) val someString = "someString"
with WordSpecLike val someInt = 1
with Matchers }
with BeforeAndAfterAll
with PropertyChecks class GraphZipLatestSpec extends StreamSpec with ScalaCheckPropertyChecks with ScalaFutures {
with GivenWhenThen import GraphZipLatestSpec._
with ScalaFutures {
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
override def afterAll = TestKit.shutdownActorSystem(system)
implicit val patience = PatienceConfig(5 seconds)
"ZipLatest" must { "ZipLatest" must {
"only emit when at least one pair is available" in { "only emit when at least one pair is available" in assertAllStagesStopped {
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for one element")
probe.request(1) probe.request(1)
And("an element pushed on one of the sources") // an element pushed on one of the sources
bools.sendNext(true) bools.sendNext(true)
Then("does not emit yet") // does not emit yet
probe.expectNoMessage(0 seconds) probe.expectNoMessage(0 seconds)
And("an element pushed on the other source") // an element pushed on the other source
ints.sendNext(1) ints.sendNext(1)
Then("emits a single pair") // emits a single pair
probe.expectNext((true, 1)) probe.expectNext((true, 1))
probe.cancel()
} }
"emits as soon as one source is available" in { "emits as soon as one source is available" in assertAllStagesStopped {
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for 3 elements")
probe.request(3) probe.request(3)
And("a first element pushed on either source") // a first element pushed on either source
bools.sendNext(true) bools.sendNext(true)
ints.sendNext(1) ints.sendNext(1)
And("then 2 elements pushed only on one source") // then 2 elements pushed only on one source
ints.sendNext(1) ints.sendNext(1)
ints.sendNext(1) ints.sendNext(1)
Then("3 elements are emitted") // 3 elements are emitted
probe.expectNext((true, 1)) probe.expectNext((true, 1))
probe.expectNext((true, 1)) probe.expectNext((true, 1))
probe.expectNext((true, 1)) probe.expectNext((true, 1))
probe.cancel()
} }
"does not emit the same pair upon two pulls with value types" in { "does not emit the same pair upon two pulls with value types" in assertAllStagesStopped {
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for one element")
probe.request(1) probe.request(1)
And("one element pushed on each source") // one element pushed on each source
bools.sendNext(true) bools.sendNext(true)
ints.sendNext(1) ints.sendNext(1)
Then("emits a single pair") // emits a single pair
probe.expectNext((true, 1)) probe.expectNext((true, 1))
And("another request") // another request does not emit a duplicate
probe.request(1) probe.request(1)
Then("does not emit a duplicate")
probe.expectNoMessage(0 seconds) probe.expectNoMessage(0 seconds)
And("sending complete")
bools.sendComplete() bools.sendComplete()
Then("completes the stream")
probe.expectComplete() probe.expectComplete()
} }
"does not emit the same pair upon two pulls with reference types" in new Fixture { "does not emit the same pair upon two pulls with reference types" in assertAllStagesStopped {
val a = A(someString) val a = someString
val b = B(someInt) val b = Some(someInt)
val (probe, as, bs) = testGraph[A, B] val (probe, as, bs) = testGraph[String, Option[Int]]
Given("request for one element")
probe.request(1) probe.request(1)
And("one element pushed on each source") // one element pushed on each source
as.sendNext(a) as.sendNext(a)
bs.sendNext(b) bs.sendNext(b)
Then("emits a single pair") // emits a single pair
probe.expectNext((a, b)) probe.expectNext((a, b))
And("another request") // another request does not emit a duplicate
probe.request(1) probe.request(1)
Then("does not emit a duplicate")
probe.expectNoMessage(0 seconds) probe.expectNoMessage(0 seconds)
And("sending complete")
as.sendComplete() as.sendComplete()
Then("completes the stream")
probe.expectComplete() probe.expectComplete()
} }
"does not de-duplicate instances based on value" in new Fixture { "does not de-duplicate instances based on value" in assertAllStagesStopped {
Given(""" /*
|S1 -> A1 A2 A3 --\ S1 -> A1 A2 A3 --\
| > -- ZipLatest > -- ZipLatest
|S2 -> B1 B2 --/ S2 -> B1 B2 --/
""".stripMargin) */
val a1 = A(someString)
val a2 = A(someString)
val a3 = A(someString)
val b1 = B(someInt)
val b2 = B(someInt)
val (probe, as, bs) = testGraph[A, B]
Then(""" val a1 = someString
|O -> (A1, B1), (A2, B1), (A3, B1), (A3, B2) val a2 = someString
""".stripMargin) val a3 = someString
val b1 = someInt
val b2 = someInt
val (probe, as, bs) = testGraph[String, Int]
// O -> (A1, B1), (A2, B1), (A3, B1), (A3, B2)
probe.request(4) probe.request(4)
as.sendNext(a1) as.sendNext(a1)
@ -153,134 +139,129 @@ class GraphZipLatestSpec
bs.sendNext(b2) bs.sendNext(b2)
probe.expectNext((a3, b2)) probe.expectNext((a3, b2))
probe.cancel()
} }
val first = (t: (Probe[Boolean], Probe[Int])) => t._1 val first = (t: (Probe[Boolean], Probe[Int])) => t._1
val second = (t: (Probe[Boolean], Probe[Int])) => t._2 val second = (t: (Probe[Boolean], Probe[Int])) => t._2
"complete when either source completes" in { "complete when either source completes" in assertAllStagesStopped {
forAll(Gen.oneOf(first, second)) { select => forAll(Gen.oneOf(first, second)) { select =>
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("either source completes")
select((bools, ints)).sendComplete() select((bools, ints)).sendComplete()
Then("subscribes and completes")
probe.expectSubscriptionAndComplete() probe.expectSubscriptionAndComplete()
} }
} }
"complete when either source completes and requesting element" in { "complete when either source completes and requesting element" in assertAllStagesStopped {
forAll(Gen.oneOf(first, second)) { select => forAll(Gen.oneOf(first, second)) { select =>
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("either source completes")
select((bools, ints)).sendComplete() select((bools, ints)).sendComplete()
And("request for one element")
probe.request(1) probe.request(1)
Then("subscribes and completes")
probe.expectComplete() probe.expectComplete()
} }
} }
"complete when either source completes with some pending element" in { "complete when either source completes with some pending element" in assertAllStagesStopped {
forAll(Gen.oneOf(first, second)) { select => forAll(Gen.oneOf(first, second)) { select =>
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("one element pushed on each source") // one element pushed on each source
bools.sendNext(true) bools.sendNext(true)
ints.sendNext(1) ints.sendNext(1)
And("either source completes") // either source completes
select((bools, ints)).sendComplete() select((bools, ints)).sendComplete()
Then("should emit first element then complete") // should emit first element then complete
probe.requestNext((true, 1)) probe.requestNext((true, 1))
probe.expectComplete() probe.expectComplete()
} }
} }
"complete when one source completes and the other continues pushing" in { // Reproducing #26711
"complete when one quickly completes without any elements" in {
// not sure quite why, but Source.empty does not trigger this, while Source(List()) does - race condition somehow
val immediatelyCompleting = Source.single(3).zipLatest(Source(List[Int]()))
immediatelyCompleting.runWith(Sink.seq).futureValue should ===(Seq[(Int, Int)]())
}
"complete when one source completes and the other continues pushing" in assertAllStagesStopped {
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("one element pushed on each source") // one element pushed on each source
bools.sendNext(true) bools.sendNext(true)
ints.sendNext(1) ints.sendNext(1)
And("either source completes") // either source completes
bools.sendComplete() bools.sendComplete()
ints.sendNext(10) ints.sendNext(10)
ints.sendNext(10) ints.sendNext(10)
Then("should emit first element then complete") // should emit first element then complete
probe.requestNext((true, 1)) probe.requestNext((true, 1))
probe.expectComplete() probe.expectComplete()
} }
"complete if no pending demand" in { "complete if no pending demand" in assertAllStagesStopped {
forAll(Gen.oneOf(first, second)) { select => forAll(Gen.oneOf(first, second)) { select =>
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for one element")
probe.request(1) probe.request(1)
Given("one element pushed on each source and tuple emitted") // one element pushed on each source and tuple emitted
bools.sendNext(true) bools.sendNext(true)
ints.sendNext(1) ints.sendNext(1)
probe.expectNext((true, 1)) probe.expectNext((true, 1))
And("either source completes") // either source completes
select((bools, ints)).sendComplete() select((bools, ints)).sendComplete()
Then("should complete") // should complete
probe.expectComplete() probe.expectComplete()
} }
} }
"fail when either source has error" in { "fail when either source has error" in assertAllStagesStopped {
forAll(Gen.oneOf(first, second)) { select => forAll(Gen.oneOf(first, second)) { select =>
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
val error = new RuntimeException val error = new RuntimeException
Given("either source errors")
select((bools, ints)).sendError(error) select((bools, ints)).sendError(error)
Then("subscribes and error")
probe.expectSubscriptionAndError(error) probe.expectSubscriptionAndError(error)
} }
} }
"emit even if pair is the same" in { "emit even if pair is the same" in assertAllStagesStopped {
val (probe, bools, ints) = testGraph[Boolean, Int] val (probe, bools, ints) = testGraph[Boolean, Int]
Given("request for two elements")
probe.request(2) probe.request(2)
And("one element pushed on each source") // one element pushed on each source
bools.sendNext(true) bools.sendNext(true)
ints.sendNext(1) ints.sendNext(1)
And("once again the same element on one source") // once again the same element on one source
ints.sendNext(1) ints.sendNext(1)
And("followed by complete") // followed by complete
bools.sendComplete() bools.sendComplete()
ints.sendComplete() ints.sendComplete()
Then("emits two equal pairs") // emits two equal pairs
probe.expectNext((true, 1)) probe.expectNext((true, 1))
probe.expectNext((true, 1)) probe.expectNext((true, 1))
And("then complete") // then completes
probe.expectComplete() probe.expectComplete()
} }
"emit combined elements in proper order" in { "emit combined elements in proper order" in assertAllStagesStopped {
val (probe, firstDigits, secondDigits) = testGraph[Int, Int] val (probe, firstDigits, secondDigits) = testGraph[Int, Int]
Given(s"numbers up to 99 in tuples") // numbers up to 99 in tuples
val allNumbers = for { val allNumbers = for {
firstDigit <- 0 to 9 firstDigit <- 0 to 9
secondDigit <- 0 to 9 secondDigit <- 0 to 9
@ -288,29 +269,20 @@ class GraphZipLatestSpec
allNumbers.groupBy(_._1).toList.sortBy(_._1).foreach { allNumbers.groupBy(_._1).toList.sortBy(_._1).foreach {
case (firstDigit, pairs) => { case (firstDigit, pairs) => {
When(s"sending first digit $firstDigit")
firstDigits.sendNext(firstDigit) firstDigits.sendNext(firstDigit)
pairs.map { case (_, digits) => digits }.foreach { secondDigit => pairs.map { case (_, digits) => digits }.foreach { secondDigit =>
And(s"sending second digit $secondDigit")
secondDigits.sendNext(secondDigit) secondDigits.sendNext(secondDigit)
probe.request(1) probe.request(1)
Then(s"should receive tuple ($firstDigit,$secondDigit)")
probe.expectNext((firstDigit, secondDigit)) probe.expectNext((firstDigit, secondDigit))
} }
} }
} }
probe.cancel()
} }
} }
private class Fixture { private def testGraph[A, B]: (TestSubscriber.Probe[(A, B)], TestPublisher.Probe[A], TestPublisher.Probe[B]) =
val someString = "someString"
val someInt = 1
case class A(value: String)
case class B(value: Int)
}
private def testGraph[A, B] =
RunnableGraph RunnableGraph
.fromGraph(GraphDSL.create(TestSink.probe[(A, B)], TestSource.probe[A], TestSource.probe[B])(Tuple3.apply) { .fromGraph(GraphDSL.create(TestSink.probe[(A, B)], TestSource.probe[A], TestSource.probe[B])(Tuple3.apply) {
implicit b => (ts, as, bs) => implicit b => (ts, as, bs) =>

View file

@ -32,12 +32,12 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { outer => override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { outer =>
// Without this field the completion signalling would take one extra pull // Without this field the completion signalling would take one extra pull
var willShutDown = false private var willShutDown = false
[#val inlet0 = new ZipLatestInlet(in0)# [#val inlet0 = new ZipLatestInlet(in0)#
] ]
var waitingForTuple = false private var waitingForTuple = false
var staleTupleValues = true private var staleTupleValues = true
override def preStart(): Unit = { override def preStart(): Unit = {
[#pull(in0)# [#pull(in0)#
@ -96,8 +96,8 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F
} }
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
if (outer.staleTupleValues) completeStage() if (!hasValue || outer.staleTupleValues) completeStage()
outer.willShutDown = true else outer.willShutDown = true
} }
} }
} }