diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala index 23dbc2a3c5..004d94bc49 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/parsing/HttpResponseParser.scala @@ -99,7 +99,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, _headerParser: summary = s"Response Content-Length of $contentLength bytes exceeds the configured limit of $maxContentLength bytes", detail = "Consider increasing the value of akka.http.client.parsing.max-content-length") else if (contentLength == 0) finishEmptyResponse() - else if (contentLength < input.size - bodyStart) { + else if (contentLength <= input.size - bodyStart) { val cl = contentLength.toInt emitResponseStart(strictEntity(cth, input, bodyStart, cl)) setCompletionHandling(HttpMessageParser.CompletionOk) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala index 5d35284a43..59de26aae0 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala @@ -57,6 +57,17 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { closeAfterResponseCompletion shouldEqual Seq(false) } + "a response with a simple body" in new Test { + collectBlocking(rawParse(GET, + prep { + """HTTP/1.1 200 Ok + |Content-Length: 4 + | + |ABCD""" + })) shouldEqual Seq(Right(HttpResponse(entity = "ABCD".getBytes))) + closeAfterResponseCompletion shouldEqual Seq(false) + } + "a response with a custom status code" in new Test { override def parserSettings: ParserSettings = super.parserSettings.withCustomStatusCodes(ServerOnTheMove) @@ -324,31 +335,31 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def generalRawMultiParseTo(requestMethod: HttpMethod, expected: Either[ResponseOutput, HttpResponse]*): Matcher[Seq[String]] = equal(expected.map(strictEqualify)) .matcher[Seq[Either[ResponseOutput, StrictEqualHttpResponse]]] compose { input: Seq[String] ⇒ - val future = - Source(input.toList) - .map(ByteString.apply) - .transform(() ⇒ newParserStage(requestMethod)).named("parser") - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) - .via(headAndTailFlow) - .collect { - case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒ - closeAfterResponseCompletion :+= close - Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol)) - case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) ⇒ Left(x) - }.map { x ⇒ - Source { - x match { - case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x))) - case Left(error) ⇒ FastFuture.successful(Left(error)) - } - } + collectBlocking { + rawParse(requestMethod, input: _*) + .mapAsync(1) { + case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x))) + case Left(error) ⇒ FastFuture.successful(Left(error)) } - .flatten(FlattenStrategy.concat) - .map(strictEqualify) - .grouped(100000).runWith(Sink.head) - Await.result(future, 500.millis) + }.map(strictEqualify) } + def rawParse(requestMethod: HttpMethod, input: String*): Source[Either[ResponseOutput, HttpResponse], Unit] = + Source(input.toList) + .map(ByteString.apply) + .transform(() ⇒ newParserStage(requestMethod)).named("parser") + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) + .via(headAndTailFlow) + .collect { + case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒ + closeAfterResponseCompletion :+= close + Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol)) + case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) ⇒ Left(x) + } + + def collectBlocking[T](source: Source[T, Any]): Seq[T] = + Await.result(source.grouped(100000).runWith(Sink.head), 500.millis) + protected def parserSettings: ParserSettings = ParserSettings(system) def newParserStage(requestMethod: HttpMethod = GET) = {