diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala index 1d4feb82e1..187d46c25b 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala @@ -14,12 +14,13 @@ import akka.http.model.parser.CharacterClasses import akka.http.model._ import headers._ import HttpProtocols._ +import ParserOutput._ /** * INTERNAL API */ -private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOutput <: ParserOutput](val settings: ParserSettings, - val headerParser: HttpHeaderParser) +private[http] abstract class HttpMessageParser[Output >: MessageOutput <: ParserOutput](val settings: ParserSettings, + val headerParser: HttpHeaderParser) extends StatefulStage[ByteString, Output] { import settings._ @@ -134,12 +135,12 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut val remainingInputBytes = input.length - bodyStart if (remainingInputBytes > 0) { if (remainingInputBytes < remainingBodyBytes) { - emit(ParserOutput.EntityPart(input drop bodyStart)) + emit(EntityPart(input drop bodyStart)) continue(parseFixedLengthBody(remainingBodyBytes - remainingInputBytes, isLastMessage)) } else { val offset = bodyStart + remainingBodyBytes.toInt - emit(ParserOutput.EntityPart(input.slice(bodyStart, offset))) - emit(ParserOutput.MessageEnd) + emit(EntityPart(input.slice(bodyStart, offset))) + emit(MessageEnd) if (isLastMessage) terminate() else startNewMessage(input, offset) } @@ -154,8 +155,8 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut case HttpHeaderParser.EmptyHeader ⇒ val lastChunk = if (extension.isEmpty && headers.isEmpty) HttpEntity.LastChunk else HttpEntity.LastChunk(extension, headers) - emit(ParserOutput.EntityChunk(lastChunk)) - emit(ParserOutput.MessageEnd) + emit(EntityChunk(lastChunk)) + emit(MessageEnd) if (isLastMessage) terminate() else startNewMessage(input, lineEnd) case header if headerCount < maxHeaderCount ⇒ @@ -168,7 +169,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut if (chunkSize > 0) { val chunkBodyEnd = cursor + chunkSize def result(terminatorLen: Int) = { - emit(ParserOutput.EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension))) + emit(EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension))) trampoline(_ ⇒ parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage)) } byteChar(input, chunkBodyEnd) match { @@ -227,7 +228,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut def fail(status: StatusCode): StateResult = fail(status, status.defaultMessage) def fail(status: StatusCode, summary: String, detail: String = ""): StateResult = fail(status, ErrorInfo(summary, detail)) def fail(status: StatusCode, info: ErrorInfo): StateResult = { - emit(ParserOutput.ParseError(status, info)) + emit(ParseError(status, info)) terminate() } @@ -251,12 +252,12 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut HttpEntity.Strict(contentType(cth), input.slice(bodyStart, bodyStart + contentLength)) def defaultEntity(cth: Option[`Content-Type`], contentLength: Long)(entityParts: Source[_ <: ParserOutput]): UniversalEntity = { - val data = entityParts.collect { case ParserOutput.EntityPart(bytes) ⇒ bytes } + val data = entityParts.collect { case EntityPart(bytes) ⇒ bytes } HttpEntity.Default(contentType(cth), contentLength, data) } def chunkedEntity(cth: Option[`Content-Type`])(entityChunks: Source[_ <: ParserOutput]): RequestEntity with ResponseEntity = { - val chunks = entityChunks.collect { case ParserOutput.EntityChunk(chunk) ⇒ chunk } + val chunks = entityChunks.collect { case EntityChunk(chunk) ⇒ chunk } HttpEntity.Chunked(contentType(cth), chunks) } diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala index 0e16f4ee3d..7ee5939a2d 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala @@ -12,6 +12,7 @@ import akka.util.ByteString import akka.http.model._ import headers._ import StatusCodes._ +import ParserOutput._ /** * INTERNAL API @@ -19,7 +20,7 @@ import StatusCodes._ private[http] class HttpRequestParser(_settings: ParserSettings, rawRequestUriHeader: Boolean, _headerParser: HttpHeaderParser) - extends HttpMessageParser[ParserOutput.RequestOutput](_settings, _headerParser) { + extends HttpMessageParser[RequestOutput](_settings, _headerParser) { import settings._ private[this] var method: HttpMethod = _ @@ -110,12 +111,12 @@ private[http] class HttpRequestParser(_settings: ParserSettings, clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`], hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult = if (hostHeaderPresent || protocol == HttpProtocols.`HTTP/1.0`) { - def emitRequestStart(createEntity: Source[ParserOutput.RequestOutput] ⇒ RequestEntity, + def emitRequestStart(createEntity: Source[RequestOutput] ⇒ RequestEntity, headers: List[HttpHeader] = headers) = { val allHeaders = if (rawRequestUriHeader) `Raw-Request-URI`(new String(uriBytes, HttpCharsets.`US-ASCII`.nioCharset)) :: headers else headers - emit(ParserOutput.RequestStart(method, uri, protocol, allHeaders, createEntity, closeAfterResponseCompletion)) + emit(RequestStart(method, uri, protocol, allHeaders, createEntity, closeAfterResponseCompletion)) } teh match { diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala index c2321697de..6b65dc81fe 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala @@ -11,6 +11,7 @@ import akka.util.ByteString import akka.http.model._ import headers._ import HttpResponseParser.NoMethod +import ParserOutput._ /** * INTERNAL API @@ -18,7 +19,7 @@ import HttpResponseParser.NoMethod private[http] class HttpResponseParser(_settings: ParserSettings, _headerParser: HttpHeaderParser, dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod) - extends HttpMessageParser[ParserOutput.ResponseOutput](_settings, _headerParser) { + extends HttpMessageParser[ResponseOutput](_settings, _headerParser) { import settings._ private[this] var requestMethodForCurrentResponse: HttpMethod = NoMethod @@ -74,9 +75,9 @@ private[http] class HttpResponseParser(_settings: ParserSettings, def parseEntity(headers: List[HttpHeader], protocol: HttpProtocol, input: ByteString, bodyStart: Int, clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`], hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult = { - def emitResponseStart(createEntity: Source[ParserOutput.ResponseOutput] ⇒ ResponseEntity, + def emitResponseStart(createEntity: Source[ResponseOutput] ⇒ ResponseEntity, headers: List[HttpHeader] = headers) = - emit(ParserOutput.ResponseStart(statusCode, protocol, headers, createEntity, closeAfterResponseCompletion)) + emit(ResponseStart(statusCode, protocol, headers, createEntity, closeAfterResponseCompletion)) def finishEmptyResponse() = { emitResponseStart(emptyEntity(cth)) startNewMessage(input, bodyStart) @@ -99,7 +100,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, } case None ⇒ emitResponseStart { entityParts ⇒ - val data = entityParts.collect { case ParserOutput.EntityPart(bytes) ⇒ bytes } + val data = entityParts.collect { case EntityPart(bytes) ⇒ bytes } HttpEntity.CloseDelimited(contentType(cth), data) } parseToCloseBody(input, bodyStart) @@ -121,7 +122,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, // currently we do not check for `settings.maxContentLength` overflow def parseToCloseBody(input: ByteString, bodyStart: Int): StateResult = { if (input.length > bodyStart) - emit(ParserOutput.EntityPart(input drop bodyStart)) + emit(EntityPart(input drop bodyStart)) continue(parseToCloseBody) } }