From e54d2933bd7701e46beebfe255276567ff3188e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 27 Jun 2016 12:56:29 +0200 Subject: [PATCH] #19931 Track pending requests properly on expectRequest --- .../scala/akka/stream/testkit/StreamTestKit.scala | 6 +++++- .../testkit/TestPublisherSubscriberSpec.scala | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 08ea91d654..1c84c1d2af 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -168,7 +168,11 @@ object TestPublisher { this } - def expectRequest(): Long = subscription.expectRequest() + def expectRequest(): Long = { + val requests = subscription.expectRequest() + pendingRequests += requests + requests + } def expectCancellation(): Self = { subscription.expectCancellation() diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala index 96664a8cca..48c91f7b35 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala @@ -59,5 +59,20 @@ class TestPublisherSubscriberSpec extends AkkaSpec { upstreamSubscription.sendComplete() } + "properly update pendingRequest in expectRequest" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(upstream).runWith(Sink.fromSubscriber(downstream)) + + downstream + .expectSubscription() + .request(10) + + upstream.expectRequest() should ===(10) + upstream.sendNext(1) + downstream.expectNext(1) + } + } }