=htc small simplification in message parsers
This commit is contained in:
parent
2d9a49d010
commit
37a476ab46
3 changed files with 22 additions and 19 deletions
|
|
@ -14,12 +14,13 @@ import akka.http.model.parser.CharacterClasses
|
|||
import akka.http.model._
|
||||
import headers._
|
||||
import HttpProtocols._
|
||||
import ParserOutput._
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOutput <: ParserOutput](val settings: ParserSettings,
|
||||
val headerParser: HttpHeaderParser)
|
||||
private[http] abstract class HttpMessageParser[Output >: MessageOutput <: ParserOutput](val settings: ParserSettings,
|
||||
val headerParser: HttpHeaderParser)
|
||||
extends StatefulStage[ByteString, Output] {
|
||||
import settings._
|
||||
|
||||
|
|
@ -134,12 +135,12 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
|
|||
val remainingInputBytes = input.length - bodyStart
|
||||
if (remainingInputBytes > 0) {
|
||||
if (remainingInputBytes < remainingBodyBytes) {
|
||||
emit(ParserOutput.EntityPart(input drop bodyStart))
|
||||
emit(EntityPart(input drop bodyStart))
|
||||
continue(parseFixedLengthBody(remainingBodyBytes - remainingInputBytes, isLastMessage))
|
||||
} else {
|
||||
val offset = bodyStart + remainingBodyBytes.toInt
|
||||
emit(ParserOutput.EntityPart(input.slice(bodyStart, offset)))
|
||||
emit(ParserOutput.MessageEnd)
|
||||
emit(EntityPart(input.slice(bodyStart, offset)))
|
||||
emit(MessageEnd)
|
||||
if (isLastMessage) terminate()
|
||||
else startNewMessage(input, offset)
|
||||
}
|
||||
|
|
@ -154,8 +155,8 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
|
|||
case HttpHeaderParser.EmptyHeader ⇒
|
||||
val lastChunk =
|
||||
if (extension.isEmpty && headers.isEmpty) HttpEntity.LastChunk else HttpEntity.LastChunk(extension, headers)
|
||||
emit(ParserOutput.EntityChunk(lastChunk))
|
||||
emit(ParserOutput.MessageEnd)
|
||||
emit(EntityChunk(lastChunk))
|
||||
emit(MessageEnd)
|
||||
if (isLastMessage) terminate()
|
||||
else startNewMessage(input, lineEnd)
|
||||
case header if headerCount < maxHeaderCount ⇒
|
||||
|
|
@ -168,7 +169,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
|
|||
if (chunkSize > 0) {
|
||||
val chunkBodyEnd = cursor + chunkSize
|
||||
def result(terminatorLen: Int) = {
|
||||
emit(ParserOutput.EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension)))
|
||||
emit(EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension)))
|
||||
trampoline(_ ⇒ parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage))
|
||||
}
|
||||
byteChar(input, chunkBodyEnd) match {
|
||||
|
|
@ -227,7 +228,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
|
|||
def fail(status: StatusCode): StateResult = fail(status, status.defaultMessage)
|
||||
def fail(status: StatusCode, summary: String, detail: String = ""): StateResult = fail(status, ErrorInfo(summary, detail))
|
||||
def fail(status: StatusCode, info: ErrorInfo): StateResult = {
|
||||
emit(ParserOutput.ParseError(status, info))
|
||||
emit(ParseError(status, info))
|
||||
terminate()
|
||||
}
|
||||
|
||||
|
|
@ -251,12 +252,12 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
|
|||
HttpEntity.Strict(contentType(cth), input.slice(bodyStart, bodyStart + contentLength))
|
||||
|
||||
def defaultEntity(cth: Option[`Content-Type`], contentLength: Long)(entityParts: Source[_ <: ParserOutput]): UniversalEntity = {
|
||||
val data = entityParts.collect { case ParserOutput.EntityPart(bytes) ⇒ bytes }
|
||||
val data = entityParts.collect { case EntityPart(bytes) ⇒ bytes }
|
||||
HttpEntity.Default(contentType(cth), contentLength, data)
|
||||
}
|
||||
|
||||
def chunkedEntity(cth: Option[`Content-Type`])(entityChunks: Source[_ <: ParserOutput]): RequestEntity with ResponseEntity = {
|
||||
val chunks = entityChunks.collect { case ParserOutput.EntityChunk(chunk) ⇒ chunk }
|
||||
val chunks = entityChunks.collect { case EntityChunk(chunk) ⇒ chunk }
|
||||
HttpEntity.Chunked(contentType(cth), chunks)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.util.ByteString
|
|||
import akka.http.model._
|
||||
import headers._
|
||||
import StatusCodes._
|
||||
import ParserOutput._
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -19,7 +20,7 @@ import StatusCodes._
|
|||
private[http] class HttpRequestParser(_settings: ParserSettings,
|
||||
rawRequestUriHeader: Boolean,
|
||||
_headerParser: HttpHeaderParser)
|
||||
extends HttpMessageParser[ParserOutput.RequestOutput](_settings, _headerParser) {
|
||||
extends HttpMessageParser[RequestOutput](_settings, _headerParser) {
|
||||
import settings._
|
||||
|
||||
private[this] var method: HttpMethod = _
|
||||
|
|
@ -110,12 +111,12 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
|
|||
clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`],
|
||||
hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult =
|
||||
if (hostHeaderPresent || protocol == HttpProtocols.`HTTP/1.0`) {
|
||||
def emitRequestStart(createEntity: Source[ParserOutput.RequestOutput] ⇒ RequestEntity,
|
||||
def emitRequestStart(createEntity: Source[RequestOutput] ⇒ RequestEntity,
|
||||
headers: List[HttpHeader] = headers) = {
|
||||
val allHeaders =
|
||||
if (rawRequestUriHeader) `Raw-Request-URI`(new String(uriBytes, HttpCharsets.`US-ASCII`.nioCharset)) :: headers
|
||||
else headers
|
||||
emit(ParserOutput.RequestStart(method, uri, protocol, allHeaders, createEntity, closeAfterResponseCompletion))
|
||||
emit(RequestStart(method, uri, protocol, allHeaders, createEntity, closeAfterResponseCompletion))
|
||||
}
|
||||
|
||||
teh match {
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import akka.util.ByteString
|
|||
import akka.http.model._
|
||||
import headers._
|
||||
import HttpResponseParser.NoMethod
|
||||
import ParserOutput._
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -18,7 +19,7 @@ import HttpResponseParser.NoMethod
|
|||
private[http] class HttpResponseParser(_settings: ParserSettings,
|
||||
_headerParser: HttpHeaderParser,
|
||||
dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod)
|
||||
extends HttpMessageParser[ParserOutput.ResponseOutput](_settings, _headerParser) {
|
||||
extends HttpMessageParser[ResponseOutput](_settings, _headerParser) {
|
||||
import settings._
|
||||
|
||||
private[this] var requestMethodForCurrentResponse: HttpMethod = NoMethod
|
||||
|
|
@ -74,9 +75,9 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
|
|||
def parseEntity(headers: List[HttpHeader], protocol: HttpProtocol, input: ByteString, bodyStart: Int,
|
||||
clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`],
|
||||
hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult = {
|
||||
def emitResponseStart(createEntity: Source[ParserOutput.ResponseOutput] ⇒ ResponseEntity,
|
||||
def emitResponseStart(createEntity: Source[ResponseOutput] ⇒ ResponseEntity,
|
||||
headers: List[HttpHeader] = headers) =
|
||||
emit(ParserOutput.ResponseStart(statusCode, protocol, headers, createEntity, closeAfterResponseCompletion))
|
||||
emit(ResponseStart(statusCode, protocol, headers, createEntity, closeAfterResponseCompletion))
|
||||
def finishEmptyResponse() = {
|
||||
emitResponseStart(emptyEntity(cth))
|
||||
startNewMessage(input, bodyStart)
|
||||
|
|
@ -99,7 +100,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
|
|||
}
|
||||
case None ⇒
|
||||
emitResponseStart { entityParts ⇒
|
||||
val data = entityParts.collect { case ParserOutput.EntityPart(bytes) ⇒ bytes }
|
||||
val data = entityParts.collect { case EntityPart(bytes) ⇒ bytes }
|
||||
HttpEntity.CloseDelimited(contentType(cth), data)
|
||||
}
|
||||
parseToCloseBody(input, bodyStart)
|
||||
|
|
@ -121,7 +122,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
|
|||
// currently we do not check for `settings.maxContentLength` overflow
|
||||
def parseToCloseBody(input: ByteString, bodyStart: Int): StateResult = {
|
||||
if (input.length > bodyStart)
|
||||
emit(ParserOutput.EntityPart(input drop bodyStart))
|
||||
emit(EntityPart(input drop bodyStart))
|
||||
continue(parseToCloseBody)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue