Merge pull request #16183 from spray/wip-15674-mathias

Refactor, improve Multipart model
This commit is contained in:
Björn Antonsson 2014-11-03 16:06:16 +01:00
commit 159adb79b3
14 changed files with 699 additions and 485 deletions

View file

@ -53,6 +53,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
private[this] val headerParser = HttpHeaderParser(settings, warnOnIllegalHeader) // TODO: prevent re-priming header parser from scratch
private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs
private[this] var state: ByteString StateResult = tryParseInitialBoundary
private[this] var receivedInitialBoundary = false
private[this] var terminated = false
override def isComplete = terminated
@ -70,24 +71,21 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
result.toList
}
def tryParseInitialBoundary(input: ByteString): StateResult = {
def tryParseInitialBoundary(input: ByteString): StateResult =
// we don't use boyerMoore here because we are testing for the boundary *without* a
// preceding CRLF and at a known location (the very beginning of the entity)
try {
@tailrec def rec(ix: Int): StateResult =
if (ix < needle.length) {
if (byteAt(input, ix - 2) == needle(ix)) rec(ix + 1)
if (boundary(input, 0)) {
val ix = boundaryLength
if (crlf(input, ix)) {
receivedInitialBoundary = true
parseHeaderLines(input, ix + 2)
} else if (doubleDash(input, ix)) terminate()
else parsePreamble(input, 0)
} else {
if (crlf(input, ix - 2)) parseHeaderLines(input, ix)
else if (doubleDash(input, ix - 2)) terminate()
else parsePreamble(input, 0)
}
rec(2)
} else parsePreamble(input, 0)
} catch {
case NotEnoughDataException continue((input, _) tryParseInitialBoundary(input))
}
}
def parsePreamble(input: ByteString, offset: Int): StateResult = {
try {
@ -105,6 +103,19 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
@tailrec def parseHeaderLines(input: ByteString, lineStart: Int, headers: List[HttpHeader] = Nil,
headerCount: Int = 0, cth: Option[`Content-Type`] = None): StateResult = {
def contentType =
cth match {
case Some(x) x.contentType
case None defaultContentType
}
if (boundary(input, lineStart)) {
emit(BodyPartStart(headers, _ HttpEntity.empty(contentType)))
val ix = lineStart + boundaryLength
if (crlf(input, ix)) parseHeaderLines(input, ix + 2)
else if (doubleDash(input, ix)) terminate()
else fail("Illegal multipart boundary in message content")
} else {
var lineEnd = 0
val resultHeader =
try {
@ -116,12 +127,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
resultHeader match {
case null continue(input, lineStart)(parseHeaderLinesAux(headers, headerCount, cth))
case HttpHeaderParser.EmptyHeader
val contentType = cth match {
case Some(x) x.contentType
case None defaultContentType
}
parseEntity(headers, contentType)(input, lineEnd)
case HttpHeaderParser.EmptyHeader parseEntity(headers, contentType)(input, lineEnd)
case h: `Content-Type`
if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, Some(h))
@ -133,6 +139,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
case _ fail(s"multipart part contains more than the configured limit of $maxHeaderCount headers")
}
}
}
// work-around for compiler complaining about non-tail-recursion if we inline this method
def parseHeaderLinesAux(headers: List[HttpHeader], headerCount: Int,
@ -204,11 +211,20 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
def done(): StateResult = null // StateResult is a phantom type
// the length of the needle without the preceding CRLF
def boundaryLength = needle.length - 2
@tailrec def boundary(input: ByteString, offset: Int, ix: Int = 2): Boolean =
(ix == needle.length) || (byteAt(input, offset + ix - 2) == needle(ix)) && boundary(input, offset, ix + 1)
def crlf(input: ByteString, offset: Int): Boolean =
byteChar(input, offset) == '\r' && byteChar(input, offset + 1) == '\n'
def doubleDash(input: ByteString, offset: Int): Boolean =
byteChar(input, offset) == '-' && byteChar(input, offset + 1) == '-'
override def onTermination(e: Option[Throwable]): List[BodyPartParser.Output] =
if (terminated || !receivedInitialBoundary) Nil else ParseError(ErrorInfo("Unexpected end of multipart entity")) :: Nil
}
private[http] object BodyPartParser {

View file

@ -5,7 +5,7 @@
package akka.http.engine.rendering
import java.nio.charset.Charset
import scala.annotation.tailrec
import scala.collection.immutable
import akka.event.LoggingAdapter
import akka.http.model._
import akka.http.model.headers._
@ -19,45 +19,18 @@ import HttpEntity._
/**
* INTERNAL API
*/
private[http] class BodyPartRenderer(boundary: String,
private[http] object BodyPartRenderer {
def streamed(boundary: String,
nioCharset: Charset,
partHeadersSizeHint: Int,
log: LoggingAdapter) extends Transformer[BodyPart, Source[ChunkStreamPart]] {
log: LoggingAdapter): Transformer[Multipart.BodyPart, Source[ChunkStreamPart]] =
new Transformer[Multipart.BodyPart, Source[ChunkStreamPart]] {
var firstBoundaryRendered = false
private[this] var firstBoundaryRendered = false
def onNext(bodyPart: BodyPart): List[Source[ChunkStreamPart]] = {
def onNext(bodyPart: Multipart.BodyPart): List[Source[ChunkStreamPart]] = {
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)
def renderBoundary(): Unit = {
if (firstBoundaryRendered) r ~~ CrLf
r ~~ '-' ~~ '-' ~~ boundary ~~ CrLf
}
def render(h: HttpHeader) = r ~~ h ~~ CrLf
@tailrec def renderHeaders(remaining: List[HttpHeader]): Unit =
remaining match {
case head :: tail head match {
case x: `Content-Length`
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
renderHeaders(tail)
case x: `Content-Type`
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.")
renderHeaders(tail)
case x: RawHeader if (x is "content-type") || (x is "content-length")
suppressionWarning(log, x, "illegal RawHeader")
renderHeaders(tail)
case x
render(x)
renderHeaders(tail)
}
case Nil r ~~ CrLf
}
def bodyPartChunks(data: Source[ByteString]): List[Source[ChunkStreamPart]] = {
val entityChunks = data.map[ChunkStreamPart](Chunk(_))
(Source(Chunk(r.get) :: Nil) ++ entityChunks) :: Nil
@ -71,21 +44,62 @@ private[http] class BodyPartRenderer(boundary: String,
case IndefiniteLength(_, data) bodyPartChunks(data)
}
renderBoundary()
renderBoundary(r, boundary, suppressInitialCrLf = !firstBoundaryRendered)
firstBoundaryRendered = true
renderEntityContentType(r, bodyPart.entity)
renderHeaders(bodyPart.headers.toList)
renderHeaders(r, bodyPart.headers, log)
completePartRendering()
}
override def onTermination(e: Option[Throwable]): List[Source[ChunkStreamPart]] =
if (e.isEmpty && firstBoundaryRendered) {
val r = new ByteStringRendering(boundary.length + 4)
r ~~ CrLf ~~ '-' ~~ '-' ~~ boundary ~~ '-' ~~ '-'
renderFinalBoundary(r, boundary)
chunkStream(r.get)
} else Nil
private def chunkStream(byteString: ByteString) =
Source[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil
}
}
def strict(parts: immutable.Seq[Multipart.BodyPart.Strict], boundary: String, nioCharset: Charset,
partHeadersSizeHint: Int, log: LoggingAdapter): ByteString = {
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)
if (parts.nonEmpty) {
for (part parts) {
renderBoundary(r, boundary, suppressInitialCrLf = part eq parts.head)
renderEntityContentType(r, part.entity)
renderHeaders(r, part.headers, log)
r ~~ part.entity.data
}
renderFinalBoundary(r, boundary)
}
r.get
}
private def renderBoundary(r: Rendering, boundary: String, suppressInitialCrLf: Boolean = false): Unit = {
if (!suppressInitialCrLf) r ~~ CrLf
r ~~ '-' ~~ '-' ~~ boundary ~~ CrLf
}
private def renderFinalBoundary(r: Rendering, boundary: String): Unit =
r ~~ CrLf ~~ '-' ~~ '-' ~~ boundary ~~ '-' ~~ '-'
private def renderHeaders(r: Rendering, headers: immutable.Seq[HttpHeader], log: LoggingAdapter): Unit = {
headers foreach renderHeader(r, log)
r ~~ CrLf
}
private def renderHeader(r: Rendering, log: LoggingAdapter): HttpHeader Unit = {
case x: `Content-Length`
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
case x: `Content-Type`
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.")
case x: RawHeader if (x is "content-type") || (x is "content-length")
suppressionWarning(log, x, "illegal RawHeader")
case x r ~~ x ~~ CrLf
}
}

View file

@ -10,15 +10,14 @@ import java.lang.{ Iterable ⇒ JIterable }
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.util.ByteString
import akka.stream.{ FlowMaterializer, TimerTransformer, Transformer }
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._
import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable }
import akka.stream.{ TimerTransformer, Transformer }
import akka.http.util._
import japi.JavaMapping.Implicits._
import scala.util.control.NonFatal
/**
* Models the entity (aka "body" or "content) of an HTTP message.
*/
@ -40,7 +39,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
/**
* Collects all possible parts and returns a potentially future Strict entity for easier processing.
* The Deferrable is failed with an TimeoutException if the stream isn't completed after the given timeout.
* The Future is failed with an TimeoutException if the stream isn't completed after the given timeout.
*/
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[HttpEntity.Strict] = {
def transformer() =
@ -163,6 +162,8 @@ object HttpEntity {
def withContentType(contentType: ContentType): Strict =
if (contentType == this.contentType) this else copy(contentType = contentType)
override def productPrefix = "HttpEntity.Strict"
}
/**
@ -187,6 +188,8 @@ object HttpEntity {
def withContentType(contentType: ContentType): Default =
if (contentType == this.contentType) this else copy(contentType = contentType)
override def productPrefix = "HttpEntity.Default"
}
/**
@ -198,7 +201,7 @@ object HttpEntity {
def contentType: ContentType
def data: Source[ByteString]
def isKnownEmpty = data eq EmptyPublisher
def isKnownEmpty = data eq Source.empty
def dataBytes: Source[ByteString] = data
}
@ -219,6 +222,8 @@ object HttpEntity {
override def transformDataBytes(transformer: () Transformer[ByteString, ByteString]): CloseDelimited =
HttpEntity.CloseDelimited(contentType,
data.transform("transformDataBytes-CloseDelimited", transformer))
override def productPrefix = "HttpEntity.CloseDelimited"
}
/**
@ -235,6 +240,8 @@ object HttpEntity {
override def transformDataBytes(transformer: () Transformer[ByteString, ByteString]): IndefiniteLength =
HttpEntity.IndefiniteLength(contentType,
data.transform("transformDataBytes-IndefiniteLength", transformer))
override def productPrefix = "HttpEntity.IndefiniteLength"
}
/**
@ -243,7 +250,7 @@ object HttpEntity {
final case class Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart])
extends japi.HttpEntityChunked with MessageEntity {
def isKnownEmpty = chunks eq EmptyPublisher
def isKnownEmpty = chunks eq Source.empty
override def isChunked: Boolean = true
def dataBytes: Source[ByteString] =
@ -283,6 +290,8 @@ object HttpEntity {
def withContentType(contentType: ContentType): Chunked =
if (contentType == this.contentType) this else copy(contentType = contentType)
override def productPrefix = "HttpEntity.Chunked"
/** Java API */
def getChunks: Source[japi.ChunkStreamPart] = chunks.asInstanceOf[Source[japi.ChunkStreamPart]]
}

View file

@ -277,7 +277,7 @@ object HttpRequest {
} else // http://tools.ietf.org/html/rfc7230#section-5.4
if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty ||
hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri
else throw new IllegalUriException("'Host' header value of request to `$uri` doesn't match request target authority",
else throw new IllegalUriException(s"'Host' header value of request to `$uri` doesn't match request target authority",
s"Host header: $hostHeader\nrequest target authority: ${uri.authority}")
}
}

View file

@ -0,0 +1,286 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.model
import scala.collection.immutable.VectorBuilder
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, ExecutionContext }
import scala.collection.immutable
import scala.util.{ Failure, Success, Try }
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source
import akka.http.util.FastFuture
import akka.http.model.headers._
import FastFuture._
trait Multipart {
def mediaType: MultipartMediaType
def parts: Source[Multipart.BodyPart]
/**
* Converts this content into its strict counterpart.
* The given ``timeout`` denotes the max time that an individual part must be read in.
* The Future is failed with an TimeoutException if one part isn't read completely after the given timeout.
*/
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Multipart.Strict]
}
object Multipart {
trait Strict extends Multipart {
def strictParts: immutable.Seq[BodyPart.Strict]
}
trait BodyPart {
def entity: BodyPartEntity
def headers: immutable.Seq[HttpHeader]
def contentDispositionHeader: Option[`Content-Disposition`] =
headers.collectFirst { case x: `Content-Disposition` x }
def dispositionParams: Map[String, String] =
contentDispositionHeader match {
case Some(`Content-Disposition`(_, params)) params
case None Map.empty
}
def dispositionType: Option[ContentDispositionType] =
contentDispositionHeader.map(_.dispositionType)
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict]
}
object BodyPart {
trait Strict extends BodyPart {
override def entity: HttpEntity.Strict
}
}
private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP])(f: BP Future[BPS])(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Vector[BPS]] =
// TODO: move to Vector `:+` when https://issues.scala-lang.org/browse/SI-8930 is fixed
parts.fold(new VectorBuilder[Future[BPS]]) {
case (builder, part) builder += f(part)
}.fast.flatMap(builder FastFuture.sequence(builder.result()))
//////////////////////// CONCRETE multipart types /////////////////////////
/**
* Basic model for multipart content as defined by http://tools.ietf.org/html/rfc2046.
*/
sealed abstract class General extends Multipart {
def mediaType: MultipartMediaType
def parts: Source[General.BodyPart]
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[General.Strict] =
strictify(parts)(_.toStrict(timeout)).fast.map(General.Strict(mediaType, _))
}
object General {
def apply(mediaType: MultipartMediaType, parts: BodyPart.Strict*): Strict = Strict(mediaType, parts.toVector)
def apply(_mediaType: MultipartMediaType, _parts: Source[BodyPart]): General =
new General {
def mediaType = _mediaType
def parts = _parts
override def toString = s"General($mediaType, $parts)"
}
def unapply(value: General): Option[(MultipartMediaType, Source[BodyPart])] = Some(value.mediaType -> value.parts)
/**
* Strict [[General]].
*/
case class Strict(mediaType: MultipartMediaType, strictParts: immutable.Seq[BodyPart.Strict]) extends General with Multipart.Strict {
def parts: Source[BodyPart.Strict] = Source(strictParts)
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) =
FastFuture.successful(this)
override def productPrefix = "General.Strict"
}
/**
* Body part of the [[General]] model.
*/
sealed abstract class BodyPart extends Multipart.BodyPart {
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] =
entity.toStrict(timeout).map(BodyPart.Strict(_, headers))
def toFormDataBodyPart: Try[FormData.BodyPart]
def toByteRangesBodyPart: Try[ByteRanges.BodyPart]
private[BodyPart] def tryCreateFormDataBodyPart[T](f: (String, Map[String, String], immutable.Seq[HttpHeader]) T): Try[T] = {
val params = dispositionParams
params.get("name") match {
case Some(name) Success(f(name, params - "name", headers.filterNot(_ is "content-disposition")))
case None Failure(new IllegalHeaderException("multipart/form-data part must contain `Content-Disposition` header with `name` parameter"))
}
}
private[BodyPart] def tryCreateByteRangesBodyPart[T](f: (ContentRange, RangeUnit, immutable.Seq[HttpHeader]) T): Try[T] =
headers.collectFirst { case x: `Content-Range` x } match {
case Some(`Content-Range`(unit, range)) Success(f(range, unit, headers.filterNot(_ is "content-range")))
case None Failure(new IllegalHeaderException("multipart/byteranges part must contain `Content-Range` header"))
}
}
object BodyPart {
def apply(_entity: BodyPartEntity, _headers: immutable.Seq[HttpHeader] = Nil): BodyPart =
new BodyPart {
def entity = _entity
def headers: immutable.Seq[HttpHeader] = _headers
def toFormDataBodyPart: Try[FormData.BodyPart] = tryCreateFormDataBodyPart(FormData.BodyPart(_, entity, _, _))
def toByteRangesBodyPart: Try[ByteRanges.BodyPart] = tryCreateByteRangesBodyPart(ByteRanges.BodyPart(_, entity, _, _))
override def toString = s"General.BodyPart($entity, $headers)"
}
def unapply(value: BodyPart): Option[(BodyPartEntity, immutable.Seq[HttpHeader])] = Some(value.entity -> value.headers)
/**
* Strict [[General.BodyPart]].
*/
case class Strict(entity: HttpEntity.Strict, headers: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict {
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] =
FastFuture.successful(this)
override def toFormDataBodyPart: Try[FormData.BodyPart.Strict] = tryCreateFormDataBodyPart(FormData.BodyPart.Strict(_, entity, _, _))
override def toByteRangesBodyPart: Try[ByteRanges.BodyPart.Strict] = tryCreateByteRangesBodyPart(ByteRanges.BodyPart.Strict(_, entity, _, _))
override def productPrefix = "General.BodyPart.Strict"
}
}
}
/**
* Model for `multipart/form-data` content as defined in http://tools.ietf.org/html/rfc2388.
* All parts must have distinct names. (This is not verified!)
*/
sealed abstract class FormData extends Multipart {
def mediaType = MediaTypes.`multipart/form-data`
def parts: Source[FormData.BodyPart]
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[FormData.Strict] =
strictify(parts)(_.toStrict(timeout)).fast.map(FormData.Strict(_))
}
object FormData {
def apply(parts: BodyPart.Strict*): Strict = Strict(parts.toVector)
def apply(fields: Map[String, HttpEntity.Strict]): Strict = Strict {
fields.map { case (name, entity) BodyPart.Strict(name, entity) }(collection.breakOut)
}
def apply(_parts: Source[BodyPart]): FormData = new FormData {
def parts = _parts
override def toString = s"FormData($parts)"
}
/**
* Strict [[FormData]].
*/
case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends FormData with Multipart.Strict {
def parts: Source[BodyPart.Strict] = Source(strictParts)
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) =
FastFuture.successful(this)
override def productPrefix = "FormData.Strict"
}
/**
* Body part of the [[FormData]] model.
*/
sealed abstract class BodyPart extends Multipart.BodyPart {
def name: String
def additionalDispositionParams: Map[String, String]
def additionalHeaders: immutable.Seq[HttpHeader]
override def headers = contentDispositionHeader.get +: additionalHeaders
override def contentDispositionHeader = Some(`Content-Disposition`(dispositionType.get, dispositionParams))
override def dispositionParams = additionalDispositionParams.updated("name", name)
override def dispositionType = Some(ContentDispositionTypes.`form-data`)
def filename: Option[String] = additionalDispositionParams.get("filename")
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] =
entity.toStrict(timeout).map(BodyPart.Strict(name, _, additionalDispositionParams, additionalHeaders))
}
object BodyPart {
def apply(_name: String, _entity: BodyPartEntity,
_additionalDispositionParams: Map[String, String] = Map.empty,
_additionalHeaders: immutable.Seq[HttpHeader] = Nil): BodyPart =
new BodyPart {
def name = _name
def additionalDispositionParams = _additionalDispositionParams
def additionalHeaders = _additionalHeaders
def entity = _entity
override def toString = s"FormData.BodyPart(${_name}, ${_entity}, ${_additionalDispositionParams}, ${_additionalHeaders})"
}
def unapply(value: BodyPart): Option[(String, BodyPartEntity, Map[String, String], immutable.Seq[HttpHeader])] =
Some((value.name, value.entity, value.additionalDispositionParams, value.additionalHeaders))
/**
* Strict [[FormData.BodyPart]].
*/
case class Strict(name: String, entity: HttpEntity.Strict,
additionalDispositionParams: Map[String, String] = Map.empty,
additionalHeaders: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict {
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] =
FastFuture.successful(this)
override def productPrefix = "FormData.BodyPart.Strict"
}
}
}
/**
* Model for ``multipart/byteranges`` content as defined by
* https://tools.ietf.org/html/rfc7233#section-5.4.1 and https://tools.ietf.org/html/rfc7233#appendix-A
*/
sealed abstract class ByteRanges extends Multipart {
def mediaType = MediaTypes.`multipart/byteranges`
def parts: Source[ByteRanges.BodyPart]
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[ByteRanges.Strict] =
strictify(parts)(_.toStrict(timeout)).fast.map(ByteRanges.Strict(_))
}
object ByteRanges {
def apply(parts: BodyPart.Strict*): Strict = Strict(parts.toVector)
def apply(_parts: Source[BodyPart]): ByteRanges =
new ByteRanges {
def parts = _parts
override def toString = s"ByteRanges($parts)"
}
/**
* Strict [[ByteRanges]].
*/
case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends ByteRanges with Multipart.Strict {
def parts: Source[BodyPart.Strict] = Source(strictParts)
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) =
FastFuture.successful(this)
override def productPrefix = "ByteRanges.Strict"
}
/**
* Body part of the [[ByteRanges]] model.
*/
sealed abstract class BodyPart extends Multipart.BodyPart {
def contentRange: ContentRange
def rangeUnit: RangeUnit
def additionalHeaders: immutable.Seq[HttpHeader]
override def headers = contentRangeHeader +: additionalHeaders
def contentRangeHeader = `Content-Range`(rangeUnit, contentRange)
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] =
entity.toStrict(timeout).map(BodyPart.Strict(contentRange, _, rangeUnit, additionalHeaders))
}
object BodyPart {
def apply(_contentRange: ContentRange, _entity: BodyPartEntity, _rangeUnit: RangeUnit = RangeUnits.Bytes,
_additionalHeaders: immutable.Seq[HttpHeader] = Nil): BodyPart =
new BodyPart {
def contentRange = _contentRange
def entity = _entity
def rangeUnit = _rangeUnit
def additionalHeaders = _additionalHeaders
override def toString = s"ByteRanges.BodyPart(${_contentRange}, ${_entity}, ${_rangeUnit}, ${_additionalHeaders})"
}
def unapply(value: BodyPart): Option[(ContentRange, BodyPartEntity, RangeUnit, immutable.Seq[HttpHeader])] =
Some((value.contentRange, value.entity, value.rangeUnit, value.additionalHeaders))
/**
* Strict [[ByteRanges.BodyPart]].
*/
case class Strict(contentRange: ContentRange, entity: HttpEntity.Strict, rangeUnit: RangeUnit = RangeUnits.Bytes,
additionalHeaders: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict {
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] =
FastFuture.successful(this)
override def productPrefix = "ByteRanges.BodyPart.Strict"
}
}
}
}

