From ff7d644fd6077d81872e76ffd47226754ec26dbf Mon Sep 17 00:00:00 2001 From: Mathias Date: Tue, 20 Oct 2015 16:18:01 +0200 Subject: [PATCH] =htc #17344 Error-close client-side response stream if there are responses pending at connection end --- .../client/OutgoingConnectionBlueprint.scala | 4 +++- .../HighLevelOutgoingConnectionSpec.scala | 19 ++++++++++++++++--- .../LowLevelOutgoingConnectionSpec.scala | 6 ++++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 30c49f6737..40d2b6c51b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -76,7 +76,7 @@ private[http] object OutgoingConnectionBlueprint { case (MessageStartError(_, info), _) ⇒ throw IllegalResponseException(info) } - BidiFlow() { implicit b ⇒ + val core = BidiFlow() { implicit b ⇒ import FlowGraph.Implicits._ val methodBypassFanout = b.add(Broadcast[HttpRequest](2, eagerCancel = true)) val responseParsingMerge = b.add(new ResponseParsingMerge(rootParser)) @@ -104,6 +104,8 @@ private[http] object OutgoingConnectionBlueprint { unwrapTls.inlet, terminationFanout.out(1)) } + + One2OneBidiFlow[HttpRequest, HttpResponse](-1) atop core } // a simple merge stage that simply forwards its first input and ignores its second input diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala index acd61632b8..8e0630f2ea 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -6,14 +6,11 @@ package akka.http.impl.engine.client import scala.concurrent.Await import scala.concurrent.duration._ - import akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec - import akka.http.scaladsl.{ Http, TestUtils } import akka.http.scaladsl.model._ -import akka.http.impl.util._ class HighLevelOutgoingConnectionSpec extends AkkaSpec { implicit val materializer = ActorMaterializer() @@ -69,5 +66,21 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec { Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2 } + + "catch response stream truncation" in { + val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() + Http().bindAndHandleSync({ + case HttpRequest(_, Uri.Path("/b"), _, _, _) ⇒ HttpResponse(headers = List(headers.Connection("close"))) + case _ ⇒ HttpResponse() + }, serverHostName, serverPort) + + val x = Source(List("/a", "/b", "/c")) + .map(path ⇒ HttpRequest(uri = path)) + .via(Http().outgoingConnection(serverHostName, serverPort)) + .grouped(10) + .runWith(Sink.head) + + a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 1.second) + } } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index fdcdc30e38..53df22424d 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -238,7 +238,8 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. val InvalidContentLengthException(info) = netOut.expectError() info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to 2 bytes less" netInSub.sendComplete() - responses.expectComplete() + responsesSub.request(1) + responses.expectError(One2OneBidiFlow.OutputTruncationException) } "catch the entity stream being longer than the Content-Length" in new TestSetup { @@ -263,7 +264,8 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. val InvalidContentLengthException(info) = netOut.expectError() info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to more bytes" netInSub.sendComplete() - responses.expectComplete() + responsesSub.request(1) + responses.expectError(One2OneBidiFlow.OutputTruncationException) } "catch illegal response starts" in new TestSetup {