diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala b/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala index 8b106b67e7..8c22fc1a60 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala @@ -81,7 +81,7 @@ private[http] object One2OneBidiFlow { push(out, element) if (pullSuppressed) { pullSuppressed = false - pull(in) + if (!isClosed(in)) pull(in) } } else throw new UnexpectedOutputException(element) } diff --git a/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala index 35093d7d74..75d5739c41 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala @@ -113,6 +113,34 @@ class One2OneBidiFlowSpec extends AkkaSpec { out.sendComplete() // To please assertAllStagesStopped } + + "not pull when input is closed before surpressed pull can be acted on" in assertAllStagesStopped { + val in = TestPublisher.probe[Int]() + val out = TestSubscriber.probe[Int]() + val wrappedIn = TestSubscriber.probe[Int]() + val wrappedOut = TestPublisher.probe[Int]() + + Source.fromPublisher(in).via( + One2OneBidiFlow(maxPending = 1) join Flow.fromSinkAndSource( + Sink.fromSubscriber(wrappedIn), + Source.fromPublisher(wrappedOut)) + ).runWith(Sink.fromSubscriber(out)) + + out.request(2) + wrappedOut.expectRequest() + wrappedIn.request(2) + in.expectRequest() + in.sendNext(1) + wrappedIn.expectNext(1) + // now we have reached the maxPending limit + in.sendComplete() + wrappedOut.sendNext(1) + out.expectNext(1) + wrappedIn.expectComplete() + wrappedOut.sendComplete() + out.expectComplete() + + } } class Test(maxPending: Int = -1) {