#19503 fix closed-pull in ResponseParsingMerge

also assert all stages stopped in HighLevelOutgoingConnectionSpec
This commit is contained in:
Roland Kuhn 2016-01-18 21:11:10 +01:00
parent dd388d838b
commit 2cca0788ed
2 changed files with 27 additions and 26 deletions

View file

@ -167,7 +167,6 @@ private[http] object OutgoingConnectionBlueprint {
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
val parser = rootParser.createShallowCopy()
var methodBypassCompleted = false
var waitingForMethod = true
setHandler(methodBypassInput, new InHandler {
@ -179,7 +178,6 @@ private[http] object OutgoingConnectionBlueprint {
}
override def onUpstreamFinish(): Unit =
if (waitingForMethod) completeStage()
else methodBypassCompleted = true
})
setHandler(dataInput, new InHandler {
@ -201,17 +199,16 @@ private[http] object OutgoingConnectionBlueprint {
setHandler(out, eagerTerminateOutput)
val getNextMethod = ()
if (methodBypassCompleted) completeStage()
else {
pull(methodBypassInput)
waitingForMethod = true
}
val getNextMethod = () {
waitingForMethod = true
if (isClosed(methodBypassInput)) completeStage()
else pull(methodBypassInput)
}
val getNextData = () {
waitingForMethod = false
if (!isClosed(dataInput)) pull(dataInput)
else completeStage()
if (isClosed(dataInput)) completeStage()
else pull(dataInput)
}
@tailrec def drainParser(current: ResponseOutput, b: ListBuffer[ResponseOutput] = ListBuffer.empty): Unit = {
@ -219,13 +216,10 @@ private[http] object OutgoingConnectionBlueprint {
if (output.nonEmpty) emit(out, output, andThen)
else andThen()
current match {
case NeedNextRequestMethod
e(b.result(), getNextMethod)
case StreamEnd
e(b.result(), () completeStage())
case NeedMoreData
e(b.result(), getNextData)
case x drainParser(parser.onPull(), b += x)
case NeedNextRequestMethod e(b.result(), getNextMethod)
case StreamEnd e(b.result(), () completeStage())
case NeedMoreData e(b.result(), getNextData)
case x drainParser(parser.onPull(), b += x)
}
}

View file

@ -11,16 +11,19 @@ import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import akka.http.scaladsl.{ Http, TestUtils }
import akka.http.scaladsl.model._
import akka.stream.testkit.Utils
import org.scalatest.concurrent.ScalaFutures
class HighLevelOutgoingConnectionSpec extends AkkaSpec {
class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures {
implicit val materializer = ActorMaterializer()
implicit val patience = PatienceConfig(1.second)
"The connection-level client implementation" should {
"be able to handle 100 pipelined requests across one connection" in {
"be able to handle 100 pipelined requests across one connection" in Utils.assertAllStagesStopped {
val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
Http().bindAndHandleSync(r HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
val binding = Http().bindAndHandleSync(r HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
serverHostName, serverPort)
val N = 100
@ -32,13 +35,14 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec {
.map { r val s = r.data.utf8String; log.debug(s); s.toInt }
.runFold(0)(_ + _)
Await.result(result, 10.seconds) shouldEqual N * (N + 1) / 2
result.futureValue(PatienceConfig(10.seconds)) shouldEqual N * (N + 1) / 2
binding.futureValue.unbind()
}
"be able to handle 100 pipelined requests across 4 connections (client-flow is reusable)" in {
"be able to handle 100 pipelined requests across 4 connections (client-flow is reusable)" in Utils.assertAllStagesStopped {
val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
Http().bindAndHandleSync(r HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
val binding = Http().bindAndHandleSync(r HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
serverHostName, serverPort)
val connFlow = Http().outgoingConnection(serverHostName, serverPort)
@ -64,12 +68,14 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec {
.map { r val s = r.data.utf8String; log.debug(s); s.toInt }
.runFold(0)(_ + _)
Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2
result.futureValue(PatienceConfig(10.seconds)) shouldEqual C * N * (N + 1) / 2
binding.futureValue.unbind()
}
"catch response stream truncation" in {
"catch response stream truncation" in Utils.assertAllStagesStopped {
val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
Http().bindAndHandleSync({
val binding = Http().bindAndHandleSync({
case HttpRequest(_, Uri.Path("/b"), _, _, _) HttpResponse(headers = List(headers.Connection("close")))
case _ HttpResponse()
}, serverHostName, serverPort)
@ -81,6 +87,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec {
.runWith(Sink.head)
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 1.second)
binding.futureValue.unbind()
}
}
}