From 691a520c16e5732674a8919a273aa8d7d8eba15a Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 12 Feb 2015 16:29:19 +0100 Subject: [PATCH] =hco #16865 response parser needs to continue with startNewMessage if method is still missing --- .../engine/parsing/HttpMessageParser.scala | 6 +- .../engine/parsing/HttpResponseParser.scala | 2 +- .../http/engine/client/HttpClientSpec.scala | 62 ++++++++++++++++++- 3 files changed, 66 insertions(+), 4 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala index c2288ccdf0..ac87715a6c 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala @@ -285,7 +285,11 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser done() } - def done(): StateResult = null // StateResult is a phantom type + /** + * Use [[continue]] or [[terminate]] to suspend or terminate processing. + * Do not call this directly. + */ + private def done(): StateResult = null // StateResult is a phantom type def contentType(cth: Option[`Content-Type`]) = cth match { case Some(x) ⇒ x.contentType diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala index 2e9eabfde8..1aac65c770 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala @@ -38,7 +38,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, _headerParser: } else badProtocol } else { emit(NeedNextRequestMethod) - done() + continue(input, offset)(startNewMessage) } override def emit(output: ResponseOutput): Unit = { diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala index d257a2741b..c44a47728d 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala @@ -22,7 +22,7 @@ class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") "The client implementation" should { - "properly handle a request/response round-trip" which { + "handle a request/response round-trip" which { "has a request with empty entity" in new TestSetup { requestsSub.sendNext(HttpRequest()) @@ -86,7 +86,7 @@ class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") responses.expectComplete() } - "has a response with a default entity" in new TestSetup { + "has a response with a chunked entity" in new TestSetup { requestsSub.sendNext(HttpRequest()) expectWireData( """GET / HTTP/1.1 @@ -155,6 +155,64 @@ class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") } } + "handle several requests on one persistent connection" which { + "has a first response that was chunked" in new TestSetup { + requestsSub.sendNext(HttpRequest()) + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + + netInSub.expectRequest(16) + sendWireData( + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + | + |""") + + responsesSub.request(1) + val HttpResponse(_, _, HttpEntity.Chunked(ct, chunks), _) = responses.expectNext() + + val probe = StreamTestKit.SubscriberProbe[ChunkStreamPart]() + chunks.runWith(Sink(probe)) + val sub = probe.expectSubscription() + + sendWireData("3\nABC\n") + sub.request(1) + probe.expectNext(HttpEntity.Chunk("ABC")) + + sendWireData("0\n\n") + sub.request(1) + probe.expectNext(HttpEntity.LastChunk) + probe.expectComplete() + + // simulate that response is received before method bypass reaches response parser + sendWireData( + """HTTP/1.1 200 OK + |Content-Length: 0 + | + |""") + + responsesSub.request(1) + + requestsSub.sendNext(HttpRequest()) + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + requestsSub.sendComplete() + responses.expectNext(HttpResponse()) + + netOut.expectComplete() + netInSub.sendComplete() + responses.expectComplete() + } + } + "produce proper errors" which { "catch the entity stream being shorter than the Content-Length" in new TestSetup {