Http internal One2OneBidi pulling closed port #20944

This commit is contained in:
Johan Andrén 2016-08-30 12:50:12 +02:00 committed by GitHub
parent 8c98d6cd74
commit fbfc8b4c58
2 changed files with 29 additions and 1 deletions

View file

@ -81,7 +81,7 @@ private[http] object One2OneBidiFlow {
push(out, element)
if (pullSuppressed) {
pullSuppressed = false
pull(in)
if (!isClosed(in)) pull(in)
}
} else throw new UnexpectedOutputException(element)
}

View file

@ -113,6 +113,34 @@ class One2OneBidiFlowSpec extends AkkaSpec {
out.sendComplete() // To please assertAllStagesStopped
}
"not pull when input is closed before surpressed pull can be acted on" in assertAllStagesStopped {
val in = TestPublisher.probe[Int]()
val out = TestSubscriber.probe[Int]()
val wrappedIn = TestSubscriber.probe[Int]()
val wrappedOut = TestPublisher.probe[Int]()
Source.fromPublisher(in).via(
One2OneBidiFlow(maxPending = 1) join Flow.fromSinkAndSource(
Sink.fromSubscriber(wrappedIn),
Source.fromPublisher(wrappedOut))
).runWith(Sink.fromSubscriber(out))
out.request(2)
wrappedOut.expectRequest()
wrappedIn.request(2)
in.expectRequest()
in.sendNext(1)
wrappedIn.expectNext(1)
// now we have reached the maxPending limit
in.sendComplete()
wrappedOut.sendNext(1)
out.expectNext(1)
wrappedIn.expectComplete()
wrappedOut.sendComplete()
out.expectComplete()
}
}
class Test(maxPending: Int = -1) {