+htp add first version of new unmarshalling infrastructure, tests still missing

This commit is contained in:
Mathias 2014-07-19 00:10:13 +02:00
parent bebdd549d3
commit e39f4fd0d2
9 changed files with 578 additions and 2 deletions

View file

@ -0,0 +1,237 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.parsing
import akka.event.LoggingAdapter
import org.reactivestreams.api.Producer
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.mutable.ListBuffer
import akka.parboiled2.CharPredicate
import akka.stream.scaladsl.Flow
import akka.stream.{ FlowMaterializer, Transformer }
import akka.util.ByteString
import akka.http.model._
import akka.http.util._
import headers._
/**
 * INTERNAL API
 *
 * Incremental parser for multipart bodies. It is fed chunks of bytes through `onNext` and
 * answers each chunk with the `Output` events (`BodyPartStart`, `EntityPart`, `ParseError`)
 * that could be produced from the data seen so far.
 *
 * The parser is a small state machine: `state` always holds the function that must consume the
 * next chunk. Parsing methods signal "need more input" by throwing `NotEnoughDataException`,
 * which is caught locally and turned into a new `state` via `continue`.
 *
 * see: http://tools.ietf.org/html/rfc2046#section-5.1.1
 */
private[http] final class BodyPartParser(defaultContentType: ContentType,
                                         boundary: String,
                                         materializer: FlowMaterializer,
                                         log: LoggingAdapter,
                                         settings: BodyPartParser.Settings = BodyPartParser.defaultSettings)
  extends Transformer[ByteString, BodyPartParser.Output] {
  import BodyPartParser._
  import settings._

  require(boundary.nonEmpty, "'boundary' parameter of multipart Content-Type must be non-empty")
  require(boundary.charAt(boundary.length - 1) != ' ', "'boundary' parameter of multipart Content-Type must not end with a space char")
  require(boundaryCharNoSpace matchesAll boundary,
    s"'boundary' parameter of multipart Content-Type contains illegal character '${boundaryCharNoSpace.firstMismatch(boundary).get}'")

  sealed trait StateResult // phantom type for ensuring soundness of our parsing method setup

  // the delimiter searched for between parts: CRLF, two dashes, then the boundary string
  private[this] val needle = new Array[Byte](boundary.length + 4)
  needle(0) = '\r'.toByte
  needle(1) = '\n'.toByte
  needle(2) = '-'.toByte
  needle(3) = '-'.toByte
  boundary.getAsciiBytes(needle, 4)

  // Number of trailing input bytes that must be held back whenever we run out of data:
  // the delimiter itself plus the two bytes following it (CRLF or "--"), which are needed
  // to decide what the delimiter means.
  private[this] val lookAhead = needle.length + 2

  private[this] val boyerMoore = new BoyerMoore(needle)
  private[this] val headerParser = HttpHeaderParser.unprimed(settings, warnOnIllegalHeader)
  private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs
  private[this] var state: ByteString => StateResult = tryParseInitialBoundary
  private[this] var terminated = false

  override def isComplete = terminated

  /** Logs an illegal-header warning if `illegalHeaderWarnings` is enabled in the settings. */
  def warnOnIllegalHeader(errorInfo: ErrorInfo): Unit =
    if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty)

  /**
   * Runs the current state on `input` and returns the outputs emitted while doing so.
   * A `ParsingException` is converted into a `ParseError` output and terminates the parser.
   */
  def onNext(input: ByteString): immutable.Seq[Output] = {
    result.clear()
    try state(input)
    catch {
      case e: ParsingException    => fail(e.info)
      case NotEnoughDataException => throw new IllegalStateException // we are missing a try/catch{continue} wrapper somewhere
    }
    result.toList
  }

  /**
   * Checks whether the input starts directly with "--boundary" (i.e. the delimiter without
   * the leading CRLF). If not, the input is treated as preamble.
   */
  def tryParseInitialBoundary(input: ByteString): StateResult = {
    try {
      // `ix` indexes into `needle`; the first two needle bytes (CRLF) are skipped,
      // so the corresponding input index is `ix - 2`
      @tailrec def rec(ix: Int): StateResult =
        if (ix < needle.length) {
          if (byteAt(input, ix - 2) == needle(ix)) rec(ix + 1)
          else parsePreamble(input, 0)
        } else {
          if (crlf(input, ix - 2)) parseHeaderLines(input, ix)
          else if (doubleDash(input, ix - 2)) terminate()
          else parsePreamble(input, 0)
        }
      rec(2)
    } catch {
      // FIX: keep the bytes received so far; the previous code registered the continuation via
      // `continue(next)`, which drops the current input and restarts on the next chunk only
      case NotEnoughDataException => continue(input, 0)((newInput, _) => tryParseInitialBoundary(newInput))
    }
  }

  /** Skips everything up to the first delimiter that is followed by CRLF or "--". */
  def parsePreamble(input: ByteString, offset: Int): StateResult = {
    try {
      @tailrec def rec(index: Int): StateResult = {
        val needleEnd = boyerMoore.nextIndex(input, index) + needle.length
        if (crlf(input, needleEnd)) parseHeaderLines(input, needleEnd + 2)
        else if (doubleDash(input, needleEnd)) terminate()
        else rec(needleEnd)
      }
      rec(offset)
    } catch {
      // FIX: hold back `lookAhead` bytes rather than `needle.length`; a chunk ending in a complete
      // delimiter plus a single following byte would otherwise lose the delimiter's first byte
      case NotEnoughDataException => continue(input.takeRight(lookAhead), 0)(parsePreamble)
    }
  }

  /**
   * Parses the header lines of a part starting at `lineStart`, accumulating them in `headers`
   * (most recently parsed header first). An empty header line ends the header section and
   * hands over to `parseEntity`.
   */
  @tailrec def parseHeaderLines(input: ByteString, lineStart: Int, headers: List[HttpHeader] = Nil,
                                headerCount: Int = 0, cth: Option[`Content-Type`] = None): StateResult = {
    var lineEnd = 0
    val resultHeader =
      try {
        lineEnd = headerParser.parseHeaderLine(input, lineStart)()
        headerParser.resultHeader
      } catch {
        case NotEnoughDataException => null
      }
    resultHeader match {
      case null => continue(input, lineStart)(parseHeaderLinesAux(headers, headerCount, cth))

      case HttpHeaderParser.EmptyHeader =>
        val contentType = cth match {
          case Some(x) => x.contentType
          case None    => defaultContentType
        }
        parseEntity(headers, contentType)(input, lineEnd)

      case h: `Content-Type` =>
        if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, Some(h))
        else if (cth.get == h) parseHeaderLines(input, lineEnd, headers, headerCount, cth)
        else fail("multipart part must not contain more than one Content-Type header")

      case h if headerCount < maxHeaderCount => parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, cth)

      case _ => fail(s"multipart part contains more than the configured limit of $maxHeaderCount headers")
    }
  }

  // work-around for compiler complaining about non-tail-recursion if we inline this method
  def parseHeaderLinesAux(headers: List[HttpHeader], headerCount: Int,
                          cth: Option[`Content-Type`])(input: ByteString, lineStart: Int): StateResult =
    parseHeaderLines(input, lineStart, headers, headerCount, cth)

  /**
   * Parses the entity of a part starting at `offset`.
   *
   * @param emitPartChunk      called when the input ran out before the closing delimiter was seen;
   *                           by default emits a `BodyPartStart` with a streamed entity plus the bytes
   * @param emitFinalPartChunk called when the closing delimiter was found;
   *                           by default emits a `BodyPartStart` with a strict entity
   */
  def parseEntity(headers: List[HttpHeader], contentType: ContentType,
                  emitPartChunk: (List[HttpHeader], ContentType, ByteString) => Unit = {
                    (headers, ct, bytes) =>
                      emit(BodyPartStart(headers, entityParts => HttpEntity.CloseDelimited(ct,
                        Flow(entityParts).collect { case EntityPart(data) => data }.toProducer(materializer))))
                      emit(bytes)
                  },
                  emitFinalPartChunk: (List[HttpHeader], ContentType, ByteString) => Unit = {
                    (headers, ct, bytes) => emit(BodyPartStart(headers, _ => HttpEntity.Strict(ct, bytes)))
                  })(input: ByteString, offset: Int): StateResult =
    try {
      @tailrec def rec(index: Int): StateResult = {
        val currentPartEnd = boyerMoore.nextIndex(input, index)
        def emitFinalChunk() = emitFinalPartChunk(headers, contentType, input.slice(offset, currentPartEnd))
        val needleEnd = currentPartEnd + needle.length
        if (crlf(input, needleEnd)) {
          emitFinalChunk()
          parseHeaderLines(input, needleEnd + 2)
        } else if (doubleDash(input, needleEnd)) {
          emitFinalChunk()
          terminate()
        } else rec(needleEnd)
      }
      rec(offset)
    } catch {
      case NotEnoughDataException =>
        // we cannot emit all input bytes since the end of the input might be the start of the next boundary
        // FIX: hold back `lookAhead` bytes (delimiter + the two bytes after it). Holding back only
        // `needle.length` emitted the first delimiter byte as entity data whenever a chunk ended
        // with a complete delimiter followed by a single '\r' or '-'.
        val emitEnd = math.max(input.length - lookAhead, offset)
        emitPartChunk(headers, contentType, input.slice(offset, emitEnd))
        val simpleEmit: (List[HttpHeader], ContentType, ByteString) => Unit = (_, _, bytes) => emit(bytes)
        // the part has been started already, so from now on only raw entity bytes are emitted
        continue(input drop emitEnd, 0)(parseEntity(null, null, simpleEmit, simpleEmit))
    }

  /** Emits `bytes` as an `EntityPart`, unless they are empty. */
  def emit(bytes: ByteString): Unit = if (bytes.nonEmpty) emit(EntityPart(bytes))

  /** Appends `output` to the results returned by the current `onNext` call. */
  def emit(output: Output): Unit = result += output

  /**
   * Registers `next` as the handler for the following chunk. If `offset` lies within `input`
   * the retained `input` is prepended to the next chunk; if `offset` equals `input.length`
   * the next chunk is parsed on its own from index 0.
   */
  def continue(input: ByteString, offset: Int)(next: (ByteString, Int) => StateResult): StateResult = {
    state =
      math.signum(offset - input.length) match {
        case -1 => more => next(input ++ more, offset)
        case 0  => next(_, 0)
        case 1  => throw new IllegalStateException
      }
    done()
  }

  /** Registers `next` as the handler for the following chunk without retaining any input. */
  def continue(next: (ByteString, Int) => StateResult): StateResult = {
    state = next(_, 0)
    done()
  }

  def fail(summary: String): StateResult = fail(ErrorInfo(summary))

  /** Emits a `ParseError` and terminates the parser. */
  def fail(info: ErrorInfo): StateResult = {
    emit(ParseError(info))
    terminate()
  }

  def terminate(): StateResult = {
    terminated = true
    done()
  }

  def done(): StateResult = null // StateResult is a phantom type

  def crlf(input: ByteString, offset: Int): Boolean =
    byteChar(input, offset) == '\r' && byteChar(input, offset + 1) == '\n'

  def doubleDash(input: ByteString, offset: Int): Boolean =
    byteChar(input, offset) == '-' && byteChar(input, offset + 1) == '-'
}
private[http] object BodyPartParser {

  /** The events a `BodyPartParser` produces. */
  sealed trait Output

  /** Marks the beginning of a part; `createEntity` builds the part's entity from the outputs that follow. */
  final case class BodyPartStart(headers: List[HttpHeader], createEntity: Producer[Output] => HttpEntity) extends Output

  /** A chunk of entity bytes belonging to the most recently started part. */
  final case class EntityPart(data: ByteString) extends Output

  /** Signals that parsing failed. */
  final case class ParseError(info: ErrorInfo) extends Output

  // characters allowed in a boundary, excluding the space char
  // http://tools.ietf.org/html/rfc2046#section-5.1.1
  val boundaryCharNoSpace = CharPredicate.Digit ++ CharPredicate.Alpha ++ "'()+_,-./:=?"

  /**
   * Parser settings. All limits are validated on construction.
   */
  final case class Settings(
    maxHeaderNameLength: Int,
    maxHeaderValueLength: Int,
    maxHeaderCount: Int,
    illegalHeaderWarnings: Boolean,
    headerValueCacheLimit: Int) extends HttpHeaderParser.Settings {

    require(maxHeaderNameLength > 0, "maxHeaderNameLength must be > 0")
    require(maxHeaderValueLength > 0, "maxHeaderValueLength must be > 0")
    require(maxHeaderCount > 0, "maxHeaderCount must be > 0")
    require(headerValueCacheLimit >= 0, "headerValueCacheLimit must be >= 0")

    /** The same cache limit applies to every header, whatever its name. */
    def headerValueCacheLimit(headerName: String) = headerValueCacheLimit
  }

  // TODO: load from config
  val defaultSettings = Settings(
    maxHeaderNameLength = 64,
    maxHeaderValueLength = 8192,
    maxHeaderCount = 64,
    illegalHeaderWarnings = true,
    headerValueCacheLimit = 8)
}

