=htc #17344 Error-close client-side response stream if there are responses pending at connection end

This commit is contained in:
Mathias 2015-10-20 16:18:01 +02:00
parent 6e9ac310c0
commit ff7d644fd6
3 changed files with 23 additions and 6 deletions

View file

@ -76,7 +76,7 @@ private[http] object OutgoingConnectionBlueprint {
case (MessageStartError(_, info), _) throw IllegalResponseException(info)
}
BidiFlow() { implicit b
val core = BidiFlow() { implicit b
import FlowGraph.Implicits._
val methodBypassFanout = b.add(Broadcast[HttpRequest](2, eagerCancel = true))
val responseParsingMerge = b.add(new ResponseParsingMerge(rootParser))
@ -104,6 +104,8 @@ private[http] object OutgoingConnectionBlueprint {
unwrapTls.inlet,
terminationFanout.out(1))
}
One2OneBidiFlow[HttpRequest, HttpResponse](-1) atop core
}
// a simple merge stage that simply forwards its first input and ignores its second input

View file

@ -6,14 +6,11 @@ package akka.http.impl.engine.client
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import akka.http.scaladsl.{ Http, TestUtils }
import akka.http.scaladsl.model._
import akka.http.impl.util._
class HighLevelOutgoingConnectionSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
@ -69,5 +66,21 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec {
Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2
}
"catch response stream truncation" in {
val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
Http().bindAndHandleSync({
case HttpRequest(_, Uri.Path("/b"), _, _, _) HttpResponse(headers = List(headers.Connection("close")))
case _ HttpResponse()
}, serverHostName, serverPort)
val x = Source(List("/a", "/b", "/c"))
.map(path HttpRequest(uri = path))
.via(Http().outgoingConnection(serverHostName, serverPort))
.grouped(10)
.runWith(Sink.head)
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 1.second)
}
}
}

View file

@ -238,7 +238,8 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
val InvalidContentLengthException(info) = netOut.expectError()
info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to 2 bytes less"
netInSub.sendComplete()
responses.expectComplete()
responsesSub.request(1)
responses.expectError(One2OneBidiFlow.OutputTruncationException)
}
"catch the entity stream being longer than the Content-Length" in new TestSetup {
@ -263,7 +264,8 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
val InvalidContentLengthException(info) = netOut.expectError()
info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to more bytes"
netInSub.sendComplete()
responses.expectComplete()
responsesSub.request(1)
responses.expectError(One2OneBidiFlow.OutputTruncationException)
}
"catch illegal response starts" in new TestSetup {