View file

@ -1,167 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.model
import java.io.File
import scala.concurrent.{ Future, ExecutionContext }
import scala.collection.immutable
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.impl.SynchronousPublisherFromIterable
import akka.http.util.FastFuture
import FastFuture._
import headers._
trait MultipartParts {
def parts: Source[BodyPart]
}
/**
* Basic model for multipart content as defined in RFC 2046.
* If you are looking for a model for `multipart/form-data` you should be using [[MultipartFormData]].
*/
final case class MultipartContent(parts: Source[BodyPart]) extends MultipartParts
object MultipartContent {
val Empty = MultipartContent(Source[BodyPart](Nil))
def apply(parts: BodyPart*): MultipartContent = apply(Source[BodyPart](parts.toList))
def apply(files: Map[String, FormFile]): MultipartContent =
apply(files.map(e BodyPart(e._2, e._1))(collection.breakOut): _*)
}
/**
* Model for multipart/byteranges content as defined in RFC 2046.
* If you are looking for a model for `multipart/form-data` you should be using [[MultipartFormData]].
*/
final case class MultipartByteRanges(parts: Source[BodyPart]) extends MultipartParts
object MultipartByteRanges {
val Empty = MultipartByteRanges(Source[BodyPart](Nil))
def apply(parts: BodyPart*): MultipartByteRanges =
if (parts.isEmpty) Empty else MultipartByteRanges(Source[BodyPart](parts.toList))
}
/**
* Model for `multipart/form-data` content as defined in RFC 2388.
* All parts must contain a Content-Disposition header with a type form-data
* and a name parameter that is unique.
*/
case class MultipartFormData(parts: Source[BodyPart]) extends MultipartParts {
/**
* Turns this instance into its strict specialization using the given `maxFieldCount` as the field number cut-off
* hint.
*/
def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[StrictMultipartFormData] =
parts.grouped(maxFieldCount).runWith(Sink.future).fast.map(new StrictMultipartFormData(_))
}
/**
* A specialized `MultipartFormData` that allows full random access to its parts.
*/
class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends MultipartFormData(Source(fields)) {
/**
* Returns the BodyPart with the given name, if found.
*/
def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName))
override def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer) =
FastFuture.successful(this)
}
object MultipartFormData {
val Empty = MultipartFormData()
def apply(parts: BodyPart*): MultipartFormData = apply(Source[BodyPart](parts.toList))
def apply(fields: Map[String, BodyPart]): MultipartFormData = apply {
fields.map {
case (key, value) value.copy(headers = `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> key)) +: value.headers)
}(collection.breakOut): _*
}
}
final case class FormFile(name: Option[String], entity: BodyPartEntity)
object FormFile {
def apply(name: String, entity: BodyPartEntity): FormFile = apply(Some(name), entity)
}
/**
* Model for one part of a multipart message.
*/
final case class BodyPart(entity: BodyPartEntity, headers: immutable.Seq[HttpHeader] = Nil) {
val name: Option[String] = dispositionParameterValue("name")
def filename: Option[String] = dispositionParameterValue("filename")
def dispositionType: Option[ContentDispositionType] =
headers.collectFirst {
case `Content-Disposition`(dispositionType, _) dispositionType
}
def dispositionParameterValue(parameter: String): Option[String] =
headers.collectFirst {
case `Content-Disposition`(ContentDispositionTypes.`form-data`, params) if params.contains(parameter)
params(parameter)
}
def contentRange: Option[ContentRange] =
headers.collectFirst {
case `Content-Range`(_, contentRange) contentRange
}
}
object BodyPart {
def apply(file: File, fieldName: String): BodyPart = apply(file, fieldName, ContentTypes.`application/octet-stream`)
def apply(file: File, fieldName: String, contentType: ContentType): BodyPart =
apply(HttpEntity(contentType, file), fieldName, Map.empty.updated("filename", file.getName))
def apply(formFile: FormFile, fieldName: String): BodyPart =
formFile.name match {
case Some(name) apply(formFile.entity, fieldName, Map.empty.updated("filename", name))
case None apply(formFile.entity, fieldName)
}
def apply(entity: BodyPartEntity, fieldName: String): BodyPart = apply(entity, fieldName, Map.empty[String, String])
def apply(entity: BodyPartEntity, fieldName: String, params: Map[String, String]): BodyPart =
BodyPart(entity, immutable.Seq(`Content-Disposition`(ContentDispositionTypes.`form-data`, params.updated("name", fieldName))))
}
/**
* A convenience extractor that allows to match on a BodyPart including its name if the body-part
* is used as part of form-data. If the part has no name the extractor won't match.
*
* Example:
*
* {{{
* (formData: StrictMultipartFormData).fields collect {
* case NamedBodyPart("address", data, headers) => data
* }
* }}}
*/
object NamedBodyPart {
def unapply(part: BodyPart): Option[(String, BodyPartEntity, immutable.Seq[HttpHeader])] =
part.name.map(name (name, part.entity, part.headers))
}
/**
* A convenience extractor that allows to match on a BodyPart including its name and filename
* if the body-part is used as part of form-data. If the part has no name an empty string will be
* extracted, instead. If the part has no filename the extractor won't match.
*
* Example:
*
* {{{
* (formData: StrictMultipartFormData).fields collect {
* case FileBodyPart("file", filename, data, headers) => filename -> data
* }
* }}}
*/
object FileBodyPart {
def unapply(part: BodyPart): Option[(String, String, BodyPartEntity, immutable.Seq[HttpHeader])] =
part.filename.map(filename (part.name.getOrElse(""), filename, part.entity, part.headers))
}

View file

@ -1,54 +0,0 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.model
import akka.http.model.headers.{ ContentDispositionTypes, `Content-Disposition` }
import akka.util.ByteString
import org.scalatest.{ Inside, Matchers, WordSpec }
class MultipartContentSpec extends WordSpec with Matchers with Inside {
"BodyPart" should {
val data = HttpEntity(ByteString("data"))
"be matched with NamedBodyPart extractor if it has a name" in {
val part = BodyPart(data, "name")
inside(part) {
case NamedBodyPart("name", entity, _) entity should equal(data)
}
}
"not be matched with NamedBodyPart extractor if it has no name" in {
val part = BodyPart(data)
inside(part) {
case NamedBodyPart(name, entity, _) fail(s"Shouldn't match but did match with name '$name'")
case _
}
}
"be matched with FileBodyPart extractor if it contains a file" in {
val part = BodyPart(FormFile("data.txt", data), "name")
inside(part) {
case FileBodyPart("name", "data.txt", entity, _) entity should equal(data)
}
}
"be matched with FileBodyPart extractor if it contains a file but no name" in {
val part = BodyPart(data, `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("filename" -> "data.txt")) :: Nil)
inside(part) {
case FileBodyPart("", "data.txt", entity, _) entity should equal(data)
}
}
"not be matched with NamedBodyPart extractor if it doesn't contains a file" in {
val part = BodyPart(data)
inside(part) {
case FileBodyPart(name, filename, entity, _) fail(s"Shouldn't match but did match with name '$name' and filename '$filename'")
case _
}
}
}
}

