diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 08ea91d654..1c84c1d2af 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -168,7 +168,11 @@ object TestPublisher { this } - def expectRequest(): Long = subscription.expectRequest() + def expectRequest(): Long = { + val requests = subscription.expectRequest() + pendingRequests += requests + requests + } def expectCancellation(): Self = { subscription.expectCancellation() diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala index 96664a8cca..48c91f7b35 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala @@ -59,5 +59,20 @@ class TestPublisherSubscriberSpec extends AkkaSpec { upstreamSubscription.sendComplete() } + "properly update pendingRequest in expectRequest" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(upstream).runWith(Sink.fromSubscriber(downstream)) + + downstream + .expectSubscription() + .request(10) + + upstream.expectRequest() should ===(10) + upstream.sendNext(1) + downstream.expectNext(1) + } + } }