diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala
index a597aec773..0c10c3975f 100644
--- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala
+++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala
@@ -167,7 +167,6 @@ private[http] object OutgoingConnectionBlueprint {
       // each connection uses a single (private) response parser instance for all its responses
       // which builds a cache of all header instances seen on that connection
       val parser = rootParser.createShallowCopy()
-      var methodBypassCompleted = false
       var waitingForMethod = true
 
       setHandler(methodBypassInput, new InHandler {
@@ -179,7 +178,6 @@ private[http] object OutgoingConnectionBlueprint {
         }
         override def onUpstreamFinish(): Unit =
           if (waitingForMethod) completeStage()
-          else methodBypassCompleted = true
       })
 
       setHandler(dataInput, new InHandler {
@@ -201,17 +199,16 @@ private[http] object OutgoingConnectionBlueprint {
 
       setHandler(out, eagerTerminateOutput)
 
-      val getNextMethod = () ⇒
-        if (methodBypassCompleted) completeStage()
-        else {
-          pull(methodBypassInput)
-          waitingForMethod = true
-        }
+      val getNextMethod = () ⇒ {
+        waitingForMethod = true
+        if (isClosed(methodBypassInput)) completeStage()
+        else pull(methodBypassInput)
+      }
 
       val getNextData = () ⇒ {
         waitingForMethod = false
-        if (!isClosed(dataInput)) pull(dataInput)
-        else completeStage()
+        if (isClosed(dataInput)) completeStage()
+        else pull(dataInput)
       }
 
       @tailrec def drainParser(current: ResponseOutput, b: ListBuffer[ResponseOutput] = ListBuffer.empty): Unit = {
@@ -219,13 +216,10 @@ private[http] object OutgoingConnectionBlueprint {
           if (output.nonEmpty) emit(out, output, andThen)
           else andThen()
         current match {
-          case NeedNextRequestMethod ⇒
-            e(b.result(), getNextMethod)
-          case StreamEnd ⇒
-            e(b.result(), () ⇒ completeStage())
-          case NeedMoreData ⇒
-            e(b.result(), getNextData)
-          case x ⇒ drainParser(parser.onPull(), b += x)
+          case NeedNextRequestMethod ⇒ e(b.result(), getNextMethod)
+          case StreamEnd ⇒ e(b.result(), () ⇒ completeStage())
+          case NeedMoreData ⇒ e(b.result(), getNextData)
+          case x ⇒ drainParser(parser.onPull(), b += x)
         }
       }
 
diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala
index 32c659eb2d..1004a7ae48 100644
--- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala
+++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala
@@ -11,16 +11,19 @@ import akka.stream.scaladsl._
 import akka.stream.testkit.AkkaSpec
 import akka.http.scaladsl.{ Http, TestUtils }
 import akka.http.scaladsl.model._
+import akka.stream.testkit.Utils
+import org.scalatest.concurrent.ScalaFutures
 
-class HighLevelOutgoingConnectionSpec extends AkkaSpec {
+class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures {
   implicit val materializer = ActorMaterializer()
+  implicit val patience = PatienceConfig(1.second)
 
   "The connection-level client implementation" should {
 
-    "be able to handle 100 pipelined requests across one connection" in {
+    "be able to handle 100 pipelined requests across one connection" in Utils.assertAllStagesStopped {
       val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
 
-      Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
+      val binding = Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
         serverHostName, serverPort)
 
       val N = 100
@@ -32,13 +35,14 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec {
         .map { r ⇒ val s = r.data.utf8String; log.debug(s); s.toInt }
         .runFold(0)(_ + _)
 
-      Await.result(result, 10.seconds) shouldEqual N * (N + 1) / 2
+      result.futureValue(PatienceConfig(10.seconds)) shouldEqual N * (N + 1) / 2
+      binding.futureValue.unbind()
     }
 
-    "be able to handle 100 pipelined requests across 4 connections (client-flow is reusable)" in {
+    "be able to handle 100 pipelined requests across 4 connections (client-flow is reusable)" in Utils.assertAllStagesStopped {
       val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
 
-      Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
+      val binding = Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
         serverHostName, serverPort)
 
       val connFlow = Http().outgoingConnection(serverHostName, serverPort)
@@ -64,12 +68,14 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec {
         .map { r ⇒ val s = r.data.utf8String; log.debug(s); s.toInt }
         .runFold(0)(_ + _)
 
-      Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2
+      result.futureValue(PatienceConfig(10.seconds)) shouldEqual C * N * (N + 1) / 2
+      binding.futureValue.unbind()
     }
 
-    "catch response stream truncation" in {
+    "catch response stream truncation" in Utils.assertAllStagesStopped {
       val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
-      Http().bindAndHandleSync({
+
+      val binding = Http().bindAndHandleSync({
         case HttpRequest(_, Uri.Path("/b"), _, _, _) ⇒ HttpResponse(headers = List(headers.Connection("close")))
         case _ ⇒ HttpResponse()
       }, serverHostName, serverPort)
@@ -81,6 +87,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec {
         .runWith(Sink.head)
 
       a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 1.second)
+      binding.futureValue.unbind()
     }
   }
 }