View file

@ -0,0 +1,68 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.parsing
import scala.annotation.tailrec
import akka.util.ByteString
/**
 * Straight-forward Boyer-Moore string search implementation.
 *
 * Both shift tables are computed once from `needle` at construction time.
 */
private class BoyerMoore(needle: Array[Byte]) {
  require(needle.length > 0, "needle must be non-empty")

  private[this] val nl1 = needle.length - 1

  // bad-character table, indexed by the *unsigned* value of a byte
  private[this] val charTable: Array[Int] = {
    val table = Array.fill(256)(needle.length)
    @tailrec def rec(i: Int): Unit =
      if (i < nl1) {
        // FIX: mask to 0..255, a negative Byte must not be used as an array index
        table(needle(i) & 0xff) = nl1 - i
        rec(i + 1)
      }
    rec(0)
    table
  }

  // good-suffix table
  private[this] val offsetTable: Array[Int] = {
    val table = new Array[Int](needle.length)

    @tailrec def isPrefix(i: Int, j: Int): Boolean =
      i == needle.length || needle(i) == needle(j) && isPrefix(i + 1, j + 1)
    @tailrec def loop1(i: Int, lastPrefixPosition: Int): Unit =
      if (i >= 0) {
        val nextLastPrefixPosition = if (isPrefix(i + 1, 0)) i + 1 else lastPrefixPosition
        table(nl1 - i) = nextLastPrefixPosition - i + nl1
        loop1(i - 1, nextLastPrefixPosition)
      }
    loop1(nl1, needle.length)

    @tailrec def suffixLength(i: Int, j: Int, result: Int): Int =
      if (i >= 0 && needle(i) == needle(j)) suffixLength(i - 1, j - 1, result + 1) else result
    @tailrec def loop2(i: Int): Unit =
      if (i < nl1) {
        val sl = suffixLength(i, nl1, 0)
        table(sl) = nl1 - i + sl
        loop2(i + 1)
      }
    loop2(0)
    table
  }

  /**
   * Returns the index of the next occurrence of `needle` in `haystack` that is >= `offset`.
   * If none is found a `NotEnoughDataException` is thrown.
   */
  def nextIndex(haystack: ByteString, offset: Int): Int = {
    // `i` is the current haystack index, `j` the current needle index; comparison runs right-to-left
    @tailrec def rec(i: Int, j: Int): Int = {
      val byte = byteAt(haystack, i)
      if (needle(j) == byte) {
        if (j == 0) i // found
        else rec(i - 1, j - 1)
        // FIX: `byte & 0xff`, haystack bytes >= 0x80 are negative as a Byte and would
        // otherwise throw an ArrayIndexOutOfBoundsException
      } else rec(i + math.max(offsetTable(nl1 - j), charTable(byte & 0xff)), nl1)
    }
    // FIX: start the search at `offset`; it was ignored before, so every call restarted at index 0
    // and callers resuming after a rejected match found that same match again
    rec(offset + nl1, nl1)
  }
}

View file

@ -26,8 +26,13 @@ package object parsing {
/**
 * INTERNAL API
 *
 * Returns the byte at index `ix` of `input` converted to a `Char`.
 * Delegates to `byteAt`, which throws `NotEnoughDataException` if `ix` is not below `input.length`.
 */
private[http] def byteChar(input: ByteString, ix: Int): Char = byteAt(input, ix).toChar
/**
 * INTERNAL API
 *
 * Returns the byte at index `ix` of `input`.
 * Throws `NotEnoughDataException` if `ix` is not below `input.length`.
 */
private[http] def byteAt(input: ByteString, ix: Int): Byte =
  if (ix >= input.length) throw NotEnoughDataException
  else input(ix)
/**
* INTERNAL API

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.unmarshalling
import akka.http.unmarshalling.Unmarshalling._
import scala.concurrent.{ Future, ExecutionContext }
trait GenericUnmarshallers {

  /**
   * Lifts an unmarshaller for `B` into one for `Option[B]`:
   * a `ContentExpected` failure becomes a successful `None`, all other failures are passed through.
   */
  implicit def targetOptionUnmarshaller[A, B](implicit um: Unmarshaller[A, B], ec: ExecutionContext): Unmarshaller[A, Option[B]] =
    um.mapUnmarshalling {
      case ContentExpected => Success(None)
      case Success(value)  => Success(Some(value))
      case other: Failure  => other
    }

  /**
   * Lifts an unmarshaller from `A` into one from `Option[A]`:
   * a missing source (`None`) yields a `ContentExpected` failure.
   */
  implicit def sourceOptionUnmarshaller[A, B](implicit um: Unmarshaller[A, B], ec: ExecutionContext): Unmarshaller[Option[A], B] =
    Unmarshaller { source =>
      source match {
        case None        => Future.successful(ContentExpected)
        case Some(value) => um(value)
      }
    }
}

object GenericUnmarshallers extends GenericUnmarshallers

View file

@ -0,0 +1,68 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.unmarshalling
import org.reactivestreams.api.Producer
import scala.concurrent.Future
import akka.actor.ActorRefFactory
import akka.http.parsing.BodyPartParser
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.http.model._
import akka.http.util._
import MediaRanges._
import MediaTypes._
import HttpCharsets._
/**
 * Unmarshallers turning an `HttpEntity` with a multipart media type into the corresponding
 * multipart model, with the body parts exposed as a `Producer[BodyPart]`.
 */
trait MultipartUnmarshallers {

  implicit def defaultMultipartContentUnmarshaller(implicit fm: FlowMaterializer,
                                                   refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartContent] =
    multipartContentUnmarshaller(`UTF-8`)

  def multipartContentUnmarshaller(defaultCharset: HttpCharset)(implicit fm: FlowMaterializer,
                                                                refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartContent] =
    multipartPartsUnmarshaller[MultipartContent](`multipart/*`, ContentTypes.`text/plain` withCharset defaultCharset)(MultipartContent(_))

  implicit def defaultMultipartByteRangesUnmarshaller(implicit fm: FlowMaterializer,
                                                      refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartByteRanges] =
    multipartByteRangesUnmarshaller(`UTF-8`)

  def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit fm: FlowMaterializer,
                                                                   refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartByteRanges] =
    multipartPartsUnmarshaller[MultipartByteRanges](`multipart/byteranges`,
      ContentTypes.`text/plain` withCharset defaultCharset)(MultipartByteRanges(_))

  /**
   * Generic multipart unmarshaller.
   *
   * @param mediaRange         the media types accepted; others yield `UnsupportedContentType`
   * @param defaultContentType the content type assumed for parts without a Content-Type header
   * @param create             builds the result from the stream of body parts
   *
   * A missing or malformed `boundary` parameter yields an `InvalidContent` failure.
   */
  def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Producer[BodyPart] => T)(implicit fm: FlowMaterializer,
                                                                                                                                                 refFactory: ActorRefFactory): FromEntityUnmarshaller[T] =
    Unmarshaller { entity =>
      Future.successful {
        if (mediaRange matches entity.contentType.mediaType) {
          entity.contentType.mediaType.params.get("boundary") match {
            case None =>
              // FIX: report as an unmarshalling failure instead of throwing out of a Future-returning function
              Unmarshalling.InvalidContent("Content-Type with a multipart media type must have a 'boundary' parameter")

            case Some(boundary) =>
              val log = actorSystem(refFactory).log
              // FIX: BodyPartParser validates the boundary with `require` on construction, so a
              // malformed boundary surfaces as an IllegalArgumentException right here
              val parserOrFailure: Either[Unmarshalling.Failure, BodyPartParser] =
                try Right(new BodyPartParser(defaultContentType, boundary, fm, log))
                catch {
                  case e: IllegalArgumentException => Left(Unmarshalling.InvalidContent(e.getMessage, e))
                }
              parserOrFailure match {
                case Left(failure) => failure
                case Right(parser) =>
                  // every BodyPartStart opens a new sub-stream; its head carries the headers and
                  // the entity factory, its tail the entity data of that part
                  val bodyParts = Flow(entity.dataBytes(fm))
                    .transform(parser)
                    .splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart])
                    .headAndTail(fm)
                    .collect {
                      case (BodyPartParser.BodyPartStart(headers, createEntity), entityParts) =>
                        BodyPart(createEntity(entityParts), headers)
                    }.toProducer(fm)
                  Unmarshalling.Success(create(bodyParts))
              }
          }
        } else Unmarshalling.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil)
      }
    }

  implicit def defaultMultipartFormDataUnmarshaller(implicit fm: FlowMaterializer,
                                                    refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
    multipartFormDataUnmarshaller(strict = true)

  /**
   * Unmarshaller for `multipart/form-data`. With `strict = true` every part is passed through
   * `verify`, which currently performs no checks.
   */
  def multipartFormDataUnmarshaller(strict: Boolean = true)(implicit fm: FlowMaterializer,
                                                            refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
    multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts =>
      def verify(part: BodyPart): BodyPart = part // TODO
      val parts = if (strict) Flow(bodyParts).map(verify).toProducer(fm) else bodyParts
      MultipartFormData(parts)
    }
}

object MultipartUnmarshallers extends MultipartUnmarshallers

View file

@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.unmarshalling
import java.io.{ ByteArrayInputStream, InputStreamReader }
import scala.concurrent.{ Future, ExecutionContext }
import scala.xml.{ XML, NodeSeq }
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import akka.http.model._
import MediaTypes._
trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers {

  /** Collects all data bytes of the entity into a single `ByteString`. */
  implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] =
    Unmarshaller { entity =>
      if (!entity.isKnownEmpty) {
        val allBytes = Flow(entity.dataBytes(fm)).fold(ByteString.empty)(_ ++ _)
        allBytes.map(Unmarshalling.Success(_)).toFuture(fm)
      } else Future.successful(Unmarshalling.Success(ByteString.empty))
    }

  /** Like `byteStringUnmarshaller`, with the bytes copied into an array. */
  implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer,
                                     ec: ExecutionContext): FromEntityUnmarshaller[Array[Byte]] =
    byteStringUnmarshaller(fm).map(bytes => bytes.toArray[Byte])

  /** Decodes the entity bytes into chars using the charset of the entity's content type. */
  implicit def charArrayUnmarshaller(implicit fm: FlowMaterializer,
                                     ec: ExecutionContext): FromEntityUnmarshaller[Array[Char]] =
    byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) =>
      val charset = entity.contentType.charset.nioCharset
      val decoded = charset.decode(bytes.asByteBuffer)
      val chars = new Array[Char](decoded.length())
      decoded.get(chars)
      chars
    }

  /** Decodes the entity bytes into a `String` using the charset of the entity's content type. */
  implicit def stringUnmarshaller(implicit fm: FlowMaterializer,
                                  ec: ExecutionContext): FromEntityUnmarshaller[String] =
    byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) =>
      // FIXME: add `ByteString::decodeString(java.nio.Charset): String` overload!!!
      val charsetName = entity.contentType.charset.nioCharset.name
      bytes.decodeString(charsetName) // ouch!!!
    }

  // the media types accepted by `nodeSeqUnmarshaller`
  private val nodeSeqMediaTypes = List(`text/xml`, `application/xml`, `text/html`, `application/xhtml+xml`)

  /**
   * Parses the entity as XML if its media type is one of `nodeSeqMediaTypes`,
   * otherwise yields `UnsupportedContentType`.
   */
  implicit def nodeSeqUnmarshaller(implicit fm: FlowMaterializer,
                                   ec: ExecutionContext): FromEntityUnmarshaller[NodeSeq] =
    byteArrayUnmarshaller flatMapWithInput { (entity, bytes) =>
      val mediaTypeSupported = nodeSeqMediaTypes contains entity.contentType.mediaType
      if (!mediaTypeSupported) Unmarshalling.UnsupportedContentType(nodeSeqMediaTypes map (ContentTypeRange(_)))
      else {
        // NOTE(review): the default `XML.parser` is used without disabling DOCTYPE / external
        // entity processing; confirm whether hardening is needed for untrusted input
        val parser = XML.parser
        try parser.setProperty("http://apache.org/xml/properties/locale", java.util.Locale.ROOT)
        catch {
          case e: org.xml.sax.SAXNotRecognizedException => // property is not needed
        }
        val charset = entity.contentType.charset.nioCharset
        val reader = new InputStreamReader(new ByteArrayInputStream(bytes), charset)
        Unmarshalling.Success(XML.withSAXParser(parser).load(reader))
      }
    }
}