View file

@ -0,0 +1,66 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.model
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Inside, Matchers, WordSpec }
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import akka.actor.ActorSystem
import headers._
class MultipartSpec extends WordSpec with Matchers with Inside with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.loglevel = WARNING""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
implicit val materializer = FlowMaterializer()
override def afterAll() = system.shutdown()
"Multipart.General" should {
"support `toStrict` on the streamed model" in {
val streamed = Multipart.General(
MediaTypes.`multipart/mixed`,
Source(Multipart.General.BodyPart(defaultEntity("data"), List(ETag("xzy"))) :: Nil))
val strict = Await.result(streamed.toStrict(1.second), 1.second)
strict shouldEqual Multipart.General(
MediaTypes.`multipart/mixed`,
Multipart.General.BodyPart.Strict(HttpEntity("data"), List(ETag("xzy"))))
}
}
"Multipart.FormData" should {
"support `toStrict` on the streamed model" in {
val streamed = Multipart.FormData(Source(
Multipart.FormData.BodyPart("foo", defaultEntity("FOO")) ::
Multipart.FormData.BodyPart("bar", defaultEntity("BAR")) :: Nil))
val strict = Await.result(streamed.toStrict(1.second), 1.second)
strict shouldEqual Multipart.FormData(Map("foo" -> HttpEntity("FOO"), "bar" -> HttpEntity("BAR")))
}
}
"Multipart.ByteRanges" should {
"support `toStrict` on the streamed model" in {
val streamed = Multipart.ByteRanges(Source(
Multipart.ByteRanges.BodyPart(ContentRange(0, 6), defaultEntity("snippet"), _additionalHeaders = List(ETag("abc"))) ::
Multipart.ByteRanges.BodyPart(ContentRange(8, 9), defaultEntity("PR"), _additionalHeaders = List(ETag("xzy"))) :: Nil))
val strict = Await.result(streamed.toStrict(1.second), 1.second)
strict shouldEqual Multipart.ByteRanges(
Multipart.ByteRanges.BodyPart.Strict(ContentRange(0, 6), HttpEntity("snippet"), additionalHeaders = List(ETag("abc"))),
Multipart.ByteRanges.BodyPart.Strict(ContentRange(8, 9), HttpEntity("PR"), additionalHeaders = List(ETag("xzy"))))
}
}
def defaultEntity(content: String) =
HttpEntity.Default(ContentTypes.`text/plain(UTF-8)`, content.length, Source(ByteString(content) :: Nil))
}

View file

@ -10,6 +10,7 @@ import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source
import akka.http.util._
import akka.http.model._
import headers._
@ -51,9 +52,9 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with
}
"The MultipartMarshallers." - {
"multipartContentMarshaller should correctly marshal multipart content with" - {
"multipartMarshaller should correctly marshal multipart content with" - {
"one empty part" in {
marshal(MultipartContent(BodyPart(""))) shouldEqual HttpEntity(
marshal(Multipart.General(`multipart/mixed`, Multipart.General.BodyPart.Strict(""))) shouldEqual HttpEntity(
contentType = ContentType(`multipart/mixed` withBoundary randomBoundary),
string = s"""--$randomBoundary
|Content-Type: text/plain; charset=UTF-8
@ -62,11 +63,11 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with
|--$randomBoundary--""".stripMarginWithNewline("\r\n"))
}
"one non-empty part" in {
marshal(MultipartContent(BodyPart(
marshal(Multipart.General(`multipart/alternative`, Multipart.General.BodyPart.Strict(
entity = HttpEntity(ContentType(`text/plain`, `UTF-8`), "test@there.com"),
headers = `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")) :: Nil))) shouldEqual
HttpEntity(
contentType = ContentType(`multipart/mixed` withBoundary randomBoundary),
contentType = ContentType(`multipart/alternative` withBoundary randomBoundary),
string = s"""--$randomBoundary
|Content-Type: text/plain; charset=UTF-8
|Content-Disposition: form-data; name=email
@ -75,13 +76,13 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with
|--$randomBoundary--""".stripMarginWithNewline("\r\n"))
}
"two different parts" in {
marshal(MultipartContent(
BodyPart(HttpEntity(ContentType(`text/plain`, Some(`US-ASCII`)), "first part, with a trailing linebreak\r\n")),
BodyPart(
marshal(Multipart.General(`multipart/related`,
Multipart.General.BodyPart.Strict(HttpEntity(ContentType(`text/plain`, Some(`US-ASCII`)), "first part, with a trailing linebreak\r\n")),
Multipart.General.BodyPart.Strict(
HttpEntity(ContentType(`application/octet-stream`), "filecontent"),
RawHeader("Content-Transfer-Encoding", "binary") :: Nil))) shouldEqual
HttpEntity(
contentType = ContentType(`multipart/mixed` withBoundary randomBoundary),
contentType = ContentType(`multipart/related` withBoundary randomBoundary),
string = s"""--$randomBoundary
|Content-Type: text/plain; charset=US-ASCII
|
@ -98,9 +99,9 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with
"multipartFormDataMarshaller should correctly marshal 'multipart/form-data' content with" - {
"two fields" in {
marshal(MultipartFormData(ListMap(
"surname" -> BodyPart("Mike"),
"age" -> BodyPart(marshal(<int>42</int>))))) shouldEqual
marshal(Multipart.FormData(ListMap(
"surname" -> HttpEntity("Mike"),
"age" -> marshal(<int>42</int>)))) shouldEqual
HttpEntity(
contentType = ContentType(`multipart/form-data` withBoundary randomBoundary),
string = s"""--$randomBoundary
@ -117,32 +118,26 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with
}
"two fields having a custom `Content-Disposition`" in {
marshal(MultipartFormData(
BodyPart(
HttpEntity(`text/csv`, "name,age\r\n\"John Doe\",20\r\n"),
List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "attachment[0]", "filename" -> "attachment.csv")))),
BodyPart(
HttpEntity("name,age\r\n\"John Doe\",20\r\n".getBytes),
List(
`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "attachment[1]", "filename" -> "attachment.csv")),
RawHeader("Content-Transfer-Encoding", "binary"))))) shouldEqual
marshal(Multipart.FormData(Source(List(
Multipart.FormData.BodyPart("attachment[0]", HttpEntity(`text/csv`, "name,age\r\n\"John Doe\",20\r\n"),
Map("filename" -> "attachment.csv")),
Multipart.FormData.BodyPart("attachment[1]", HttpEntity("naice!".getBytes),
Map("filename" -> "attachment2.csv"), List(RawHeader("Content-Transfer-Encoding", "binary"))))))) shouldEqual
HttpEntity(
contentType = ContentType(`multipart/form-data` withBoundary randomBoundary),
string = s"""--$randomBoundary
|Content-Type: text/csv
|Content-Disposition: form-data; name="attachment[0]"; filename=attachment.csv
|Content-Disposition: form-data; filename=attachment.csv; name="attachment[0]"
|
|name,age
|"John Doe",20
|
|--$randomBoundary
|Content-Type: application/octet-stream
|Content-Disposition: form-data; name="attachment[1]"; filename=attachment.csv
|Content-Disposition: form-data; filename=attachment2.csv; name="attachment[1]"
|Content-Transfer-Encoding: binary
|
|name,age
|"John Doe",20
|
|naice!
|--$randomBoundary--""".stripMarginWithNewline("\r\n"))
}
}

View file

@ -4,6 +4,8 @@
package akka.http.unmarshalling
import akka.util.ByteString
import scala.xml.NodeSeq
import scala.concurrent.duration._
import scala.concurrent.{ Future, Await }
@ -37,25 +39,50 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
"The MultipartUnmarshallers." - {
"multipartContentUnmarshaller should correctly unmarshal 'multipart/*' content with" - {
"one empty part" in {
"multipartGeneralUnmarshaller should correctly unmarshal 'multipart/*' content with" - {
"an empty entity" in {
Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC", ByteString.empty)).to[Multipart.General] should haveParts()
}
"an entity without initial boundary" in {
Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC",
"""this is
|just preamble text""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts()
}
"an empty part" in {
Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC",
"""--XYZABC
|--XYZABC--""".stripMargin)).to[MultipartContent] should haveParts()
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts(
Multipart.General.BodyPart.Strict(HttpEntity.empty(ContentTypes.`text/plain(UTF-8)`)))
}
"with one part" in {
"two empty parts" in {
Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC",
"""--XYZABC
|--XYZABC
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts(
Multipart.General.BodyPart.Strict(HttpEntity.empty(ContentTypes.`text/plain(UTF-8)`)),
Multipart.General.BodyPart.Strict(HttpEntity.empty(ContentTypes.`text/plain(UTF-8)`)))
}
"a part without entity and missing header separation CRLF" in {
Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC",
"""--XYZABC
|Content-type: text/xml
|Age: 12
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts(
Multipart.General.BodyPart.Strict(HttpEntity.empty(MediaTypes.`text/xml`), List(RawHeader("Age", "12"))))
}
"one non-empty part" in {
Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-",
"""---
|Content-type: text/plain; charset=UTF8
|content-disposition: form-data; name="email"
|
|test@there.com
|-----""".stripMarginWithNewline("\r\n"))).to[MultipartContent] should haveParts(
BodyPart(
|-----""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts(
Multipart.General.BodyPart.Strict(
HttpEntity(ContentTypes.`text/plain(UTF-8)`, "test@there.com"),
List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")))))
}
"with two different parts" in {
"two different parts" in {
Unmarshal(HttpEntity(`multipart/mixed` withBoundary "12345",
"""--12345
|
@ -66,28 +93,38 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Content-Transfer-Encoding: binary
|
|filecontent
|--12345--""".stripMarginWithNewline("\r\n"))).to[MultipartContent] should haveParts(
BodyPart(HttpEntity(ContentTypes.`text/plain(UTF-8)`, "first part, with a trailing newline\r\n")),
BodyPart(
|--12345--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts(
Multipart.General.BodyPart.Strict(HttpEntity(ContentTypes.`text/plain(UTF-8)`, "first part, with a trailing newline\r\n")),
Multipart.General.BodyPart.Strict(
HttpEntity(`application/octet-stream`, "filecontent"),
List(RawHeader("Content-Transfer-Encoding", "binary"))))
}
"with illegal headers" in (
"illegal headers" in (
Unmarshal(HttpEntity(`multipart/form-data` withBoundary "XYZABC",
"""--XYZABC
|Date: unknown
|content-disposition: form-data; name=email
|
|test@there.com
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[MultipartContent] should haveParts(
BodyPart(
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts(
Multipart.General.BodyPart.Strict(
HttpEntity(ContentTypes.`text/plain(UTF-8)`, "test@there.com"),
List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")),
RawHeader("date", "unknown")))))
}
"multipartContentUnmarshaller should reject illegal multipart content" in {
val mpc = Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-",
"multipartGeneralUnmarshaller should reject illegal multipart content with" - {
"a stray boundary" in {
Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "ABC",
"""--ABC
|Content-type: text/plain; charset=UTF8
|--ABCContent-type: application/json
|content-disposition: form-data; name="email"
|-----""".stripMarginWithNewline("\r\n")))
.to[Multipart.General].failed, 1.second).getMessage shouldEqual "Illegal multipart boundary in message content"
}
"duplicate Content-Type header" in {
Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-",
"""---
|Content-type: text/plain; charset=UTF8
|Content-type: application/json
@ -95,10 +132,10 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
|test@there.com
|-----""".stripMarginWithNewline("\r\n")))
.to[MultipartContent], 1.second)
Await.result(mpc.parts.runWith(Sink.future).failed, 1.second).getMessage shouldEqual
.to[Multipart.General].failed, 1.second).getMessage shouldEqual
"multipart part must not contain more than one Content-Type header"
}
}
"multipartByteRangesUnmarshaller should correctly unmarshal multipart/byteranges content with two different parts" in {
Unmarshal(HttpEntity(`multipart/byteranges` withBoundary "12345",
@ -112,9 +149,9 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Content-Type: text/plain
|
|XYZ
|--12345--""".stripMarginWithNewline("\r\n"))).to[MultipartByteRanges] should haveParts(
BodyPart(HttpEntity(ContentTypes.`text/plain`, "ABC"), List(`Content-Range`(ContentRange(0, 2, 26)))),
BodyPart(HttpEntity(ContentTypes.`text/plain`, "XYZ"), List(`Content-Range`(ContentRange(23, 25, 26)))))
|--12345--""".stripMarginWithNewline("\r\n"))).to[Multipart.ByteRanges] should haveParts(
Multipart.ByteRanges.BodyPart.Strict(ContentRange(0, 2, 26), HttpEntity(ContentTypes.`text/plain`, "ABC")),
Multipart.ByteRanges.BodyPart.Strict(ContentRange(23, 25, 26), HttpEntity(ContentTypes.`text/plain`, "XYZ")))
}
"multipartFormDataUnmarshaller should correctly unmarshal 'multipart/form-data' content" - {
@ -124,29 +161,37 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|content-disposition: form-data; name=email
|
|test@there.com
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[MultipartFormData] should haveFormData(
"email" -> BodyPart(HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"), "email"))
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.FormData] should haveParts(
Multipart.FormData.BodyPart.Strict("email", HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com")))
}
"with a file" in {
Unmarshal(HttpEntity(`multipart/form-data` withBoundary "XYZABC",
Unmarshal {
HttpEntity.Default(
contentType = `multipart/form-data` withBoundary "XYZABC",
contentLength = 1, // not verified during unmarshalling
data = Source {
List(
ByteString {
"""--XYZABC
|Content-Disposition: form-data; name="email"
|
|test@there.com
|--XYZABC
|Content-Disposition: form-data; name="userfile"; filename="test.dat"
|Content-Dispo""".stripMarginWithNewline("\r\n")
},
ByteString {
"""sition: form-data; name="userfile"; filename="test.dat"
|Content-Type: application/pdf
|Content-Transfer-Encoding: binary
|
|filecontent
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[StrictMultipartFormData] should haveFormData(
"email" -> BodyPart(
HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"),
List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")))),
"userfile" -> BodyPart(
HttpEntity(MediaTypes.`application/pdf`, "filecontent"),
List(RawHeader("Content-Transfer-Encoding", "binary"),
`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "userfile", "filename" -> "test.dat")))))
|--XYZABC--""".stripMarginWithNewline("\r\n")
})
})
}.to[Multipart.FormData].flatMap(_.toStrict(1.second)) should haveParts(
Multipart.FormData.BodyPart.Strict("email", HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com")),
Multipart.FormData.BodyPart.Strict("userfile", HttpEntity(MediaTypes.`application/pdf`, "filecontent"),
Map("filename" -> "test.dat"), List(RawHeader("Content-Transfer-Encoding", "binary"))))
}
// TODO: reactivate after multipart/form-data unmarshalling integrity verification is implemented
//
@ -171,24 +216,10 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
def evaluateTo[T](value: T): Matcher[Future[T]] =
equal(value).matcher[T] compose (x Await.result(x, 1.second))
def haveParts[T <: MultipartParts](parts: BodyPart*): Matcher[Future[T]] =
equal(parts).matcher[Seq[BodyPart]] compose { x
def haveParts[T <: Multipart](parts: Multipart.BodyPart*): Matcher[Future[T]] =
equal(parts).matcher[Seq[Multipart.BodyPart]] compose { x
Await.result(x
.fast.flatMap(x x.parts.grouped(100).runWith(Sink.future))
.fast.recover { case _: NoSuchElementException Nil }, 1.second)
}
def haveFormData(fields: (String, BodyPart)*): Matcher[Future[MultipartFormData]] =
equal(fields).matcher[Seq[(String, BodyPart)]] compose { x
Await.result(x
.fast.flatMap(x x.parts.grouped(100).runWith(Sink.future))
.fast.recover { case _: NoSuchElementException Nil }
.fast.map {
_ map { part
part.headers.collectFirst {
case `Content-Disposition`(ContentDispositionTypes.`form-data`, params) params("name")
}.get -> part
}
}, 1.second)
}
}

View file

@ -27,10 +27,8 @@ object Marshallers extends SingleMarshallerMarshallers {
Marshallers(f(first) +: vector)
}
implicit val NodeSeqMarshallers: ToEntityMarshallers[NodeSeq] = {
import scala.concurrent.ExecutionContext.Implicits.global
implicit def nodeSeqMarshallers(implicit ec: ExecutionContext): ToEntityMarshallers[NodeSeq] =
Marshallers(`text/xml`, `application/xml`, `text/html`, `application/xhtml+xml`)(PredefinedToEntityMarshallers.nodeSeqMarshaller)
}
implicit def entity2response[T](implicit m: Marshallers[T, ResponseEntity], ec: ExecutionContext): Marshallers[T, HttpResponse] =
m map (entity HttpResponse(entity = entity))

View file

@ -4,17 +4,15 @@
package akka.http.marshalling
import scala.concurrent.ExecutionContext
import akka.event.{ NoLogging, LoggingAdapter }
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.ActorRefFactory
import akka.parboiled2.util.Base64
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
import akka.http.engine.rendering.BodyPartRenderer
import akka.http.util.actorSystem
import akka.http.util.FastFuture._
import akka.http.util.FastFuture
import akka.http.model._
import MediaTypes._
trait MultipartMarshallers {
protected val multipartBoundaryRandom: java.util.Random = ThreadLocalRandom.current()
@ -28,31 +26,23 @@ trait MultipartMarshallers {
Base64.custom.encodeToString(array, false)
}
implicit def multipartByteRangesMarshaller(implicit refFactory: ActorRefFactory): ToEntityMarshaller[MultipartByteRanges] =
multipartPartsMarshaller[MultipartByteRanges](`multipart/byteranges`)(refFactory)
implicit def multipartContentMarshaller(implicit refFactory: ActorRefFactory): ToEntityMarshaller[MultipartContent] =
multipartPartsMarshaller[MultipartContent](`multipart/mixed`)(refFactory)
private def multipartPartsMarshaller[T <: MultipartParts](mediaType: MultipartMediaType)(implicit refFactory: ActorRefFactory): ToEntityMarshaller[T] = {
val boundary = randomBoundary
val mediaTypeWithBoundary = mediaType withBoundary boundary
Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset)
val log = actorSystem(refFactory).log
val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)
val chunks = value.parts.transform("bodyPartRenderer", () bodyPartRenderer).flatten(FlattenStrategy.concat)
HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks)
}
}
implicit def multipartFormDataMarshaller(implicit mcm: ToEntityMarshaller[MultipartContent],
ec: ExecutionContext): ToEntityMarshaller[MultipartFormData] =
implicit def multipartMarshaller[T <: Multipart](implicit log: LoggingAdapter = NoLogging): ToEntityMarshaller[T] =
Marshaller { value
mcm(MultipartContent(value.parts)).fast.map {
case Marshalling.WithOpenCharset(mt, marshal)
val mediaType = `multipart/form-data` withBoundary mt.params("boundary")
Marshalling.WithOpenCharset(mediaType, cs MediaTypeOverrider.forEntity(marshal(cs), mediaType))
case x throw new IllegalStateException("ToRegularEntityMarshaller[MultipartContent] is expected to produce " +
"a Marshalling.WithOpenCharset, not a " + x)
val boundary = randomBoundary
val contentType = ContentType(value.mediaType withBoundary boundary)
FastFuture.successful {
Marshalling.WithOpenCharset(contentType.mediaType, { charset
value match {
case x: Multipart.Strict
val data = BodyPartRenderer.strict(x.strictParts, boundary, charset.nioCharset, partHeadersSizeHint = 128, log)
HttpEntity(contentType, data)
case _
val chunks = value.parts
.transform("bodyPartRenderer", () BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.flatten(FlattenStrategy.concat)
HttpEntity.Chunked(contentType, chunks)
}
})
}
}
}

View file

@ -4,65 +4,95 @@
package akka.http.unmarshalling
import akka.actor.ActorRefFactory
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.concurrent.ExecutionContext
import akka.event.{ NoLogging, LoggingAdapter }
import akka.http.engine.parsing.BodyPartParser
import akka.http.model._
import akka.http.util._
import akka.stream.scaladsl._
import MediaRanges._
import MediaTypes._
import HttpCharsets._
trait MultipartUnmarshallers {
implicit def defaultMultipartContentUnmarshaller(implicit refFactory: ActorRefFactory) = multipartContentUnmarshaller(`UTF-8`)
def multipartContentUnmarshaller(defaultCharset: HttpCharset)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartContent] =
multipartPartsUnmarshaller[MultipartContent](`multipart/*`, ContentTypes.`text/plain` withCharset defaultCharset)(MultipartContent(_))
implicit def defaultMultipartGeneralUnmarshaller(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.General] =
multipartGeneralUnmarshaller(`UTF-8`)
def multipartGeneralUnmarshaller(defaultCharset: HttpCharset)(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.General] =
multipartUnmarshaller[Multipart.General, Multipart.General.BodyPart, Multipart.General.BodyPart.Strict](
mediaRange = `multipart/*`,
defaultContentType = ContentTypes.`text/plain` withCharset defaultCharset,
createBodyPart = Multipart.General.BodyPart(_, _),
createStreamed = Multipart.General(_, _),
createStrictBodyPart = Multipart.General.BodyPart.Strict,
createStrict = Multipart.General.Strict)
implicit def defaultMultipartByteRangesUnmarshaller(implicit refFactory: ActorRefFactory) = multipartByteRangesUnmarshaller(`UTF-8`)
def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartByteRanges] =
multipartPartsUnmarshaller[MultipartByteRanges](`multipart/byteranges`,
ContentTypes.`text/plain` withCharset defaultCharset)(MultipartByteRanges(_))
implicit def multipartFormDataUnmarshaller(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.FormData] =
multipartUnmarshaller[Multipart.FormData, Multipart.FormData.BodyPart, Multipart.FormData.BodyPart.Strict](
mediaRange = `multipart/form-data`,
defaultContentType = ContentTypes.`application/octet-stream`,
createBodyPart = (entity, headers) Multipart.General.BodyPart(entity, headers).toFormDataBodyPart.get,
createStreamed = (_, parts) Multipart.FormData(parts),
createStrictBodyPart = (entity, headers) Multipart.General.BodyPart.Strict(entity, headers).toFormDataBodyPart.get,
createStrict = (_, parts) Multipart.FormData.Strict(parts))
def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Source[BodyPart] T)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[T] =
implicit def defaultMultipartByteRangesUnmarshaller(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.ByteRanges] =
multipartByteRangesUnmarshaller(`UTF-8`)
def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.ByteRanges] =
multipartUnmarshaller[Multipart.ByteRanges, Multipart.ByteRanges.BodyPart, Multipart.ByteRanges.BodyPart.Strict](
mediaRange = `multipart/byteranges`,
defaultContentType = ContentTypes.`text/plain` withCharset defaultCharset,
createBodyPart = (entity, headers) Multipart.General.BodyPart(entity, headers).toByteRangesBodyPart.get,
createStreamed = (_, parts) Multipart.ByteRanges(parts),
createStrictBodyPart = (entity, headers) Multipart.General.BodyPart.Strict(entity, headers).toByteRangesBodyPart.get,
createStrict = (_, parts) Multipart.ByteRanges.Strict(parts))
def multipartUnmarshaller[T <: Multipart, BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](mediaRange: MediaRange,
defaultContentType: ContentType,
createBodyPart: (BodyPartEntity, List[HttpHeader]) BP,
createStreamed: (MultipartMediaType, Source[BP]) T,
createStrictBodyPart: (HttpEntity.Strict, List[HttpHeader]) BPS,
createStrict: (MultipartMediaType, immutable.Seq[BPS]) T)(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[T] =
Unmarshaller { entity
if (mediaRange matches entity.contentType.mediaType) {
if (entity.contentType.mediaType.isMultipart && mediaRange.matches(entity.contentType.mediaType)) {
entity.contentType.mediaType.params.get("boundary") match {
case None FastFuture.failed(UnmarshallingError.InvalidContent("Content-Type with a multipart media type must have a 'boundary' parameter"))
case None
FastFuture.failed(UnmarshallingError.InvalidContent("Content-Type with a multipart media type must have a 'boundary' parameter"))
case Some(boundary)
import BodyPartParser._
val parser = new BodyPartParser(defaultContentType, boundary, log)
FastFuture.successful {
entity match {
case HttpEntity.Strict(ContentType(mediaType: MultipartMediaType, _), data)
val builder = new VectorBuilder[BPS]()
(parser.onNext(data) ++ parser.onTermination(None)) foreach {
case BodyPartStart(headers, createEntity)
val entity = createEntity(Source.empty()) match {
case x: HttpEntity.Strict x
case x throw new IllegalStateException("Unexpected entity type from strict BodyPartParser: " + x.getClass.getName)
}
builder += createStrictBodyPart(entity, headers)
case ParseError(errorInfo) throw new ParsingException(errorInfo)
case x throw new IllegalStateException(s"Unexpected BodyPartParser result `x` in strict case")
}
createStrict(mediaType, builder.result())
case _
val bodyParts = entity.dataBytes
.transform("bodyPart", () new BodyPartParser(defaultContentType, boundary, actorSystem(refFactory).log))
.splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart])
.transform("bodyPart", () parser)
.splitWhen(_.isInstanceOf[BodyPartStart])
.headAndTail
.collect {
case (BodyPartParser.BodyPartStart(headers, createEntity), entityParts)
BodyPart(createEntity(entityParts), headers)
case (BodyPartParser.ParseError(errorInfo), _) throw new ParsingException(errorInfo)
case (BodyPartStart(headers, createEntity), entityParts) createBodyPart(createEntity(entityParts), headers)
case (ParseError(errorInfo), _) throw new ParsingException(errorInfo)
}
createStreamed(entity.contentType.mediaType.asInstanceOf[MultipartMediaType], bodyParts)
}
}
FastFuture.successful(create(bodyParts))
}
} else FastFuture.failed(UnmarshallingError.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil))
}
implicit def defaultMultipartFormDataUnmarshaller(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
multipartFormDataUnmarshaller(verifyIntegrity = true)
def multipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts
def verify(part: BodyPart): BodyPart = part // TODO
val parts = if (verifyIntegrity) bodyParts.map(verify) else bodyParts
MultipartFormData(parts)
}
implicit def defaultStrictMultipartFormDataUnmarshaller(implicit fm: FlowMaterializer,
refFactory: ActorRefFactory): FromEntityUnmarshaller[StrictMultipartFormData] =
strictMultipartFormDataUnmarshaller(verifyIntegrity = true)
def strictMultipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit fm: FlowMaterializer,
refFactory: ActorRefFactory): FromEntityUnmarshaller[StrictMultipartFormData] = {
implicit val ec = actorSystem(refFactory).dispatcher
multipartFormDataUnmarshaller(verifyIntegrity) flatMap (mfd mfd.toStrict())
}
}
object MultipartUnmarshallers extends MultipartUnmarshallers

View file

@ -168,10 +168,10 @@ object Source {
def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element)))
/**
* Create a `Source` with no elements, i.e. an empty stream that is completed immediately
* for every connected `Sink`.
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
*/
def empty[T](): Source[T] = apply(EmptyPublisher[T])
def empty[T](): Source[T] = _empty
private[this] val _empty: Source[Nothing] = apply(EmptyPublisher)
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.