From e39f4fd0d2e37f1913d74f8b2d65521fb853f0ef Mon Sep 17 00:00:00 2001 From: Mathias Date: Sat, 19 Jul 2014 00:10:13 +0200 Subject: [PATCH] +htp add first version of new unmarshalling infrastructure, tests still missing --- .../akka/http/parsing/BodyPartParser.scala | 237 ++++++++++++++++++ .../scala/akka/http/parsing/BoyerMoore.scala | 68 +++++ .../scala/akka/http/parsing/package.scala | 9 +- .../unmarshalling/GenericUnmarshallers.scala | 27 ++ .../MultipartUnmarshallers.scala | 68 +++++ .../PredefinedFromEntityUnmarshallers.scala | 61 +++++ .../http/unmarshalling/Unmarshaller.scala | 75 ++++++ .../unmarshalling/UnmarshallerLifting.scala | 21 ++ .../akka/http/unmarshalling/package.scala | 14 ++ 9 files changed, 578 insertions(+), 2 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala create mode 100644 akka-http-core/src/main/scala/akka/http/parsing/BoyerMoore.scala create mode 100644 akka-http/src/main/scala/akka/http/unmarshalling/GenericUnmarshallers.scala create mode 100644 akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala create mode 100644 akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala create mode 100644 akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala create mode 100644 akka-http/src/main/scala/akka/http/unmarshalling/UnmarshallerLifting.scala create mode 100644 akka-http/src/main/scala/akka/http/unmarshalling/package.scala diff --git a/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala b/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala new file mode 100644 index 0000000000..68ec7d5e87 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala @@ -0,0 +1,237 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.parsing + +import akka.event.LoggingAdapter +import org.reactivestreams.api.Producer +import scala.annotation.tailrec +import scala.collection.immutable +import scala.collection.mutable.ListBuffer +import akka.parboiled2.CharPredicate +import akka.stream.scaladsl.Flow +import akka.stream.{ FlowMaterializer, Transformer } +import akka.util.ByteString +import akka.http.model._ +import akka.http.util._ +import headers._ + +/** + * INTERNAL API + * + * see: http://tools.ietf.org/html/rfc2046#section-5.1.1 + */ +private[http] final class BodyPartParser(defaultContentType: ContentType, + boundary: String, + materializer: FlowMaterializer, + log: LoggingAdapter, + settings: BodyPartParser.Settings = BodyPartParser.defaultSettings) + extends Transformer[ByteString, BodyPartParser.Output] { + import BodyPartParser._ + import settings._ + + require(boundary.nonEmpty, "'boundary' parameter of multipart Content-Type must be non-empty") + require(boundary.charAt(boundary.length - 1) != ' ', "'boundary' parameter of multipart Content-Type must not end with a space char") + require(boundaryCharNoSpace matchesAll boundary, + s"'boundary' parameter of multipart Content-Type contains illegal character '${boundaryCharNoSpace.firstMismatch(boundary).get}'") + + sealed trait StateResult // phantom type for ensuring soundness of our parsing method setup + + private[this] val needle = new Array[Byte](boundary.length + 4) + needle(0) = '\r'.toByte + needle(1) = '\n'.toByte + needle(2) = '-'.toByte + needle(3) = '-'.toByte + boundary.getAsciiBytes(needle, 4) + private[this] val boyerMoore = new BoyerMoore(needle) + private[this] val headerParser = HttpHeaderParser.unprimed(settings, warnOnIllegalHeader) + private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs + private[this] var state: ByteString ⇒ StateResult = tryParseInitialBoundary + private[this] var terminated = false + + override def isComplete = terminated + + def warnOnIllegalHeader(errorInfo: ErrorInfo): Unit = + if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty) + + def onNext(input: ByteString): immutable.Seq[Output] = { + result.clear() + try state(input) + catch { + case e: ParsingException ⇒ fail(e.info) + case NotEnoughDataException ⇒ throw new IllegalStateException // we are missing a try/catch{continue} wrapper somewhere + } + result.toList + } + + def tryParseInitialBoundary(input: ByteString): StateResult = { + try { + @tailrec def rec(ix: Int): StateResult = + if (ix < needle.length) { + if (byteAt(input, ix - 2) == needle(ix)) rec(ix + 1) + else parsePreamble(input, 0) + } else { + if (crlf(input, ix - 2)) parseHeaderLines(input, ix) + else if (doubleDash(input, ix - 2)) terminate() + else parsePreamble(input, 0) + } + rec(2) + } catch { + case NotEnoughDataException ⇒ continue((input, _) ⇒ tryParseInitialBoundary(input)) + } + } + + def parsePreamble(input: ByteString, offset: Int): StateResult = { + try { + @tailrec def rec(index: Int): StateResult = { + val needleEnd = boyerMoore.nextIndex(input, index) + needle.length + if (crlf(input, needleEnd)) parseHeaderLines(input, needleEnd + 2) + else if (doubleDash(input, needleEnd)) terminate() + else rec(needleEnd) + } + rec(offset) + } catch { + case NotEnoughDataException ⇒ continue(input.takeRight(needle.length), 0)(parsePreamble) + } + } + + @tailrec def parseHeaderLines(input: ByteString, lineStart: Int, headers: List[HttpHeader] = Nil, + headerCount: Int = 0, cth: Option[`Content-Type`] = None): StateResult = { + var lineEnd = 0 + val resultHeader = + try { + lineEnd = headerParser.parseHeaderLine(input, lineStart)() + headerParser.resultHeader + } catch { + case NotEnoughDataException ⇒ null + } + resultHeader match { + case null ⇒ continue(input, lineStart)(parseHeaderLinesAux(headers, headerCount, cth)) + + case HttpHeaderParser.EmptyHeader ⇒ + val contentType = cth match { + case Some(x) ⇒ x.contentType + case None ⇒ defaultContentType + } + parseEntity(headers, contentType)(input, lineEnd) + + case h: `Content-Type` ⇒ + if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, Some(h)) + else if (cth.get == h) parseHeaderLines(input, lineEnd, headers, headerCount, cth) + else fail("multipart part must not contain more than one Content-Type header") + + case h if headerCount < maxHeaderCount ⇒ parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, cth) + + case _ ⇒ fail(s"multipart part contains more than the configured limit of $maxHeaderCount headers") + } + } + + // work-around for compiler complaining about non-tail-recursion if we inline this method + def parseHeaderLinesAux(headers: List[HttpHeader], headerCount: Int, + cth: Option[`Content-Type`])(input: ByteString, lineStart: Int): StateResult = + parseHeaderLines(input, lineStart, headers, headerCount, cth) + + def parseEntity(headers: List[HttpHeader], contentType: ContentType, + emitPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = { + (headers, ct, bytes) ⇒ + emit(BodyPartStart(headers, entityParts ⇒ HttpEntity.CloseDelimited(ct, + Flow(entityParts).collect { case EntityPart(data) ⇒ data }.toProducer(materializer)))) + emit(bytes) + }, + emitFinalPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = { + (headers, ct, bytes) ⇒ emit(BodyPartStart(headers, _ ⇒ HttpEntity.Strict(ct, bytes))) + })(input: ByteString, offset: Int): StateResult = + try { + @tailrec def rec(index: Int): StateResult = { + val currentPartEnd = boyerMoore.nextIndex(input, index) + def emitFinalChunk() = emitFinalPartChunk(headers, contentType, input.slice(offset, currentPartEnd)) + val needleEnd = currentPartEnd + needle.length + if (crlf(input, needleEnd)) { + emitFinalChunk() + parseHeaderLines(input, needleEnd + 2) + } else if (doubleDash(input, needleEnd)) { + emitFinalChunk() + terminate() + } else rec(needleEnd) + } + rec(offset) + } catch { + case NotEnoughDataException ⇒ + // we cannot emit all input bytes since the end of the input might be the start of the next boundary + val emitEnd = math.max(input.length - needle.length, offset) + emitPartChunk(headers, contentType, input.slice(offset, emitEnd)) + val simpleEmit: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = (_, _, bytes) ⇒ emit(bytes) + continue(input drop emitEnd, 0)(parseEntity(null, null, simpleEmit, simpleEmit)) + } + + def emit(bytes: ByteString): Unit = if (bytes.nonEmpty) emit(EntityPart(bytes)) + + def emit(output: Output): Unit = result += output + + def continue(input: ByteString, offset: Int)(next: (ByteString, Int) ⇒ StateResult): StateResult = { + state = + math.signum(offset - input.length) match { + case -1 ⇒ more ⇒ next(input ++ more, offset) + case 0 ⇒ next(_, 0) + case 1 ⇒ throw new IllegalStateException + } + done() + } + + def continue(next: (ByteString, Int) ⇒ StateResult): StateResult = { + state = next(_, 0) + done() + } + + def fail(summary: String): StateResult = fail(ErrorInfo(summary)) + def fail(info: ErrorInfo): StateResult = { + emit(ParseError(info)) + terminate() + } + + def terminate(): StateResult = { + terminated = true + done() + } + + def done(): StateResult = null // StateResult is a phantom type + + def crlf(input: ByteString, offset: Int): Boolean = + byteChar(input, offset) == '\r' && byteChar(input, offset + 1) == '\n' + + def doubleDash(input: ByteString, offset: Int): Boolean = + byteChar(input, offset) == '-' && byteChar(input, offset + 1) == '-' +} + +private[http] object BodyPartParser { + // http://tools.ietf.org/html/rfc2046#section-5.1.1 + val boundaryCharNoSpace = CharPredicate.Digit ++ CharPredicate.Alpha ++ "'()+_,-./:=?" + + sealed trait Output + final case class BodyPartStart(headers: List[HttpHeader], createEntity: Producer[Output] ⇒ HttpEntity) extends Output + final case class EntityPart(data: ByteString) extends Output + final case class ParseError(info: ErrorInfo) extends Output + + final case class Settings( + maxHeaderNameLength: Int, + maxHeaderValueLength: Int, + maxHeaderCount: Int, + illegalHeaderWarnings: Boolean, + headerValueCacheLimit: Int) extends HttpHeaderParser.Settings { + require(maxHeaderNameLength > 0, "maxHeaderNameLength must be > 0") + require(maxHeaderValueLength > 0, "maxHeaderValueLength must be > 0") + require(maxHeaderCount > 0, "maxHeaderCount must be > 0") + require(headerValueCacheLimit >= 0, "headerValueCacheLimit must be >= 0") + def headerValueCacheLimit(headerName: String) = headerValueCacheLimit + } + + // TODO: load from config + val defaultSettings = Settings( + maxHeaderNameLength = 64, + maxHeaderValueLength = 8192, + maxHeaderCount = 64, + illegalHeaderWarnings = true, + headerValueCacheLimit = 8) +} + diff --git a/akka-http-core/src/main/scala/akka/http/parsing/BoyerMoore.scala b/akka-http-core/src/main/scala/akka/http/parsing/BoyerMoore.scala new file mode 100644 index 0000000000..33546932a4 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/parsing/BoyerMoore.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.parsing + +import scala.annotation.tailrec +import akka.util.ByteString + +/** + * Straight-forward Boyer-Moore string search implementation. + */ +private class BoyerMoore(needle: Array[Byte]) { + require(needle.length > 0, "needle must be non-empty") + + private[this] val nl1 = needle.length - 1 + + private[this] val charTable: Array[Int] = { + val table = Array.fill(256)(needle.length) + @tailrec def rec(i: Int): Unit = + if (i < nl1) { + table(needle(i)) = nl1 - i + rec(i + 1) + } + rec(0) + table + } + + private[this] val offsetTable: Array[Int] = { + val table = new Array[Int](needle.length) + + @tailrec def isPrefix(i: Int, j: Int): Boolean = + i == needle.length || needle(i) == needle(j) && isPrefix(i + 1, j + 1) + @tailrec def loop1(i: Int, lastPrefixPosition: Int): Unit = + if (i >= 0) { + val nextLastPrefixPosition = if (isPrefix(i + 1, 0)) i + 1 else lastPrefixPosition + table(nl1 - i) = nextLastPrefixPosition - i + nl1 + loop1(i - 1, nextLastPrefixPosition) + } + loop1(nl1, needle.length) + + @tailrec def suffixLength(i: Int, j: Int, result: Int): Int = + if (i >= 0 && needle(i) == needle(j)) suffixLength(i - 1, j - 1, result + 1) else result + @tailrec def loop2(i: Int): Unit = + if (i < nl1) { + val sl = suffixLength(i, nl1, 0) + table(sl) = nl1 - i + sl + loop2(i + 1) + } + loop2(0) + table + } + + /** + * Returns the index of the next occurrence of `needle` in `haystack` that is >= `offset`. + * If none is found a `NotEnoughDataException` is thrown. + */ + def nextIndex(haystack: ByteString, offset: Int): Int = { + @tailrec def rec(i: Int, j: Int): Int = { + val byte = byteAt(haystack, i) + if (needle(j) == byte) { + if (j == 0) i // found + else rec(i - 1, j - 1) + } else rec(i + math.max(offsetTable(nl1 - j), charTable(byte)), nl1) + } + rec(nl1, nl1) + } +} diff --git a/akka-http-core/src/main/scala/akka/http/parsing/package.scala b/akka-http-core/src/main/scala/akka/http/parsing/package.scala index a67ecc58b7..5e82234018 100644 --- a/akka-http-core/src/main/scala/akka/http/parsing/package.scala +++ b/akka-http-core/src/main/scala/akka/http/parsing/package.scala @@ -26,8 +26,13 @@ package object parsing { /** * INTERNAL API */ - private[http] def byteChar(input: ByteString, ix: Int): Char = - if (ix < input.length) input(ix).toChar else throw NotEnoughDataException + private[http] def byteChar(input: ByteString, ix: Int): Char = byteAt(input, ix).toChar + + /** + * INTERNAL API + */ + private[http] def byteAt(input: ByteString, ix: Int): Byte = + if (ix < input.length) input(ix) else throw NotEnoughDataException /** * INTERNAL API diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/GenericUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/GenericUnmarshallers.scala new file mode 100644 index 0000000000..700fa58972 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/unmarshalling/GenericUnmarshallers.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.unmarshalling + +import akka.http.unmarshalling.Unmarshalling._ + +import scala.concurrent.{ Future, ExecutionContext } + +trait GenericUnmarshallers { + + implicit def targetOptionUnmarshaller[A, B](implicit um: Unmarshaller[A, B], ec: ExecutionContext): Unmarshaller[A, Option[B]] = + um.mapUnmarshalling { + case Success(b) ⇒ Success(Some(b)) + case ContentExpected ⇒ Success(None) + case x: Failure ⇒ x + } + + implicit def sourceOptionUnmarshaller[A, B](implicit um: Unmarshaller[A, B], ec: ExecutionContext): Unmarshaller[Option[A], B] = + Unmarshaller { + case Some(a) ⇒ um(a) + case None ⇒ Future.successful(ContentExpected) + } +} + +object GenericUnmarshallers extends GenericUnmarshallers \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala new file mode 100644 index 0000000000..860579e4c0 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.unmarshalling + +import org.reactivestreams.api.Producer +import scala.concurrent.Future +import akka.actor.ActorRefFactory +import akka.http.parsing.BodyPartParser +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Flow +import akka.http.model._ +import akka.http.util._ +import MediaRanges._ +import MediaTypes._ +import HttpCharsets._ + +trait MultipartUnmarshallers { + + implicit def defaultMultipartContentUnmarshaller(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory) = multipartContentUnmarshaller(`UTF-8`) + def multipartContentUnmarshaller(defaultCharset: HttpCharset)(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartContent] = + multipartPartsUnmarshaller[MultipartContent](`multipart/*`, ContentTypes.`text/plain` withCharset defaultCharset)(MultipartContent(_)) + + implicit def defaultMultipartByteRangesUnmarshaller(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory) = multipartByteRangesUnmarshaller(`UTF-8`) + def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartByteRanges] = + multipartPartsUnmarshaller[MultipartByteRanges](`multipart/byteranges`, + ContentTypes.`text/plain` withCharset defaultCharset)(MultipartByteRanges(_)) + + def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Producer[BodyPart] ⇒ T)(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory): FromEntityUnmarshaller[T] = + Unmarshaller { entity ⇒ + Future.successful { + if (mediaRange matches entity.contentType.mediaType) { + entity.contentType.mediaType.params.get("boundary") match { + case None ⇒ sys.error("Content-Type with a multipart media type must have a 'boundary' parameter") + case Some(boundary) ⇒ + val bodyParts = Flow(entity.dataBytes(fm)) + .transform(new BodyPartParser(defaultContentType, boundary, fm, actorSystem(refFactory).log)) + .splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart]) + .headAndTail(fm) + .collect { + case (BodyPartParser.BodyPartStart(headers, createEntity), entityParts) ⇒ + BodyPart(createEntity(entityParts), headers) + }.toProducer(fm) + Unmarshalling.Success(create(bodyParts)) + } + } else Unmarshalling.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil) + } + } + + implicit def defaultMultipartFormDataUnmarshaller(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = + multipartFormDataUnmarshaller(strict = true) + def multipartFormDataUnmarshaller(strict: Boolean = true)(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = + multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts ⇒ + def verify(part: BodyPart): BodyPart = part // TODO + val parts = if (strict) Flow(bodyParts).map(verify).toProducer(fm) else bodyParts + MultipartFormData(parts) + } +} + +object MultipartUnmarshallers extends MultipartUnmarshallers \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala new file mode 100644 index 0000000000..e97340d23b --- /dev/null +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.unmarshalling + +import java.io.{ ByteArrayInputStream, InputStreamReader } + +import scala.concurrent.{ Future, ExecutionContext } +import scala.xml.{ XML, NodeSeq } +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Flow +import akka.util.ByteString +import akka.http.model._ +import MediaTypes._ + +trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { + + implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] = + Unmarshaller { entity ⇒ + if (entity.isKnownEmpty) Future.successful(Unmarshalling.Success(ByteString.empty)) + else Flow(entity.dataBytes(fm)).fold(ByteString.empty)(_ ++ _).map(Unmarshalling.Success(_)).toFuture(fm) + } + + implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer, + ec: ExecutionContext): FromEntityUnmarshaller[Array[Byte]] = + byteStringUnmarshaller.map(_.toArray[Byte]) + + implicit def charArrayUnmarshaller(implicit fm: FlowMaterializer, + ec: ExecutionContext): FromEntityUnmarshaller[Array[Char]] = + byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒ + val charBuffer = entity.contentType.charset.nioCharset.decode(bytes.asByteBuffer) + val array = new Array[Char](charBuffer.length()) + charBuffer.get(array) + array + } + + implicit def stringUnmarshaller(implicit fm: FlowMaterializer, + ec: ExecutionContext): FromEntityUnmarshaller[String] = + byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒ + // FIXME: add `ByteString::decodeString(java.nio.Charset): String` overload!!! + bytes.decodeString(entity.contentType.charset.nioCharset.name) // ouch!!! + } + + private val nodeSeqMediaTypes = List(`text/xml`, `application/xml`, `text/html`, `application/xhtml+xml`) + implicit def nodeSeqUnmarshaller(implicit fm: FlowMaterializer, + ec: ExecutionContext): FromEntityUnmarshaller[NodeSeq] = + byteArrayUnmarshaller flatMapWithInput { (entity, bytes) ⇒ + if (nodeSeqMediaTypes contains entity.contentType.mediaType) { + val parser = XML.parser + try parser.setProperty("http://apache.org/xml/properties/locale", java.util.Locale.ROOT) + catch { + case e: org.xml.sax.SAXNotRecognizedException ⇒ // property is not needed + } + val reader = new InputStreamReader(new ByteArrayInputStream(bytes), entity.contentType.charset.nioCharset) + Unmarshalling.Success(XML.withSAXParser(parser).load(reader)) + } else Unmarshalling.UnsupportedContentType(nodeSeqMediaTypes map (ContentTypeRange(_))) + } +} + +object PredefinedFromEntityUnmarshallers extends PredefinedFromEntityUnmarshallers \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala new file mode 100644 index 0000000000..4b37c04c47 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.unmarshalling + +import scala.collection.immutable +import scala.concurrent.{ ExecutionContext, Future } +import akka.http.model.ContentTypeRange + +trait Unmarshaller[A, B] extends (A ⇒ Future[Unmarshalling[B]]) { + + def map[C](f: B ⇒ C)(implicit ec: ExecutionContext): Unmarshaller[A, C] = + mapUnmarshalling(_ map f) + + def flatMap[C](f: B ⇒ Unmarshalling[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = + mapUnmarshalling(_ flatMap f) + + def mapWithInput[C](f: (A, B) ⇒ C)(implicit ec: ExecutionContext): Unmarshaller[A, C] = + Unmarshaller { a ⇒ this(a) map (_ map (f(a, _))) } + + def flatMapWithInput[C](f: (A, B) ⇒ Unmarshalling[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = + Unmarshaller { a ⇒ this(a) map (_ flatMap (f(a, _))) } + + def mapUnmarshalling[C](f: Unmarshalling[B] ⇒ Unmarshalling[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = + Unmarshaller { this(_) map f } + + def withDefaultValue[BB >: B](defaultValue: BB)(implicit ec: ExecutionContext): Unmarshaller[A, BB] = + mapUnmarshalling { _ recover { case Unmarshalling.ContentExpected ⇒ defaultValue } } +} + +object Unmarshaller + extends GenericUnmarshallers + with PredefinedFromEntityUnmarshallers + with UnmarshallerLifting { + + def apply[A, B](f: A ⇒ Future[Unmarshalling[B]]): Unmarshaller[A, B] = + new Unmarshaller[A, B] { def apply(a: A) = f(a) } +} + +sealed trait Unmarshalling[+A] { + def isSuccess: Boolean + def isFailure: Boolean + def map[B](f: A ⇒ B): Unmarshalling[B] + def flatMap[B](f: A ⇒ Unmarshalling[B]): Unmarshalling[B] + def recover[AA >: A](f: PartialFunction[Unmarshalling.Failure, AA]): Unmarshalling[AA] +} + +object Unmarshalling { + final case class Success[+A](value: A) extends Unmarshalling[A] { + def isSuccess = true + def isFailure = false + def map[B](f: A ⇒ B) = Success(f(value)) + def flatMap[B](f: A ⇒ Unmarshalling[B]) = f(value) + def recover[AA >: A](f: PartialFunction[Unmarshalling.Failure, AA]) = this + } + + sealed abstract class Failure extends Unmarshalling[Nothing] { + def isSuccess = false + def isFailure = true + def map[B](f: Nothing ⇒ B) = this + def flatMap[B](f: Nothing ⇒ Unmarshalling[B]) = this + def recover[AA >: Nothing](f: PartialFunction[Unmarshalling.Failure, AA]) = + if (f isDefinedAt this) Success(f(this)) else this + } + + case object ContentExpected extends Failure + + final case class InvalidContent(errorMessage: String, cause: Option[Throwable] = None) extends Failure + object InvalidContent { + def apply(errorMessage: String, cause: Throwable) = new InvalidContent(errorMessage, Some(cause)) + } + + case class UnsupportedContentType(supported: immutable.Seq[ContentTypeRange]) extends Failure +} \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/UnmarshallerLifting.scala b/akka-http/src/main/scala/akka/http/unmarshalling/UnmarshallerLifting.scala new file mode 100644 index 0000000000..1c5b09754f --- /dev/null +++ b/akka-http/src/main/scala/akka/http/unmarshalling/UnmarshallerLifting.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.unmarshalling + +import akka.http.model._ + +trait UnmarshallerLifting { + + implicit def entity2message[A <: HttpMessage, B](implicit um: FromEntityUnmarshaller[B]): Unmarshaller[A, B] = + Unmarshaller { message ⇒ um(message.entity) } + + implicit def message2request[T](implicit um: FromMessageUnmarshaller[T]): FromRequestUnmarshaller[T] = + Unmarshaller { request ⇒ um(request) } + + implicit def message2response[T](implicit um: FromMessageUnmarshaller[T]): FromResponseUnmarshaller[T] = + Unmarshaller { response ⇒ um(response) } +} + +object UnmarshallerLifting extends UnmarshallerLifting \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/package.scala b/akka-http/src/main/scala/akka/http/unmarshalling/package.scala new file mode 100644 index 0000000000..6d130638fa --- /dev/null +++ b/akka-http/src/main/scala/akka/http/unmarshalling/package.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http + +import akka.http.model._ + +package object unmarshalling { + type FromEntityUnmarshaller[T] = Unmarshaller[HttpEntity, T] + type FromMessageUnmarshaller[T] = Unmarshaller[HttpMessage, T] + type FromResponseUnmarshaller[T] = Unmarshaller[HttpResponse, T] + type FromRequestUnmarshaller[T] = Unmarshaller[HttpRequest, T] +}