=hco #16865 response parser needs to continue with startNewMessage if method is still missing
This commit is contained in:
parent
35117748f6
commit
691a520c16
3 changed files with 66 additions and 4 deletions
|
|
@ -285,7 +285,11 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
|
|||
done()
|
||||
}
|
||||
|
||||
def done(): StateResult = null // StateResult is a phantom type
|
||||
/**
|
||||
* Use [[continue]] or [[terminate]] to suspend or terminate processing.
|
||||
* Do not call this directly.
|
||||
*/
|
||||
private def done(): StateResult = null // StateResult is a phantom type
|
||||
|
||||
def contentType(cth: Option[`Content-Type`]) = cth match {
|
||||
case Some(x) ⇒ x.contentType
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, _headerParser:
|
|||
} else badProtocol
|
||||
} else {
|
||||
emit(NeedNextRequestMethod)
|
||||
done()
|
||||
continue(input, offset)(startNewMessage)
|
||||
}
|
||||
|
||||
override def emit(output: ResponseOutput): Unit = {
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF")
|
|||
|
||||
"The client implementation" should {
|
||||
|
||||
"properly handle a request/response round-trip" which {
|
||||
"handle a request/response round-trip" which {
|
||||
|
||||
"has a request with empty entity" in new TestSetup {
|
||||
requestsSub.sendNext(HttpRequest())
|
||||
|
|
@ -86,7 +86,7 @@ class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF")
|
|||
responses.expectComplete()
|
||||
}
|
||||
|
||||
"has a response with a default entity" in new TestSetup {
|
||||
"has a response with a chunked entity" in new TestSetup {
|
||||
requestsSub.sendNext(HttpRequest())
|
||||
expectWireData(
|
||||
"""GET / HTTP/1.1
|
||||
|
|
@ -155,6 +155,64 @@ class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF")
|
|||
}
|
||||
}
|
||||
|
||||
"handle several requests on one persistent connection" which {
|
||||
"has a first response that was chunked" in new TestSetup {
|
||||
requestsSub.sendNext(HttpRequest())
|
||||
expectWireData(
|
||||
"""GET / HTTP/1.1
|
||||
|Host: example.com:80
|
||||
|User-Agent: akka-http/test
|
||||
|
|
||||
|""")
|
||||
|
||||
netInSub.expectRequest(16)
|
||||
sendWireData(
|
||||
"""HTTP/1.1 200 OK
|
||||
|Transfer-Encoding: chunked
|
||||
|
|
||||
|""")
|
||||
|
||||
responsesSub.request(1)
|
||||
val HttpResponse(_, _, HttpEntity.Chunked(ct, chunks), _) = responses.expectNext()
|
||||
|
||||
val probe = StreamTestKit.SubscriberProbe[ChunkStreamPart]()
|
||||
chunks.runWith(Sink(probe))
|
||||
val sub = probe.expectSubscription()
|
||||
|
||||
sendWireData("3\nABC\n")
|
||||
sub.request(1)
|
||||
probe.expectNext(HttpEntity.Chunk("ABC"))
|
||||
|
||||
sendWireData("0\n\n")
|
||||
sub.request(1)
|
||||
probe.expectNext(HttpEntity.LastChunk)
|
||||
probe.expectComplete()
|
||||
|
||||
// simulate that response is received before method bypass reaches response parser
|
||||
sendWireData(
|
||||
"""HTTP/1.1 200 OK
|
||||
|Content-Length: 0
|
||||
|
|
||||
|""")
|
||||
|
||||
responsesSub.request(1)
|
||||
|
||||
requestsSub.sendNext(HttpRequest())
|
||||
expectWireData(
|
||||
"""GET / HTTP/1.1
|
||||
|Host: example.com:80
|
||||
|User-Agent: akka-http/test
|
||||
|
|
||||
|""")
|
||||
requestsSub.sendComplete()
|
||||
responses.expectNext(HttpResponse())
|
||||
|
||||
netOut.expectComplete()
|
||||
netInSub.sendComplete()
|
||||
responses.expectComplete()
|
||||
}
|
||||
}
|
||||
|
||||
"produce proper errors" which {
|
||||
|
||||
"catch the entity stream being shorter than the Content-Length" in new TestSetup {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue