diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala index d138006e8e..f50329cd62 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestSpec.scala @@ -4,141 +4,127 @@ package akka.stream.scaladsl -import akka.actor.ActorSystem import akka.stream.testkit.TestPublisher.Probe import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.{ ActorMaterializer, ClosedShape } -import akka.testkit.TestKit +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import org.scalacheck.Gen import org.scalatest.concurrent.ScalaFutures -import org.scalatest.prop.PropertyChecks -import org.scalatest.{ BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike } +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks import scala.concurrent.duration._ import scala.language.postfixOps -class GraphZipLatestSpec - extends TestKit(ActorSystem("ZipLatestSpec")) - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with PropertyChecks - with GivenWhenThen - with ScalaFutures { +object GraphZipLatestSpec { + val someString = "someString" + val someInt = 1 +} + +class GraphZipLatestSpec extends StreamSpec with ScalaCheckPropertyChecks with ScalaFutures { + import GraphZipLatestSpec._ + implicit val materializer = ActorMaterializer() - override def afterAll = TestKit.shutdownActorSystem(system) - implicit val patience = PatienceConfig(5 seconds) "ZipLatest" must { - "only emit when at least one pair is available" in { + "only emit when at least one pair is available" in assertAllStagesStopped { val (probe, bools, ints) = testGraph[Boolean, Int] - Given("request for one element") probe.request(1) - And("an element pushed on one of the sources") + // an element pushed on one of the sources bools.sendNext(true) - Then("does not emit yet") + // does not emit yet probe.expectNoMessage(0 seconds) - And("an element pushed on the other source") + // an element pushed on the other source ints.sendNext(1) - Then("emits a single pair") + // emits a single pair probe.expectNext((true, 1)) + + probe.cancel() } - "emits as soon as one source is available" in { + "emits as soon as one source is available" in assertAllStagesStopped { val (probe, bools, ints) = testGraph[Boolean, Int] - Given("request for 3 elements") probe.request(3) - And("a first element pushed on either source") + // a first element pushed on either source bools.sendNext(true) ints.sendNext(1) - And("then 2 elements pushed only on one source") + // then 2 elements pushed only on one source ints.sendNext(1) ints.sendNext(1) - Then("3 elements are emitted") + // 3 elements are emitted probe.expectNext((true, 1)) probe.expectNext((true, 1)) probe.expectNext((true, 1)) + + probe.cancel() } - "does not emit the same pair upon two pulls with value types" in { + "does not emit the same pair upon two pulls with value types" in assertAllStagesStopped { val (probe, bools, ints) = testGraph[Boolean, Int] - Given("request for one element") probe.request(1) - And("one element pushed on each source") + // one element pushed on each source bools.sendNext(true) ints.sendNext(1) - Then("emits a single pair") + // emits a single pair probe.expectNext((true, 1)) - And("another request") + // another request does not emit a duplicate probe.request(1) - - Then("does not emit a duplicate") probe.expectNoMessage(0 seconds) - And("sending complete") bools.sendComplete() - - Then("completes the stream") probe.expectComplete() } - "does not emit the same pair upon two pulls with reference types" in new Fixture { - val a = A(someString) - val b = B(someInt) - val (probe, as, bs) = testGraph[A, B] + "does not emit the same pair upon two pulls with reference types" in assertAllStagesStopped { + val a = someString + val b = Some(someInt) + val (probe, as, bs) = testGraph[String, Option[Int]] - Given("request for one element") probe.request(1) - And("one element pushed on each source") + // one element pushed on each source as.sendNext(a) bs.sendNext(b) - Then("emits a single pair") + // emits a single pair probe.expectNext((a, b)) - And("another request") + // another request does not emit a duplicate probe.request(1) - - Then("does not emit a duplicate") probe.expectNoMessage(0 seconds) - And("sending complete") as.sendComplete() - - Then("completes the stream") probe.expectComplete() } - "does not de-duplicate instances based on value" in new Fixture { - Given(""" - |S1 -> A1 A2 A3 --\ - | > -- ZipLatest - |S2 -> B1 B2 --/ - """.stripMargin) - val a1 = A(someString) - val a2 = A(someString) - val a3 = A(someString) - val b1 = B(someInt) - val b2 = B(someInt) - val (probe, as, bs) = testGraph[A, B] + "does not de-duplicate instances based on value" in assertAllStagesStopped { + /* + S1 -> A1 A2 A3 --\ + > -- ZipLatest + S2 -> B1 B2 --/ + */ - Then(""" - |O -> (A1, B1), (A2, B1), (A3, B1), (A3, B2) - """.stripMargin) + val a1 = someString + val a2 = someString + val a3 = someString + val b1 = someInt + val b2 = someInt + val (probe, as, bs) = testGraph[String, Int] + + // O -> (A1, B1), (A2, B1), (A3, B1), (A3, B2) probe.request(4) as.sendNext(a1) @@ -153,134 +139,129 @@ class GraphZipLatestSpec bs.sendNext(b2) probe.expectNext((a3, b2)) + + probe.cancel() } val first = (t: (Probe[Boolean], Probe[Int])) => t._1 val second = (t: (Probe[Boolean], Probe[Int])) => t._2 - "complete when either source completes" in { + "complete when either source completes" in assertAllStagesStopped { forAll(Gen.oneOf(first, second)) { select => val (probe, bools, ints) = testGraph[Boolean, Int] - - Given("either source completes") select((bools, ints)).sendComplete() - - Then("subscribes and completes") probe.expectSubscriptionAndComplete() } } - "complete when either source completes and requesting element" in { + "complete when either source completes and requesting element" in assertAllStagesStopped { forAll(Gen.oneOf(first, second)) { select => val (probe, bools, ints) = testGraph[Boolean, Int] - - Given("either source completes") select((bools, ints)).sendComplete() - - And("request for one element") probe.request(1) - - Then("subscribes and completes") probe.expectComplete() } } - "complete when either source completes with some pending element" in { + "complete when either source completes with some pending element" in assertAllStagesStopped { forAll(Gen.oneOf(first, second)) { select => val (probe, bools, ints) = testGraph[Boolean, Int] - Given("one element pushed on each source") + // one element pushed on each source bools.sendNext(true) ints.sendNext(1) - And("either source completes") + // either source completes select((bools, ints)).sendComplete() - Then("should emit first element then complete") + // should emit first element then complete probe.requestNext((true, 1)) probe.expectComplete() } } - "complete when one source completes and the other continues pushing" in { + // Reproducing #26711 + "complete when one quickly completes without any elements" in { + // not sure quite why, but Source.empty does not trigger this, while Source(List()) does - race condition somehow + val immediatelyCompleting = Source.single(3).zipLatest(Source(List[Int]())) + + immediatelyCompleting.runWith(Sink.seq).futureValue should ===(Seq[(Int, Int)]()) + } + + "complete when one source completes and the other continues pushing" in assertAllStagesStopped { val (probe, bools, ints) = testGraph[Boolean, Int] - Given("one element pushed on each source") + // one element pushed on each source bools.sendNext(true) ints.sendNext(1) - And("either source completes") + // either source completes bools.sendComplete() ints.sendNext(10) ints.sendNext(10) - Then("should emit first element then complete") + // should emit first element then complete probe.requestNext((true, 1)) probe.expectComplete() } - "complete if no pending demand" in { + "complete if no pending demand" in assertAllStagesStopped { forAll(Gen.oneOf(first, second)) { select => val (probe, bools, ints) = testGraph[Boolean, Int] - Given("request for one element") probe.request(1) - Given("one element pushed on each source and tuple emitted") + // one element pushed on each source and tuple emitted bools.sendNext(true) ints.sendNext(1) probe.expectNext((true, 1)) - And("either source completes") + // either source completes select((bools, ints)).sendComplete() - Then("should complete") + // should complete probe.expectComplete() } } - "fail when either source has error" in { + "fail when either source has error" in assertAllStagesStopped { forAll(Gen.oneOf(first, second)) { select => val (probe, bools, ints) = testGraph[Boolean, Int] val error = new RuntimeException - Given("either source errors") select((bools, ints)).sendError(error) - - Then("subscribes and error") probe.expectSubscriptionAndError(error) } } - "emit even if pair is the same" in { + "emit even if pair is the same" in assertAllStagesStopped { val (probe, bools, ints) = testGraph[Boolean, Int] - Given("request for two elements") probe.request(2) - And("one element pushed on each source") + // one element pushed on each source bools.sendNext(true) ints.sendNext(1) - And("once again the same element on one source") + // once again the same element on one source ints.sendNext(1) - And("followed by complete") + // followed by complete bools.sendComplete() ints.sendComplete() - Then("emits two equal pairs") + // emits two equal pairs probe.expectNext((true, 1)) probe.expectNext((true, 1)) - And("then complete") + // then completes probe.expectComplete() } - "emit combined elements in proper order" in { + "emit combined elements in proper order" in assertAllStagesStopped { val (probe, firstDigits, secondDigits) = testGraph[Int, Int] - Given(s"numbers up to 99 in tuples") + // numbers up to 99 in tuples val allNumbers = for { firstDigit <- 0 to 9 secondDigit <- 0 to 9 @@ -288,29 +269,20 @@ class GraphZipLatestSpec allNumbers.groupBy(_._1).toList.sortBy(_._1).foreach { case (firstDigit, pairs) => { - When(s"sending first digit $firstDigit") firstDigits.sendNext(firstDigit) pairs.map { case (_, digits) => digits }.foreach { secondDigit => - And(s"sending second digit $secondDigit") secondDigits.sendNext(secondDigit) probe.request(1) - - Then(s"should receive tuple ($firstDigit,$secondDigit)") probe.expectNext((firstDigit, secondDigit)) } } } + + probe.cancel() } } - private class Fixture { - val someString = "someString" - val someInt = 1 - case class A(value: String) - case class B(value: Int) - } - - private def testGraph[A, B] = + private def testGraph[A, B]: (TestSubscriber.Probe[(A, B)], TestPublisher.Probe[A], TestPublisher.Probe[B]) = RunnableGraph .fromGraph(GraphDSL.create(TestSink.probe[(A, B)], TestSource.probe[A], TestSource.probe[B])(Tuple3.apply) { implicit b => (ts, as, bs) => diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template index 571ae00ad5..db9d9d15d4 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template @@ -32,12 +32,12 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { outer => // Without this field the completion signalling would take one extra pull - var willShutDown = false + private var willShutDown = false [#val inlet0 = new ZipLatestInlet(in0)# ] - var waitingForTuple = false - var staleTupleValues = true + private var waitingForTuple = false + private var staleTupleValues = true override def preStart(): Unit = { [#pull(in0)# @@ -96,8 +96,8 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F } override def onUpstreamFinish(): Unit = { - if (outer.staleTupleValues) completeStage() - outer.willShutDown = true + if (!hasValue || outer.staleTupleValues) completeStage() + else outer.willShutDown = true } } }