object PredefinedFromEntityUnmarshallers extends PredefinedFromEntityUnmarshallers

View file

@ -0,0 +1,75 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.unmarshalling
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import akka.http.model.ContentTypeRange
/**
 * An unmarshaller is a function from `A` to a future `Unmarshalling[B]`,
 * enriched with combinators for transforming its result.
 */
trait Unmarshaller[A, B] extends (A => Future[Unmarshalling[B]]) {

  /** Transforms a successfully unmarshalled value with `f`. */
  def map[C](f: B => C)(implicit ec: ExecutionContext): Unmarshaller[A, C] =
    mapUnmarshalling(unmarshalling => unmarshalling map f)

  /** Transforms a successfully unmarshalled value with `f`, which may itself fail. */
  def flatMap[C](f: B => Unmarshalling[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] =
    mapUnmarshalling(unmarshalling => unmarshalling flatMap f)

  /** Like `map`, but `f` additionally receives the original input. */
  def mapWithInput[C](f: (A, B) => C)(implicit ec: ExecutionContext): Unmarshaller[A, C] =
    Unmarshaller { input =>
      this(input) map { unmarshalling => unmarshalling map (value => f(input, value)) }
    }

  /** Like `flatMap`, but `f` additionally receives the original input. */
  def flatMapWithInput[C](f: (A, B) => Unmarshalling[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] =
    Unmarshaller { input =>
      this(input) map { unmarshalling => unmarshalling flatMap (value => f(input, value)) }
    }

  /** Transforms the complete `Unmarshalling` result (success or failure) with `f`. */
  def mapUnmarshalling[C](f: Unmarshalling[B] => Unmarshalling[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] =
    Unmarshaller { input => this(input) map f }

  /** Replaces a `ContentExpected` failure with a success carrying `defaultValue`. */
  def withDefaultValue[BB >: B](defaultValue: BB)(implicit ec: ExecutionContext): Unmarshaller[A, BB] =
    mapUnmarshalling { unmarshalling =>
      unmarshalling recover { case Unmarshalling.ContentExpected => defaultValue }
    }
}
/**
 * Companion of the `Unmarshaller` trait: brings the predefined unmarshallers into implicit scope
 * and provides a factory for turning a plain function into an `Unmarshaller`.
 */
object Unmarshaller
  extends GenericUnmarshallers
  with PredefinedFromEntityUnmarshallers
  with UnmarshallerLifting {

  /** Wraps the function `f` in an `Unmarshaller`. */
  def apply[A, B](f: A => Future[Unmarshalling[B]]): Unmarshaller[A, B] =
    new Unmarshaller[A, B] {
      def apply(input: A) = f(input)
    }
}
/**
 * The result of an unmarshalling attempt: either a `Success` carrying the value
 * or one of the `Failure` cases defined in the companion.
 */
sealed trait Unmarshalling[+A] {
  def isSuccess: Boolean
  def isFailure: Boolean

  /** Transforms the value of a success; failures are returned unchanged. */
  def map[B](f: A => B): Unmarshalling[B]

  /** Transforms the value of a success into a new result; failures are returned unchanged. */
  def flatMap[B](f: A => Unmarshalling[B]): Unmarshalling[B]

  /** Turns a failure for which `f` is defined into a success; everything else is returned unchanged. */
  def recover[AA >: A](f: PartialFunction[Unmarshalling.Failure, AA]): Unmarshalling[AA]
}

object Unmarshalling {

  /** A successful unmarshalling carrying the produced `value`. */
  final case class Success[+A](value: A) extends Unmarshalling[A] {
    def isSuccess = true
    def isFailure = false
    def map[B](f: A => B) = Success(f(value))
    def flatMap[B](f: A => Unmarshalling[B]) = f(value)
    // nothing to recover from
    def recover[AA >: A](f: PartialFunction[Unmarshalling.Failure, AA]) = this
  }

  /** Base class of all failure cases; `map` and `flatMap` leave a failure untouched. */
  sealed abstract class Failure extends Unmarshalling[Nothing] {
    def isSuccess = false
    def isFailure = true
    def map[B](f: Nothing => B) = this
    def flatMap[B](f: Nothing => Unmarshalling[B]) = this
    def recover[AA >: Nothing](f: PartialFunction[Unmarshalling.Failure, AA]) =
      if (!f.isDefinedAt(this)) this else Success(f(this))
  }

  /** Content was required but not available. */
  case object ContentExpected extends Failure

  /** The content could not be unmarshalled; `cause` optionally holds the underlying exception. */
  final case class InvalidContent(errorMessage: String, cause: Option[Throwable] = None) extends Failure

  object InvalidContent {
    /** Convenience factory wrapping `cause` in a `Some`. */
    def apply(errorMessage: String, cause: Throwable) = new InvalidContent(errorMessage, Some(cause))
  }

  /** The content type is not supported; `supported` lists the ranges that would be. */
  case class UnsupportedContentType(supported: immutable.Seq[ContentTypeRange]) extends Failure
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.unmarshalling
import akka.http.model._
/**
 * Implicit liftings between the unmarshaller kinds:
 * entity -> message and message -> request / response.
 */
trait UnmarshallerLifting {

  /** Lifts an entity unmarshaller to any `HttpMessage` subtype by applying it to the message's entity. */
  implicit def entity2message[A <: HttpMessage, B](implicit um: FromEntityUnmarshaller[B]): Unmarshaller[A, B] =
    Unmarshaller { msg =>
      um(msg.entity)
    }

  /** Uses a message unmarshaller for requests. */
  implicit def message2request[T](implicit um: FromMessageUnmarshaller[T]): FromRequestUnmarshaller[T] =
    Unmarshaller { req =>
      um(req)
    }

  /** Uses a message unmarshaller for responses. */
  implicit def message2response[T](implicit um: FromMessageUnmarshaller[T]): FromResponseUnmarshaller[T] =
    Unmarshaller { resp =>
      um(resp)
    }
}

object UnmarshallerLifting extends UnmarshallerLifting

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http
import akka.http.model._
/**
 * Type aliases for the unmarshallers of the HTTP model types.
 */
package object unmarshalling {
  /** Unmarshals from an `HttpEntity`. */
  type FromEntityUnmarshaller[T] = Unmarshaller[HttpEntity, T]

  /** Unmarshals from any `HttpMessage`. */
  type FromMessageUnmarshaller[T] = Unmarshaller[HttpMessage, T]

  /** Unmarshals from an `HttpRequest`. */
  type FromRequestUnmarshaller[T] = Unmarshaller[HttpRequest, T]

  /** Unmarshals from an `HttpResponse`. */
  type FromResponseUnmarshaller[T] = Unmarshaller[HttpResponse, T]
}