Merge pull request #18580 from spray/wip-18539-mathias
=htc #18539 Fix off-by-one error in response entity parsing
This commit is contained in:
commit
7a070a3120
2 changed files with 34 additions and 23 deletions
|
|
@ -99,7 +99,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, _headerParser:
|
||||||
summary = s"Response Content-Length of $contentLength bytes exceeds the configured limit of $maxContentLength bytes",
|
summary = s"Response Content-Length of $contentLength bytes exceeds the configured limit of $maxContentLength bytes",
|
||||||
detail = "Consider increasing the value of akka.http.client.parsing.max-content-length")
|
detail = "Consider increasing the value of akka.http.client.parsing.max-content-length")
|
||||||
else if (contentLength == 0) finishEmptyResponse()
|
else if (contentLength == 0) finishEmptyResponse()
|
||||||
else if (contentLength < input.size - bodyStart) {
|
else if (contentLength <= input.size - bodyStart) {
|
||||||
val cl = contentLength.toInt
|
val cl = contentLength.toInt
|
||||||
emitResponseStart(strictEntity(cth, input, bodyStart, cl))
|
emitResponseStart(strictEntity(cth, input, bodyStart, cl))
|
||||||
setCompletionHandling(HttpMessageParser.CompletionOk)
|
setCompletionHandling(HttpMessageParser.CompletionOk)
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,17 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
||||||
closeAfterResponseCompletion shouldEqual Seq(false)
|
closeAfterResponseCompletion shouldEqual Seq(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"a response with a simple body" in new Test {
|
||||||
|
collectBlocking(rawParse(GET,
|
||||||
|
prep {
|
||||||
|
"""HTTP/1.1 200 Ok
|
||||||
|
|Content-Length: 4
|
||||||
|
|
|
||||||
|
|ABCD"""
|
||||||
|
})) shouldEqual Seq(Right(HttpResponse(entity = "ABCD".getBytes)))
|
||||||
|
closeAfterResponseCompletion shouldEqual Seq(false)
|
||||||
|
}
|
||||||
|
|
||||||
"a response with a custom status code" in new Test {
|
"a response with a custom status code" in new Test {
|
||||||
override def parserSettings: ParserSettings =
|
override def parserSettings: ParserSettings =
|
||||||
super.parserSettings.withCustomStatusCodes(ServerOnTheMove)
|
super.parserSettings.withCustomStatusCodes(ServerOnTheMove)
|
||||||
|
|
@ -324,31 +335,31 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
||||||
def generalRawMultiParseTo(requestMethod: HttpMethod, expected: Either[ResponseOutput, HttpResponse]*): Matcher[Seq[String]] =
|
def generalRawMultiParseTo(requestMethod: HttpMethod, expected: Either[ResponseOutput, HttpResponse]*): Matcher[Seq[String]] =
|
||||||
equal(expected.map(strictEqualify))
|
equal(expected.map(strictEqualify))
|
||||||
.matcher[Seq[Either[ResponseOutput, StrictEqualHttpResponse]]] compose { input: Seq[String] ⇒
|
.matcher[Seq[Either[ResponseOutput, StrictEqualHttpResponse]]] compose { input: Seq[String] ⇒
|
||||||
val future =
|
collectBlocking {
|
||||||
Source(input.toList)
|
rawParse(requestMethod, input: _*)
|
||||||
.map(ByteString.apply)
|
.mapAsync(1) {
|
||||||
.transform(() ⇒ newParserStage(requestMethod)).named("parser")
|
case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x)))
|
||||||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
|
case Left(error) ⇒ FastFuture.successful(Left(error))
|
||||||
.via(headAndTailFlow)
|
|
||||||
.collect {
|
|
||||||
case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒
|
|
||||||
closeAfterResponseCompletion :+= close
|
|
||||||
Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol))
|
|
||||||
case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) ⇒ Left(x)
|
|
||||||
}.map { x ⇒
|
|
||||||
Source {
|
|
||||||
x match {
|
|
||||||
case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x)))
|
|
||||||
case Left(error) ⇒ FastFuture.successful(Left(error))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
.flatten(FlattenStrategy.concat)
|
}.map(strictEqualify)
|
||||||
.map(strictEqualify)
|
|
||||||
.grouped(100000).runWith(Sink.head)
|
|
||||||
Await.result(future, 500.millis)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def rawParse(requestMethod: HttpMethod, input: String*): Source[Either[ResponseOutput, HttpResponse], Unit] =
|
||||||
|
Source(input.toList)
|
||||||
|
.map(ByteString.apply)
|
||||||
|
.transform(() ⇒ newParserStage(requestMethod)).named("parser")
|
||||||
|
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
|
||||||
|
.via(headAndTailFlow)
|
||||||
|
.collect {
|
||||||
|
case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒
|
||||||
|
closeAfterResponseCompletion :+= close
|
||||||
|
Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol))
|
||||||
|
case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) ⇒ Left(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
def collectBlocking[T](source: Source[T, Any]): Seq[T] =
|
||||||
|
Await.result(source.grouped(100000).runWith(Sink.head), 500.millis)
|
||||||
|
|
||||||
protected def parserSettings: ParserSettings = ParserSettings(system)
|
protected def parserSettings: ParserSettings = ParserSettings(system)
|
||||||
|
|
||||||
def newParserStage(requestMethod: HttpMethod = GET) = {
|
def newParserStage(requestMethod: HttpMethod = GET) = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue