From 57efc12fb15a2a3d5a53896d5b73ddc83966a5c2 Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 30 Oct 2014 17:04:36 +0100 Subject: [PATCH 1/4] !str switch to a stable `Source.empty` val --- .../src/main/scala/akka/stream/scaladsl/Source.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 4a44b83e0a..9dadc22883 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -167,10 +167,10 @@ object Source { def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element))) /** - * Create a `Source` with no elements, i.e. an empty stream that is completed immediately - * for every connected `Sink`. + * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */ - def empty[T](): Source[T] = apply(EmptyPublisher[T]) + def empty[T](): Source[T] = _empty + private[this] val _empty: Source[Nothing] = apply(EmptyPublisher) /** * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. From 3400436cc5b29da235b58cb789cddb8bba69bece Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 30 Oct 2014 17:06:30 +0100 Subject: [PATCH 2/4] =htc small fix and improvements to HttpEntity --- .../scala/akka/http/model/HttpEntity.scala | 23 +++++++++++++------ .../scala/akka/http/model/HttpMessage.scala | 2 +- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index f6bfe07d25..015d8b9095 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -10,15 +10,14 @@ import java.lang.{ Iterable ⇒ JIterable } import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.duration.FiniteDuration import scala.collection.immutable +import scala.util.control.NonFatal import akka.util.ByteString -import akka.stream.{ FlowMaterializer, TimerTransformer, Transformer } +import akka.stream.FlowMaterializer import akka.stream.scaladsl._ -import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable } +import akka.stream.{ TimerTransformer, Transformer } import akka.http.util._ import japi.JavaMapping.Implicits._ -import scala.util.control.NonFatal - /** * Models the entity (aka "body" or "content) of an HTTP message. */ @@ -40,7 +39,7 @@ sealed trait HttpEntity extends japi.HttpEntity { /** * Collects all possible parts and returns a potentially future Strict entity for easier processing. - * The Deferrable is failed with an TimeoutException if the stream isn't completed after the given timeout. + * The Future is failed with an TimeoutException if the stream isn't completed after the given timeout. */ def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[HttpEntity.Strict] = { def transformer() = @@ -163,6 +162,8 @@ object HttpEntity { def withContentType(contentType: ContentType): Strict = if (contentType == this.contentType) this else copy(contentType = contentType) + + override def productPrefix = "HttpEntity.Strict" } /** @@ -187,6 +188,8 @@ object HttpEntity { def withContentType(contentType: ContentType): Default = if (contentType == this.contentType) this else copy(contentType = contentType) + + override def productPrefix = "HttpEntity.Default" } /** @@ -198,7 +201,7 @@ object HttpEntity { def contentType: ContentType def data: Source[ByteString] - def isKnownEmpty = data eq EmptyPublisher + def isKnownEmpty = data eq Source.empty def dataBytes: Source[ByteString] = data } @@ -219,6 +222,8 @@ object HttpEntity { override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): CloseDelimited = HttpEntity.CloseDelimited(contentType, data.transform("transformDataBytes-CloseDelimited", transformer)) + + override def productPrefix = "HttpEntity.CloseDelimited" } /** @@ -235,6 +240,8 @@ object HttpEntity { override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): IndefiniteLength = HttpEntity.IndefiniteLength(contentType, data.transform("transformDataBytes-IndefiniteLength", transformer)) + + override def productPrefix = "HttpEntity.IndefiniteLength" } /** @@ -243,7 +250,7 @@ object HttpEntity { final case class Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart]) extends japi.HttpEntityChunked with MessageEntity { - def isKnownEmpty = chunks eq EmptyPublisher + def isKnownEmpty = chunks eq Source.empty override def isChunked: Boolean = true def dataBytes: Source[ByteString] = @@ -283,6 +290,8 @@ object HttpEntity { def withContentType(contentType: ContentType): Chunked = if (contentType == this.contentType) this else copy(contentType = contentType) + override def productPrefix = "HttpEntity.Chunked" + /** Java API */ def getChunks: Source[japi.ChunkStreamPart] = chunks.asInstanceOf[Source[japi.ChunkStreamPart]] } diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index 190eb612e0..d71ed5c0af 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -277,7 +277,7 @@ object HttpRequest { } else // http://tools.ietf.org/html/rfc7230#section-5.4 if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty || hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri - else throw new IllegalUriException("'Host' header value of request to `$uri` doesn't match request target authority", + else throw new IllegalUriException(s"'Host' header value of request to `$uri` doesn't match request target authority", s"Host header: $hostHeader\nrequest target authority: ${uri.authority}") } } From f0f1397058363168f51daa6c450789375d7f3a3a Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 30 Oct 2014 17:07:36 +0100 Subject: [PATCH 3/4] !htc #15674 refactor, improve Multipart model --- .../http/engine/parsing/BodyPartParser.scala | 80 +++-- .../engine/rendering/BodyPartRenderer.scala | 130 ++++---- .../scala/akka/http/model/Multipart.scala | 286 ++++++++++++++++++ .../akka/http/model/MultipartContent.scala | 167 ---------- .../http/model/MultipartContentSpec.scala | 54 ---- .../scala/akka/http/model/MultipartSpec.scala | 66 ++++ 6 files changed, 472 insertions(+), 311 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/model/Multipart.scala delete mode 100644 akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala delete mode 100644 akka-http-core/src/test/scala/akka/http/model/MultipartContentSpec.scala create mode 100644 akka-http-core/src/test/scala/akka/http/model/MultipartSpec.scala diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala index df2a355a67..53810b35a7 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala @@ -53,6 +53,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, private[this] val headerParser = HttpHeaderParser(settings, warnOnIllegalHeader) // TODO: prevent re-priming header parser from scratch private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs private[this] var state: ByteString ⇒ StateResult = tryParseInitialBoundary + private[this] var receivedInitialBoundary = false private[this] var terminated = false override def isComplete = terminated @@ -70,24 +71,21 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, result.toList } - def tryParseInitialBoundary(input: ByteString): StateResult = { + def tryParseInitialBoundary(input: ByteString): StateResult = // we don't use boyerMoore here because we are testing for the boundary *without* a // preceding CRLF and at a known location (the very beginning of the entity) try { - @tailrec def rec(ix: Int): StateResult = - if (ix < needle.length) { - if (byteAt(input, ix - 2) == needle(ix)) rec(ix + 1) - else parsePreamble(input, 0) - } else { - if (crlf(input, ix - 2)) parseHeaderLines(input, ix) - else if (doubleDash(input, ix - 2)) terminate() - else parsePreamble(input, 0) - } - rec(2) + if (boundary(input, 0)) { + val ix = boundaryLength + if (crlf(input, ix)) { + receivedInitialBoundary = true + parseHeaderLines(input, ix + 2) + } else if (doubleDash(input, ix)) terminate() + else parsePreamble(input, 0) + } else parsePreamble(input, 0) } catch { case NotEnoughDataException ⇒ continue((input, _) ⇒ tryParseInitialBoundary(input)) } - } def parsePreamble(input: ByteString, offset: Int): StateResult = { try { @@ -105,32 +103,41 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, @tailrec def parseHeaderLines(input: ByteString, lineStart: Int, headers: List[HttpHeader] = Nil, headerCount: Int = 0, cth: Option[`Content-Type`] = None): StateResult = { - var lineEnd = 0 - val resultHeader = - try { - lineEnd = headerParser.parseHeaderLine(input, lineStart)() - headerParser.resultHeader - } catch { - case NotEnoughDataException ⇒ null + def contentType = + cth match { + case Some(x) ⇒ x.contentType + case None ⇒ defaultContentType } - resultHeader match { - case null ⇒ continue(input, lineStart)(parseHeaderLinesAux(headers, headerCount, cth)) - case HttpHeaderParser.EmptyHeader ⇒ - val contentType = cth match { - case Some(x) ⇒ x.contentType - case None ⇒ defaultContentType + if (boundary(input, lineStart)) { + emit(BodyPartStart(headers, _ ⇒ HttpEntity.empty(contentType))) + val ix = lineStart + boundaryLength + if (crlf(input, ix)) parseHeaderLines(input, ix + 2) + else if (doubleDash(input, ix)) terminate() + else fail("Illegal multipart boundary in message content") + } else { + var lineEnd = 0 + val resultHeader = + try { + lineEnd = headerParser.parseHeaderLine(input, lineStart)() + headerParser.resultHeader + } catch { + case NotEnoughDataException ⇒ null } - parseEntity(headers, contentType)(input, lineEnd) + resultHeader match { + case null ⇒ continue(input, lineStart)(parseHeaderLinesAux(headers, headerCount, cth)) - case h: `Content-Type` ⇒ - if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, Some(h)) - else if (cth.get == h) parseHeaderLines(input, lineEnd, headers, headerCount, cth) - else fail("multipart part must not contain more than one Content-Type header") + case HttpHeaderParser.EmptyHeader ⇒ parseEntity(headers, contentType)(input, lineEnd) - case h if headerCount < maxHeaderCount ⇒ parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, cth) + case h: `Content-Type` ⇒ + if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, Some(h)) + else if (cth.get == h) parseHeaderLines(input, lineEnd, headers, headerCount, cth) + else fail("multipart part must not contain more than one Content-Type header") - case _ ⇒ fail(s"multipart part contains more than the configured limit of $maxHeaderCount headers") + case h if headerCount < maxHeaderCount ⇒ parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, cth) + + case _ ⇒ fail(s"multipart part contains more than the configured limit of $maxHeaderCount headers") + } } } @@ -204,11 +211,20 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, def done(): StateResult = null // StateResult is a phantom type + // the length of the needle without the preceding CRLF + def boundaryLength = needle.length - 2 + + @tailrec def boundary(input: ByteString, offset: Int, ix: Int = 2): Boolean = + (ix == needle.length) || (byteAt(input, offset + ix - 2) == needle(ix)) && boundary(input, offset, ix + 1) + def crlf(input: ByteString, offset: Int): Boolean = byteChar(input, offset) == '\r' && byteChar(input, offset + 1) == '\n' def doubleDash(input: ByteString, offset: Int): Boolean = byteChar(input, offset) == '-' && byteChar(input, offset + 1) == '-' + + override def onTermination(e: Option[Throwable]): List[BodyPartParser.Output] = + if (terminated || !receivedInitialBoundary) Nil else ParseError(ErrorInfo("Unexpected end of multipart entity")) :: Nil } private[http] object BodyPartParser { diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala index a6cf660013..da36a7f045 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala @@ -5,7 +5,7 @@ package akka.http.engine.rendering import java.nio.charset.Charset -import scala.annotation.tailrec +import scala.collection.immutable import akka.event.LoggingAdapter import akka.http.model._ import akka.http.model.headers._ @@ -19,73 +19,87 @@ import HttpEntity._ /** * INTERNAL API */ -private[http] class BodyPartRenderer(boundary: String, - nioCharset: Charset, - partHeadersSizeHint: Int, - log: LoggingAdapter) extends Transformer[BodyPart, Source[ChunkStreamPart]] { +private[http] object BodyPartRenderer { - private[this] var firstBoundaryRendered = false + def streamed(boundary: String, + nioCharset: Charset, + partHeadersSizeHint: Int, + log: LoggingAdapter): Transformer[Multipart.BodyPart, Source[ChunkStreamPart]] = + new Transformer[Multipart.BodyPart, Source[ChunkStreamPart]] { + var firstBoundaryRendered = false - def onNext(bodyPart: BodyPart): List[Source[ChunkStreamPart]] = { - val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint) + def onNext(bodyPart: Multipart.BodyPart): List[Source[ChunkStreamPart]] = { + val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint) - def renderBoundary(): Unit = { - if (firstBoundaryRendered) r ~~ CrLf - r ~~ '-' ~~ '-' ~~ boundary ~~ CrLf - } - - def render(h: HttpHeader) = r ~~ h ~~ CrLf - - @tailrec def renderHeaders(remaining: List[HttpHeader]): Unit = - remaining match { - case head :: tail ⇒ head match { - case x: `Content-Length` ⇒ - suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.") - renderHeaders(tail) - - case x: `Content-Type` ⇒ - suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.") - renderHeaders(tail) - - case x: RawHeader if (x is "content-type") || (x is "content-length") ⇒ - suppressionWarning(log, x, "illegal RawHeader") - renderHeaders(tail) - - case x ⇒ - render(x) - renderHeaders(tail) + def bodyPartChunks(data: Source[ByteString]): List[Source[ChunkStreamPart]] = { + val entityChunks = data.map[ChunkStreamPart](Chunk(_)) + (Source(Chunk(r.get) :: Nil) ++ entityChunks) :: Nil } - case Nil ⇒ r ~~ CrLf + + def completePartRendering(): List[Source[ChunkStreamPart]] = + bodyPart.entity match { + case x if x.isKnownEmpty ⇒ chunkStream(r.get) + case Strict(_, data) ⇒ chunkStream((r ~~ data).get) + case Default(_, _, data) ⇒ bodyPartChunks(data) + case IndefiniteLength(_, data) ⇒ bodyPartChunks(data) + } + + renderBoundary(r, boundary, suppressInitialCrLf = !firstBoundaryRendered) + firstBoundaryRendered = true + renderEntityContentType(r, bodyPart.entity) + renderHeaders(r, bodyPart.headers, log) + completePartRendering() } - def bodyPartChunks(data: Source[ByteString]): List[Source[ChunkStreamPart]] = { - val entityChunks = data.map[ChunkStreamPart](Chunk(_)) - (Source(Chunk(r.get) :: Nil) ++ entityChunks) :: Nil + override def onTermination(e: Option[Throwable]): List[Source[ChunkStreamPart]] = + if (e.isEmpty && firstBoundaryRendered) { + val r = new ByteStringRendering(boundary.length + 4) + renderFinalBoundary(r, boundary) + chunkStream(r.get) + } else Nil + + private def chunkStream(byteString: ByteString) = + Source[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil } - def completePartRendering(): List[Source[ChunkStreamPart]] = - bodyPart.entity match { - case x if x.isKnownEmpty ⇒ chunkStream(r.get) - case Strict(_, data) ⇒ chunkStream((r ~~ data).get) - case Default(_, _, data) ⇒ bodyPartChunks(data) - case IndefiniteLength(_, data) ⇒ bodyPartChunks(data) + def strict(parts: immutable.Seq[Multipart.BodyPart.Strict], boundary: String, nioCharset: Charset, + partHeadersSizeHint: Int, log: LoggingAdapter): ByteString = { + val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint) + if (parts.nonEmpty) { + for (part ← parts) { + renderBoundary(r, boundary, suppressInitialCrLf = part eq parts.head) + renderEntityContentType(r, part.entity) + renderHeaders(r, part.headers, log) + r ~~ part.entity.data } - - renderBoundary() - firstBoundaryRendered = true - renderEntityContentType(r, bodyPart.entity) - renderHeaders(bodyPart.headers.toList) - completePartRendering() + renderFinalBoundary(r, boundary) + } + r.get } - override def onTermination(e: Option[Throwable]): List[Source[ChunkStreamPart]] = - if (e.isEmpty && firstBoundaryRendered) { - val r = new ByteStringRendering(boundary.length + 4) - r ~~ CrLf ~~ '-' ~~ '-' ~~ boundary ~~ '-' ~~ '-' - chunkStream(r.get) - } else Nil + private def renderBoundary(r: Rendering, boundary: String, suppressInitialCrLf: Boolean = false): Unit = { + if (!suppressInitialCrLf) r ~~ CrLf + r ~~ '-' ~~ '-' ~~ boundary ~~ CrLf + } - private def chunkStream(byteString: ByteString) = - Source[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil -} + private def renderFinalBoundary(r: Rendering, boundary: String): Unit = + r ~~ CrLf ~~ '-' ~~ '-' ~~ boundary ~~ '-' ~~ '-' + private def renderHeaders(r: Rendering, headers: immutable.Seq[HttpHeader], log: LoggingAdapter): Unit = { + headers foreach renderHeader(r, log) + r ~~ CrLf + } + + private def renderHeader(r: Rendering, log: LoggingAdapter): HttpHeader ⇒ Unit = { + case x: `Content-Length` ⇒ + suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.") + + case x: `Content-Type` ⇒ + suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.") + + case x: RawHeader if (x is "content-type") || (x is "content-length") ⇒ + suppressionWarning(log, x, "illegal RawHeader") + + case x ⇒ r ~~ x ~~ CrLf + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala new file mode 100644 index 0000000000..08ccce6cc1 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala @@ -0,0 +1,286 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.model + +import scala.collection.immutable.VectorBuilder +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ Future, ExecutionContext } +import scala.collection.immutable +import scala.util.{ Failure, Success, Try } +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Source +import akka.http.util.FastFuture +import akka.http.model.headers._ +import FastFuture._ + +trait Multipart { + def mediaType: MultipartMediaType + def parts: Source[Multipart.BodyPart] + + /** + * Converts this content into its strict counterpart. + * The given ``timeout`` denotes the max time that an individual part must be read in. + * The Future is failed with an TimeoutException if one part isn't read completely after the given timeout. + */ + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Multipart.Strict] +} + +object Multipart { + + trait Strict extends Multipart { + def strictParts: immutable.Seq[BodyPart.Strict] + } + + trait BodyPart { + def entity: BodyPartEntity + def headers: immutable.Seq[HttpHeader] + + def contentDispositionHeader: Option[`Content-Disposition`] = + headers.collectFirst { case x: `Content-Disposition` ⇒ x } + def dispositionParams: Map[String, String] = + contentDispositionHeader match { + case Some(`Content-Disposition`(_, params)) ⇒ params + case None ⇒ Map.empty + } + def dispositionType: Option[ContentDispositionType] = + contentDispositionHeader.map(_.dispositionType) + + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] + } + + object BodyPart { + trait Strict extends BodyPart { + override def entity: HttpEntity.Strict + } + } + + private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP])(f: BP ⇒ Future[BPS])(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Vector[BPS]] = + // TODO: move to Vector `:+` when https://issues.scala-lang.org/browse/SI-8930 is fixed + parts.fold(new VectorBuilder[Future[BPS]]) { + case (builder, part) ⇒ builder += f(part) + }.fast.flatMap(builder ⇒ FastFuture.sequence(builder.result())) + + //////////////////////// CONCRETE multipart types ///////////////////////// + + /** + * Basic model for multipart content as defined by http://tools.ietf.org/html/rfc2046. + */ + sealed abstract class General extends Multipart { + def mediaType: MultipartMediaType + def parts: Source[General.BodyPart] + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[General.Strict] = + strictify(parts)(_.toStrict(timeout)).fast.map(General.Strict(mediaType, _)) + } + object General { + def apply(mediaType: MultipartMediaType, parts: BodyPart.Strict*): Strict = Strict(mediaType, parts.toVector) + + def apply(_mediaType: MultipartMediaType, _parts: Source[BodyPart]): General = + new General { + def mediaType = _mediaType + def parts = _parts + override def toString = s"General($mediaType, $parts)" + } + + def unapply(value: General): Option[(MultipartMediaType, Source[BodyPart])] = Some(value.mediaType -> value.parts) + + /** + * Strict [[General]]. + */ + case class Strict(mediaType: MultipartMediaType, strictParts: immutable.Seq[BodyPart.Strict]) extends General with Multipart.Strict { + def parts: Source[BodyPart.Strict] = Source(strictParts) + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = + FastFuture.successful(this) + override def productPrefix = "General.Strict" + } + + /** + * Body part of the [[General]] model. + */ + sealed abstract class BodyPart extends Multipart.BodyPart { + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] = + entity.toStrict(timeout).map(BodyPart.Strict(_, headers)) + def toFormDataBodyPart: Try[FormData.BodyPart] + def toByteRangesBodyPart: Try[ByteRanges.BodyPart] + + private[BodyPart] def tryCreateFormDataBodyPart[T](f: (String, Map[String, String], immutable.Seq[HttpHeader]) ⇒ T): Try[T] = { + val params = dispositionParams + params.get("name") match { + case Some(name) ⇒ Success(f(name, params - "name", headers.filterNot(_ is "content-disposition"))) + case None ⇒ Failure(new IllegalHeaderException("multipart/form-data part must contain `Content-Disposition` header with `name` parameter")) + } + } + private[BodyPart] def tryCreateByteRangesBodyPart[T](f: (ContentRange, RangeUnit, immutable.Seq[HttpHeader]) ⇒ T): Try[T] = + headers.collectFirst { case x: `Content-Range` ⇒ x } match { + case Some(`Content-Range`(unit, range)) ⇒ Success(f(range, unit, headers.filterNot(_ is "content-range"))) + case None ⇒ Failure(new IllegalHeaderException("multipart/byteranges part must contain `Content-Range` header")) + } + } + object BodyPart { + def apply(_entity: BodyPartEntity, _headers: immutable.Seq[HttpHeader] = Nil): BodyPart = + new BodyPart { + def entity = _entity + def headers: immutable.Seq[HttpHeader] = _headers + def toFormDataBodyPart: Try[FormData.BodyPart] = tryCreateFormDataBodyPart(FormData.BodyPart(_, entity, _, _)) + def toByteRangesBodyPart: Try[ByteRanges.BodyPart] = tryCreateByteRangesBodyPart(ByteRanges.BodyPart(_, entity, _, _)) + override def toString = s"General.BodyPart($entity, $headers)" + } + + def unapply(value: BodyPart): Option[(BodyPartEntity, immutable.Seq[HttpHeader])] = Some(value.entity -> value.headers) + + /** + * Strict [[General.BodyPart]]. + */ + case class Strict(entity: HttpEntity.Strict, headers: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict { + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] = + FastFuture.successful(this) + override def toFormDataBodyPart: Try[FormData.BodyPart.Strict] = tryCreateFormDataBodyPart(FormData.BodyPart.Strict(_, entity, _, _)) + override def toByteRangesBodyPart: Try[ByteRanges.BodyPart.Strict] = tryCreateByteRangesBodyPart(ByteRanges.BodyPart.Strict(_, entity, _, _)) + override def productPrefix = "General.BodyPart.Strict" + } + } + } + + /** + * Model for `multipart/form-data` content as defined in http://tools.ietf.org/html/rfc2388. + * All parts must have distinct names. (This is not verified!) + */ + sealed abstract class FormData extends Multipart { + def mediaType = MediaTypes.`multipart/form-data` + def parts: Source[FormData.BodyPart] + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[FormData.Strict] = + strictify(parts)(_.toStrict(timeout)).fast.map(FormData.Strict(_)) + } + object FormData { + def apply(parts: BodyPart.Strict*): Strict = Strict(parts.toVector) + + def apply(fields: Map[String, HttpEntity.Strict]): Strict = Strict { + fields.map { case (name, entity) ⇒ BodyPart.Strict(name, entity) }(collection.breakOut) + } + + def apply(_parts: Source[BodyPart]): FormData = new FormData { + def parts = _parts + override def toString = s"FormData($parts)" + } + + /** + * Strict [[FormData]]. + */ + case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends FormData with Multipart.Strict { + def parts: Source[BodyPart.Strict] = Source(strictParts) + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = + FastFuture.successful(this) + override def productPrefix = "FormData.Strict" + } + + /** + * Body part of the [[FormData]] model. + */ + sealed abstract class BodyPart extends Multipart.BodyPart { + def name: String + def additionalDispositionParams: Map[String, String] + def additionalHeaders: immutable.Seq[HttpHeader] + override def headers = contentDispositionHeader.get +: additionalHeaders + override def contentDispositionHeader = Some(`Content-Disposition`(dispositionType.get, dispositionParams)) + override def dispositionParams = additionalDispositionParams.updated("name", name) + override def dispositionType = Some(ContentDispositionTypes.`form-data`) + def filename: Option[String] = additionalDispositionParams.get("filename") + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] = + entity.toStrict(timeout).map(BodyPart.Strict(name, _, additionalDispositionParams, additionalHeaders)) + } + object BodyPart { + def apply(_name: String, _entity: BodyPartEntity, + _additionalDispositionParams: Map[String, String] = Map.empty, + _additionalHeaders: immutable.Seq[HttpHeader] = Nil): BodyPart = + new BodyPart { + def name = _name + def additionalDispositionParams = _additionalDispositionParams + def additionalHeaders = _additionalHeaders + def entity = _entity + override def toString = s"FormData.BodyPart(${_name}, ${_entity}, ${_additionalDispositionParams}, ${_additionalHeaders})" + } + + def unapply(value: BodyPart): Option[(String, BodyPartEntity, Map[String, String], immutable.Seq[HttpHeader])] = + Some((value.name, value.entity, value.additionalDispositionParams, value.additionalHeaders)) + + /** + * Strict [[FormData.BodyPart]]. + */ + case class Strict(name: String, entity: HttpEntity.Strict, + additionalDispositionParams: Map[String, String] = Map.empty, + additionalHeaders: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict { + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] = + FastFuture.successful(this) + override def productPrefix = "FormData.BodyPart.Strict" + } + } + } + + /** + * Model for ``multipart/byteranges`` content as defined by + * https://tools.ietf.org/html/rfc7233#section-5.4.1 and https://tools.ietf.org/html/rfc7233#appendix-A + */ + sealed abstract class ByteRanges extends Multipart { + def mediaType = MediaTypes.`multipart/byteranges` + def parts: Source[ByteRanges.BodyPart] + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[ByteRanges.Strict] = + strictify(parts)(_.toStrict(timeout)).fast.map(ByteRanges.Strict(_)) + } + object ByteRanges { + def apply(parts: BodyPart.Strict*): Strict = Strict(parts.toVector) + + def apply(_parts: Source[BodyPart]): ByteRanges = + new ByteRanges { + def parts = _parts + override def toString = s"ByteRanges($parts)" + } + + /** + * Strict [[ByteRanges]]. + */ + case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends ByteRanges with Multipart.Strict { + def parts: Source[BodyPart.Strict] = Source(strictParts) + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = + FastFuture.successful(this) + override def productPrefix = "ByteRanges.Strict" + } + + /** + * Body part of the [[ByteRanges]] model. + */ + sealed abstract class BodyPart extends Multipart.BodyPart { + def contentRange: ContentRange + def rangeUnit: RangeUnit + def additionalHeaders: immutable.Seq[HttpHeader] + override def headers = contentRangeHeader +: additionalHeaders + def contentRangeHeader = `Content-Range`(rangeUnit, contentRange) + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] = + entity.toStrict(timeout).map(BodyPart.Strict(contentRange, _, rangeUnit, additionalHeaders)) + } + object BodyPart { + def apply(_contentRange: ContentRange, _entity: BodyPartEntity, _rangeUnit: RangeUnit = RangeUnits.Bytes, + _additionalHeaders: immutable.Seq[HttpHeader] = Nil): BodyPart = + new BodyPart { + def contentRange = _contentRange + def entity = _entity + def rangeUnit = _rangeUnit + def additionalHeaders = _additionalHeaders + override def toString = s"ByteRanges.BodyPart(${_contentRange}, ${_entity}, ${_rangeUnit}, ${_additionalHeaders})" + } + + def unapply(value: BodyPart): Option[(ContentRange, BodyPartEntity, RangeUnit, immutable.Seq[HttpHeader])] = + Some((value.contentRange, value.entity, value.rangeUnit, value.additionalHeaders)) + + /** + * Strict [[ByteRanges.BodyPart]]. + */ + case class Strict(contentRange: ContentRange, entity: HttpEntity.Strict, rangeUnit: RangeUnit = RangeUnits.Bytes, + additionalHeaders: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict { + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] = + FastFuture.successful(this) + override def productPrefix = "ByteRanges.BodyPart.Strict" + } + } + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala deleted file mode 100644 index 3b8e1a4f6b..0000000000 --- a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.model - -import java.io.File -import scala.concurrent.{ Future, ExecutionContext } -import scala.collection.immutable -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.{ Sink, Source } -import akka.stream.impl.SynchronousPublisherFromIterable -import akka.http.util.FastFuture -import FastFuture._ -import headers._ - -trait MultipartParts { - def parts: Source[BodyPart] -} - -/** - * Basic model for multipart content as defined in RFC 2046. - * If you are looking for a model for `multipart/form-data` you should be using [[MultipartFormData]]. - */ -final case class MultipartContent(parts: Source[BodyPart]) extends MultipartParts - -object MultipartContent { - val Empty = MultipartContent(Source[BodyPart](Nil)) - - def apply(parts: BodyPart*): MultipartContent = apply(Source[BodyPart](parts.toList)) - - def apply(files: Map[String, FormFile]): MultipartContent = - apply(files.map(e ⇒ BodyPart(e._2, e._1))(collection.breakOut): _*) -} - -/** - * Model for multipart/byteranges content as defined in RFC 2046. - * If you are looking for a model for `multipart/form-data` you should be using [[MultipartFormData]]. - */ -final case class MultipartByteRanges(parts: Source[BodyPart]) extends MultipartParts - -object MultipartByteRanges { - val Empty = MultipartByteRanges(Source[BodyPart](Nil)) - - def apply(parts: BodyPart*): MultipartByteRanges = - if (parts.isEmpty) Empty else MultipartByteRanges(Source[BodyPart](parts.toList)) -} - -/** - * Model for `multipart/form-data` content as defined in RFC 2388. - * All parts must contain a Content-Disposition header with a type form-data - * and a name parameter that is unique. - */ -case class MultipartFormData(parts: Source[BodyPart]) extends MultipartParts { - /** - * Turns this instance into its strict specialization using the given `maxFieldCount` as the field number cut-off - * hint. - */ - def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[StrictMultipartFormData] = - parts.grouped(maxFieldCount).runWith(Sink.future).fast.map(new StrictMultipartFormData(_)) -} - -/** - * A specialized `MultipartFormData` that allows full random access to its parts. - */ -class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends MultipartFormData(Source(fields)) { - /** - * Returns the BodyPart with the given name, if found. - */ - def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName)) - - override def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer) = - FastFuture.successful(this) -} - -object MultipartFormData { - val Empty = MultipartFormData() - - def apply(parts: BodyPart*): MultipartFormData = apply(Source[BodyPart](parts.toList)) - - def apply(fields: Map[String, BodyPart]): MultipartFormData = apply { - fields.map { - case (key, value) ⇒ value.copy(headers = `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> key)) +: value.headers) - }(collection.breakOut): _* - } -} - -final case class FormFile(name: Option[String], entity: BodyPartEntity) - -object FormFile { - def apply(name: String, entity: BodyPartEntity): FormFile = apply(Some(name), entity) -} - -/** - * Model for one part of a multipart message. - */ -final case class BodyPart(entity: BodyPartEntity, headers: immutable.Seq[HttpHeader] = Nil) { - val name: Option[String] = dispositionParameterValue("name") - - def filename: Option[String] = dispositionParameterValue("filename") - def dispositionType: Option[ContentDispositionType] = - headers.collectFirst { - case `Content-Disposition`(dispositionType, _) ⇒ dispositionType - } - - def dispositionParameterValue(parameter: String): Option[String] = - headers.collectFirst { - case `Content-Disposition`(ContentDispositionTypes.`form-data`, params) if params.contains(parameter) ⇒ - params(parameter) - } - - def contentRange: Option[ContentRange] = - headers.collectFirst { - case `Content-Range`(_, contentRange) ⇒ contentRange - } -} - -object BodyPart { - def apply(file: File, fieldName: String): BodyPart = apply(file, fieldName, ContentTypes.`application/octet-stream`) - def apply(file: File, fieldName: String, contentType: ContentType): BodyPart = - apply(HttpEntity(contentType, file), fieldName, Map.empty.updated("filename", file.getName)) - - def apply(formFile: FormFile, fieldName: String): BodyPart = - formFile.name match { - case Some(name) ⇒ apply(formFile.entity, fieldName, Map.empty.updated("filename", name)) - case None ⇒ apply(formFile.entity, fieldName) - } - - def apply(entity: BodyPartEntity, fieldName: String): BodyPart = apply(entity, fieldName, Map.empty[String, String]) - def apply(entity: BodyPartEntity, fieldName: String, params: Map[String, String]): BodyPart = - BodyPart(entity, immutable.Seq(`Content-Disposition`(ContentDispositionTypes.`form-data`, params.updated("name", fieldName)))) -} - -/** - * A convenience extractor that allows to match on a BodyPart including its name if the body-part - * is used as part of form-data. If the part has no name the extractor won't match. - * - * Example: - * - * {{{ - * (formData: StrictMultipartFormData).fields collect { - * case NamedBodyPart("address", data, headers) => data - * } - * }}} - */ -object NamedBodyPart { - def unapply(part: BodyPart): Option[(String, BodyPartEntity, immutable.Seq[HttpHeader])] = - part.name.map(name ⇒ (name, part.entity, part.headers)) -} - -/** - * A convenience extractor that allows to match on a BodyPart including its name and filename - * if the body-part is used as part of form-data. If the part has no name an empty string will be - * extracted, instead. If the part has no filename the extractor won't match. - * - * Example: - * - * {{{ - * (formData: StrictMultipartFormData).fields collect { - * case FileBodyPart("file", filename, data, headers) => filename -> data - * } - * }}} - */ -object FileBodyPart { - def unapply(part: BodyPart): Option[(String, String, BodyPartEntity, immutable.Seq[HttpHeader])] = - part.filename.map(filename ⇒ (part.name.getOrElse(""), filename, part.entity, part.headers)) -} diff --git a/akka-http-core/src/test/scala/akka/http/model/MultipartContentSpec.scala b/akka-http-core/src/test/scala/akka/http/model/MultipartContentSpec.scala deleted file mode 100644 index 87fbe41fc7..0000000000 --- a/akka-http-core/src/test/scala/akka/http/model/MultipartContentSpec.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.model - -import akka.http.model.headers.{ ContentDispositionTypes, `Content-Disposition` } -import akka.util.ByteString - -import org.scalatest.{ Inside, Matchers, WordSpec } - -class MultipartContentSpec extends WordSpec with Matchers with Inside { - "BodyPart" should { - val data = HttpEntity(ByteString("data")) - - "be matched with NamedBodyPart extractor if it has a name" in { - val part = BodyPart(data, "name") - inside(part) { - case NamedBodyPart("name", entity, _) ⇒ entity should equal(data) - } - } - - "not be matched with NamedBodyPart extractor if it has no name" in { - val part = BodyPart(data) - inside(part) { - case NamedBodyPart(name, entity, _) ⇒ fail(s"Shouldn't match but did match with name '$name'") - case _ ⇒ - } - } - - "be matched with FileBodyPart extractor if it contains a file" in { - val part = BodyPart(FormFile("data.txt", data), "name") - inside(part) { - case FileBodyPart("name", "data.txt", entity, _) ⇒ entity should equal(data) - } - } - - "be matched with FileBodyPart extractor if it contains a file but no name" in { - val part = BodyPart(data, `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("filename" -> "data.txt")) :: Nil) - - inside(part) { - case FileBodyPart("", "data.txt", entity, _) ⇒ entity should equal(data) - } - } - - "not be matched with NamedBodyPart extractor if it doesn't contains a file" in { - val part = BodyPart(data) - inside(part) { - case FileBodyPart(name, filename, entity, _) ⇒ fail(s"Shouldn't match but did match with name '$name' and filename '$filename'") - case _ ⇒ - } - } - } -} diff --git a/akka-http-core/src/test/scala/akka/http/model/MultipartSpec.scala b/akka-http-core/src/test/scala/akka/http/model/MultipartSpec.scala new file mode 100644 index 0000000000..bf599243bc --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/model/MultipartSpec.scala @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.model + +import com.typesafe.config.{ Config, ConfigFactory } +import scala.concurrent.Await +import scala.concurrent.duration._ +import org.scalatest.{ BeforeAndAfterAll, Inside, Matchers, WordSpec } +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Source +import akka.util.ByteString +import akka.actor.ActorSystem +import headers._ + +class MultipartSpec extends WordSpec with Matchers with Inside with BeforeAndAfterAll { + + val testConf: Config = ConfigFactory.parseString(""" + akka.event-handlers = ["akka.testkit.TestEventListener"] + akka.loglevel = WARNING""") + implicit val system = ActorSystem(getClass.getSimpleName, testConf) + import system.dispatcher + implicit val materializer = FlowMaterializer() + override def afterAll() = system.shutdown() + + "Multipart.General" should { + "support `toStrict` on the streamed model" in { + val streamed = Multipart.General( + MediaTypes.`multipart/mixed`, + Source(Multipart.General.BodyPart(defaultEntity("data"), List(ETag("xzy"))) :: Nil)) + val strict = Await.result(streamed.toStrict(1.second), 1.second) + + strict shouldEqual Multipart.General( + MediaTypes.`multipart/mixed`, + Multipart.General.BodyPart.Strict(HttpEntity("data"), List(ETag("xzy")))) + } + } + + "Multipart.FormData" should { + "support `toStrict` on the streamed model" in { + val streamed = Multipart.FormData(Source( + Multipart.FormData.BodyPart("foo", defaultEntity("FOO")) :: + Multipart.FormData.BodyPart("bar", defaultEntity("BAR")) :: Nil)) + val strict = Await.result(streamed.toStrict(1.second), 1.second) + + strict shouldEqual Multipart.FormData(Map("foo" -> HttpEntity("FOO"), "bar" -> HttpEntity("BAR"))) + } + } + + "Multipart.ByteRanges" should { + "support `toStrict` on the streamed model" in { + val streamed = Multipart.ByteRanges(Source( + Multipart.ByteRanges.BodyPart(ContentRange(0, 6), defaultEntity("snippet"), _additionalHeaders = List(ETag("abc"))) :: + Multipart.ByteRanges.BodyPart(ContentRange(8, 9), defaultEntity("PR"), _additionalHeaders = List(ETag("xzy"))) :: Nil)) + val strict = Await.result(streamed.toStrict(1.second), 1.second) + + strict shouldEqual Multipart.ByteRanges( + Multipart.ByteRanges.BodyPart.Strict(ContentRange(0, 6), HttpEntity("snippet"), additionalHeaders = List(ETag("abc"))), + Multipart.ByteRanges.BodyPart.Strict(ContentRange(8, 9), HttpEntity("PR"), additionalHeaders = List(ETag("xzy")))) + } + } + + def defaultEntity(content: String) = + HttpEntity.Default(ContentTypes.`text/plain(UTF-8)`, content.length, Source(ByteString(content) :: Nil)) +} From 4682f5ddb2e038486f59cfdec43f1bec178ced33 Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 30 Oct 2014 17:09:28 +0100 Subject: [PATCH 4/4] !htp #15674 upgrade to new Multipart model --- .../http/marshalling/MarshallingSpec.scala | 45 +++-- .../unmarshalling/UnmarshallingSpec.scala | 161 +++++++++++------- .../akka/http/marshalling/Marshallers.scala | 4 +- .../marshalling/MultipartMarshallers.scala | 48 +++--- .../MultipartUnmarshallers.scala | 112 +++++++----- 5 files changed, 207 insertions(+), 163 deletions(-) diff --git a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala index 98d838c343..6f75f59271 100644 --- a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import akka.actor.ActorSystem import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Source import akka.http.util._ import akka.http.model._ import headers._ @@ -51,9 +52,9 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with } "The MultipartMarshallers." - { - "multipartContentMarshaller should correctly marshal multipart content with" - { + "multipartMarshaller should correctly marshal multipart content with" - { "one empty part" in { - marshal(MultipartContent(BodyPart(""))) shouldEqual HttpEntity( + marshal(Multipart.General(`multipart/mixed`, Multipart.General.BodyPart.Strict(""))) shouldEqual HttpEntity( contentType = ContentType(`multipart/mixed` withBoundary randomBoundary), string = s"""--$randomBoundary |Content-Type: text/plain; charset=UTF-8 @@ -62,11 +63,11 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with |--$randomBoundary--""".stripMarginWithNewline("\r\n")) } "one non-empty part" in { - marshal(MultipartContent(BodyPart( + marshal(Multipart.General(`multipart/alternative`, Multipart.General.BodyPart.Strict( entity = HttpEntity(ContentType(`text/plain`, `UTF-8`), "test@there.com"), headers = `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")) :: Nil))) shouldEqual HttpEntity( - contentType = ContentType(`multipart/mixed` withBoundary randomBoundary), + contentType = ContentType(`multipart/alternative` withBoundary randomBoundary), string = s"""--$randomBoundary |Content-Type: text/plain; charset=UTF-8 |Content-Disposition: form-data; name=email @@ -75,13 +76,13 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with |--$randomBoundary--""".stripMarginWithNewline("\r\n")) } "two different parts" in { - marshal(MultipartContent( - BodyPart(HttpEntity(ContentType(`text/plain`, Some(`US-ASCII`)), "first part, with a trailing linebreak\r\n")), - BodyPart( + marshal(Multipart.General(`multipart/related`, + Multipart.General.BodyPart.Strict(HttpEntity(ContentType(`text/plain`, Some(`US-ASCII`)), "first part, with a trailing linebreak\r\n")), + Multipart.General.BodyPart.Strict( HttpEntity(ContentType(`application/octet-stream`), "filecontent"), RawHeader("Content-Transfer-Encoding", "binary") :: Nil))) shouldEqual HttpEntity( - contentType = ContentType(`multipart/mixed` withBoundary randomBoundary), + contentType = ContentType(`multipart/related` withBoundary randomBoundary), string = s"""--$randomBoundary |Content-Type: text/plain; charset=US-ASCII | @@ -98,9 +99,9 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with "multipartFormDataMarshaller should correctly marshal 'multipart/form-data' content with" - { "two fields" in { - marshal(MultipartFormData(ListMap( - "surname" -> BodyPart("Mike"), - "age" -> BodyPart(marshal(42))))) shouldEqual + marshal(Multipart.FormData(ListMap( + "surname" -> HttpEntity("Mike"), + "age" -> marshal(42)))) shouldEqual HttpEntity( contentType = ContentType(`multipart/form-data` withBoundary randomBoundary), string = s"""--$randomBoundary @@ -117,32 +118,26 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with } "two fields having a custom `Content-Disposition`" in { - marshal(MultipartFormData( - BodyPart( - HttpEntity(`text/csv`, "name,age\r\n\"John Doe\",20\r\n"), - List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "attachment[0]", "filename" -> "attachment.csv")))), - BodyPart( - HttpEntity("name,age\r\n\"John Doe\",20\r\n".getBytes), - List( - `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "attachment[1]", "filename" -> "attachment.csv")), - RawHeader("Content-Transfer-Encoding", "binary"))))) shouldEqual + marshal(Multipart.FormData(Source(List( + Multipart.FormData.BodyPart("attachment[0]", HttpEntity(`text/csv`, "name,age\r\n\"John Doe\",20\r\n"), + Map("filename" -> "attachment.csv")), + Multipart.FormData.BodyPart("attachment[1]", HttpEntity("naice!".getBytes), + Map("filename" -> "attachment2.csv"), List(RawHeader("Content-Transfer-Encoding", "binary"))))))) shouldEqual HttpEntity( contentType = ContentType(`multipart/form-data` withBoundary randomBoundary), string = s"""--$randomBoundary |Content-Type: text/csv - |Content-Disposition: form-data; name="attachment[0]"; filename=attachment.csv + |Content-Disposition: form-data; filename=attachment.csv; name="attachment[0]" | |name,age |"John Doe",20 | |--$randomBoundary |Content-Type: application/octet-stream - |Content-Disposition: form-data; name="attachment[1]"; filename=attachment.csv + |Content-Disposition: form-data; filename=attachment2.csv; name="attachment[1]" |Content-Transfer-Encoding: binary | - |name,age - |"John Doe",20 - | + |naice! |--$randomBoundary--""".stripMarginWithNewline("\r\n")) } } diff --git a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala index df07ae9cb6..190f95580f 100644 --- a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala @@ -4,6 +4,8 @@ package akka.http.unmarshalling +import akka.util.ByteString + import scala.xml.NodeSeq import scala.concurrent.duration._ import scala.concurrent.{ Future, Await } @@ -37,25 +39,50 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { "The MultipartUnmarshallers." - { - "multipartContentUnmarshaller should correctly unmarshal 'multipart/*' content with" - { - "one empty part" in { + "multipartGeneralUnmarshaller should correctly unmarshal 'multipart/*' content with" - { + "an empty entity" in { + Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC", ByteString.empty)).to[Multipart.General] should haveParts() + } + "an entity without initial boundary" in { + Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC", + """this is + |just preamble text""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts() + } + "an empty part" in { Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC", """--XYZABC - |--XYZABC--""".stripMargin)).to[MultipartContent] should haveParts() + |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts( + Multipart.General.BodyPart.Strict(HttpEntity.empty(ContentTypes.`text/plain(UTF-8)`))) } - "with one part" in { + "two empty parts" in { + Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC", + """--XYZABC + |--XYZABC + |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts( + Multipart.General.BodyPart.Strict(HttpEntity.empty(ContentTypes.`text/plain(UTF-8)`)), + Multipart.General.BodyPart.Strict(HttpEntity.empty(ContentTypes.`text/plain(UTF-8)`))) + } + "a part without entity and missing header separation CRLF" in { + Unmarshal(HttpEntity(`multipart/mixed` withBoundary "XYZABC", + """--XYZABC + |Content-type: text/xml + |Age: 12 + |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts( + Multipart.General.BodyPart.Strict(HttpEntity.empty(MediaTypes.`text/xml`), List(RawHeader("Age", "12")))) + } + "one non-empty part" in { Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-", """--- |Content-type: text/plain; charset=UTF8 |content-disposition: form-data; name="email" | |test@there.com - |-----""".stripMarginWithNewline("\r\n"))).to[MultipartContent] should haveParts( - BodyPart( + |-----""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts( + Multipart.General.BodyPart.Strict( HttpEntity(ContentTypes.`text/plain(UTF-8)`, "test@there.com"), List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email"))))) } - "with two different parts" in { + "two different parts" in { Unmarshal(HttpEntity(`multipart/mixed` withBoundary "12345", """--12345 | @@ -66,38 +93,48 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Content-Transfer-Encoding: binary | |filecontent - |--12345--""".stripMarginWithNewline("\r\n"))).to[MultipartContent] should haveParts( - BodyPart(HttpEntity(ContentTypes.`text/plain(UTF-8)`, "first part, with a trailing newline\r\n")), - BodyPart( + |--12345--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts( + Multipart.General.BodyPart.Strict(HttpEntity(ContentTypes.`text/plain(UTF-8)`, "first part, with a trailing newline\r\n")), + Multipart.General.BodyPart.Strict( HttpEntity(`application/octet-stream`, "filecontent"), List(RawHeader("Content-Transfer-Encoding", "binary")))) } - "with illegal headers" in ( + "illegal headers" in ( Unmarshal(HttpEntity(`multipart/form-data` withBoundary "XYZABC", """--XYZABC |Date: unknown |content-disposition: form-data; name=email | |test@there.com - |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[MultipartContent] should haveParts( - BodyPart( + |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.General] should haveParts( + Multipart.General.BodyPart.Strict( HttpEntity(ContentTypes.`text/plain(UTF-8)`, "test@there.com"), List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")), RawHeader("date", "unknown"))))) } - "multipartContentUnmarshaller should reject illegal multipart content" in { - val mpc = Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-", - """--- - |Content-type: text/plain; charset=UTF8 - |Content-type: application/json - |content-disposition: form-data; name="email" - | - |test@there.com - |-----""".stripMarginWithNewline("\r\n"))) - .to[MultipartContent], 1.second) - Await.result(mpc.parts.runWith(Sink.future).failed, 1.second).getMessage shouldEqual - "multipart part must not contain more than one Content-Type header" + "multipartGeneralUnmarshaller should reject illegal multipart content with" - { + "a stray boundary" in { + Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "ABC", + """--ABC + |Content-type: text/plain; charset=UTF8 + |--ABCContent-type: application/json + |content-disposition: form-data; name="email" + |-----""".stripMarginWithNewline("\r\n"))) + .to[Multipart.General].failed, 1.second).getMessage shouldEqual "Illegal multipart boundary in message content" + } + "duplicate Content-Type header" in { + Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-", + """--- + |Content-type: text/plain; charset=UTF8 + |Content-type: application/json + |content-disposition: form-data; name="email" + | + |test@there.com + |-----""".stripMarginWithNewline("\r\n"))) + .to[Multipart.General].failed, 1.second).getMessage shouldEqual + "multipart part must not contain more than one Content-Type header" + } } "multipartByteRangesUnmarshaller should correctly unmarshal multipart/byteranges content with two different parts" in { @@ -112,9 +149,9 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Content-Type: text/plain | |XYZ - |--12345--""".stripMarginWithNewline("\r\n"))).to[MultipartByteRanges] should haveParts( - BodyPart(HttpEntity(ContentTypes.`text/plain`, "ABC"), List(`Content-Range`(ContentRange(0, 2, 26)))), - BodyPart(HttpEntity(ContentTypes.`text/plain`, "XYZ"), List(`Content-Range`(ContentRange(23, 25, 26))))) + |--12345--""".stripMarginWithNewline("\r\n"))).to[Multipart.ByteRanges] should haveParts( + Multipart.ByteRanges.BodyPart.Strict(ContentRange(0, 2, 26), HttpEntity(ContentTypes.`text/plain`, "ABC")), + Multipart.ByteRanges.BodyPart.Strict(ContentRange(23, 25, 26), HttpEntity(ContentTypes.`text/plain`, "XYZ"))) } "multipartFormDataUnmarshaller should correctly unmarshal 'multipart/form-data' content" - { @@ -124,29 +161,37 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |content-disposition: form-data; name=email | |test@there.com - |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[MultipartFormData] should haveFormData( - "email" -> BodyPart(HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"), "email")) + |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[Multipart.FormData] should haveParts( + Multipart.FormData.BodyPart.Strict("email", HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"))) } "with a file" in { - Unmarshal(HttpEntity(`multipart/form-data` withBoundary "XYZABC", - """--XYZABC - |Content-Disposition: form-data; name="email" - | - |test@there.com - |--XYZABC - |Content-Disposition: form-data; name="userfile"; filename="test.dat" - |Content-Type: application/pdf - |Content-Transfer-Encoding: binary - | - |filecontent - |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[StrictMultipartFormData] should haveFormData( - "email" -> BodyPart( - HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"), - List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")))), - "userfile" -> BodyPart( - HttpEntity(MediaTypes.`application/pdf`, "filecontent"), - List(RawHeader("Content-Transfer-Encoding", "binary"), - `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "userfile", "filename" -> "test.dat"))))) + Unmarshal { + HttpEntity.Default( + contentType = `multipart/form-data` withBoundary "XYZABC", + contentLength = 1, // not verified during unmarshalling + data = Source { + List( + ByteString { + """--XYZABC + |Content-Disposition: form-data; name="email" + | + |test@there.com + |--XYZABC + |Content-Dispo""".stripMarginWithNewline("\r\n") + }, + ByteString { + """sition: form-data; name="userfile"; filename="test.dat" + |Content-Type: application/pdf + |Content-Transfer-Encoding: binary + | + |filecontent + |--XYZABC--""".stripMarginWithNewline("\r\n") + }) + }) + }.to[Multipart.FormData].flatMap(_.toStrict(1.second)) should haveParts( + Multipart.FormData.BodyPart.Strict("email", HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com")), + Multipart.FormData.BodyPart.Strict("userfile", HttpEntity(MediaTypes.`application/pdf`, "filecontent"), + Map("filename" -> "test.dat"), List(RawHeader("Content-Transfer-Encoding", "binary")))) } // TODO: reactivate after multipart/form-data unmarshalling integrity verification is implemented // @@ -171,24 +216,10 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def evaluateTo[T](value: T): Matcher[Future[T]] = equal(value).matcher[T] compose (x ⇒ Await.result(x, 1.second)) - def haveParts[T <: MultipartParts](parts: BodyPart*): Matcher[Future[T]] = - equal(parts).matcher[Seq[BodyPart]] compose { x ⇒ + def haveParts[T <: Multipart](parts: Multipart.BodyPart*): Matcher[Future[T]] = + equal(parts).matcher[Seq[Multipart.BodyPart]] compose { x ⇒ Await.result(x .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.future)) .fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second) } - - def haveFormData(fields: (String, BodyPart)*): Matcher[Future[MultipartFormData]] = - equal(fields).matcher[Seq[(String, BodyPart)]] compose { x ⇒ - Await.result(x - .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.future)) - .fast.recover { case _: NoSuchElementException ⇒ Nil } - .fast.map { - _ map { part ⇒ - part.headers.collectFirst { - case `Content-Disposition`(ContentDispositionTypes.`form-data`, params) ⇒ params("name") - }.get -> part - } - }, 1.second) - } } diff --git a/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala index 6650e2934c..36c7361178 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala @@ -27,10 +27,8 @@ object Marshallers extends SingleMarshallerMarshallers { Marshallers(f(first) +: vector) } - implicit val NodeSeqMarshallers: ToEntityMarshallers[NodeSeq] = { - import scala.concurrent.ExecutionContext.Implicits.global + implicit def nodeSeqMarshallers(implicit ec: ExecutionContext): ToEntityMarshallers[NodeSeq] = Marshallers(`text/xml`, `application/xml`, `text/html`, `application/xhtml+xml`)(PredefinedToEntityMarshallers.nodeSeqMarshaller) - } implicit def entity2response[T](implicit m: Marshallers[T, ResponseEntity], ec: ExecutionContext): Marshallers[T, HttpResponse] = m map (entity ⇒ HttpResponse(entity = entity)) diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 53bc3e1174..15dc65e10f 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -4,17 +4,15 @@ package akka.http.marshalling -import scala.concurrent.ExecutionContext +import akka.event.{ NoLogging, LoggingAdapter } + import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.actor.ActorRefFactory import akka.parboiled2.util.Base64 import akka.stream.FlattenStrategy import akka.stream.scaladsl._ import akka.http.engine.rendering.BodyPartRenderer -import akka.http.util.actorSystem -import akka.http.util.FastFuture._ +import akka.http.util.FastFuture import akka.http.model._ -import MediaTypes._ trait MultipartMarshallers { protected val multipartBoundaryRandom: java.util.Random = ThreadLocalRandom.current() @@ -28,31 +26,23 @@ trait MultipartMarshallers { Base64.custom.encodeToString(array, false) } - implicit def multipartByteRangesMarshaller(implicit refFactory: ActorRefFactory): ToEntityMarshaller[MultipartByteRanges] = - multipartPartsMarshaller[MultipartByteRanges](`multipart/byteranges`)(refFactory) - implicit def multipartContentMarshaller(implicit refFactory: ActorRefFactory): ToEntityMarshaller[MultipartContent] = - multipartPartsMarshaller[MultipartContent](`multipart/mixed`)(refFactory) - - private def multipartPartsMarshaller[T <: MultipartParts](mediaType: MultipartMediaType)(implicit refFactory: ActorRefFactory): ToEntityMarshaller[T] = { - val boundary = randomBoundary - val mediaTypeWithBoundary = mediaType withBoundary boundary - Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset) ⇒ - val log = actorSystem(refFactory).log - val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, log) - val chunks = value.parts.transform("bodyPartRenderer", () ⇒ bodyPartRenderer).flatten(FlattenStrategy.concat) - HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks) - } - } - - implicit def multipartFormDataMarshaller(implicit mcm: ToEntityMarshaller[MultipartContent], - ec: ExecutionContext): ToEntityMarshaller[MultipartFormData] = + implicit def multipartMarshaller[T <: Multipart](implicit log: LoggingAdapter = NoLogging): ToEntityMarshaller[T] = Marshaller { value ⇒ - mcm(MultipartContent(value.parts)).fast.map { - case Marshalling.WithOpenCharset(mt, marshal) ⇒ - val mediaType = `multipart/form-data` withBoundary mt.params("boundary") - Marshalling.WithOpenCharset(mediaType, cs ⇒ MediaTypeOverrider.forEntity(marshal(cs), mediaType)) - case x ⇒ throw new IllegalStateException("ToRegularEntityMarshaller[MultipartContent] is expected to produce " + - "a Marshalling.WithOpenCharset, not a " + x) + val boundary = randomBoundary + val contentType = ContentType(value.mediaType withBoundary boundary) + FastFuture.successful { + Marshalling.WithOpenCharset(contentType.mediaType, { charset ⇒ + value match { + case x: Multipart.Strict ⇒ + val data = BodyPartRenderer.strict(x.strictParts, boundary, charset.nioCharset, partHeadersSizeHint = 128, log) + HttpEntity(contentType, data) + case _ ⇒ + val chunks = value.parts + .transform("bodyPartRenderer", () ⇒ BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)) + .flatten(FlattenStrategy.concat) + HttpEntity.Chunked(contentType, chunks) + } + }) } } } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index 51bb68b7d7..9b1a0b9b63 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -4,65 +4,95 @@ package akka.http.unmarshalling -import akka.actor.ActorRefFactory -import akka.stream.FlowMaterializer -import akka.stream.scaladsl._ +import scala.collection.immutable +import scala.collection.immutable.VectorBuilder +import scala.concurrent.ExecutionContext +import akka.event.{ NoLogging, LoggingAdapter } import akka.http.engine.parsing.BodyPartParser import akka.http.model._ import akka.http.util._ +import akka.stream.scaladsl._ import MediaRanges._ import MediaTypes._ import HttpCharsets._ trait MultipartUnmarshallers { - implicit def defaultMultipartContentUnmarshaller(implicit refFactory: ActorRefFactory) = multipartContentUnmarshaller(`UTF-8`) - def multipartContentUnmarshaller(defaultCharset: HttpCharset)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartContent] = - multipartPartsUnmarshaller[MultipartContent](`multipart/*`, ContentTypes.`text/plain` withCharset defaultCharset)(MultipartContent(_)) + implicit def defaultMultipartGeneralUnmarshaller(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.General] = + multipartGeneralUnmarshaller(`UTF-8`) + def multipartGeneralUnmarshaller(defaultCharset: HttpCharset)(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.General] = + multipartUnmarshaller[Multipart.General, Multipart.General.BodyPart, Multipart.General.BodyPart.Strict]( + mediaRange = `multipart/*`, + defaultContentType = ContentTypes.`text/plain` withCharset defaultCharset, + createBodyPart = Multipart.General.BodyPart(_, _), + createStreamed = Multipart.General(_, _), + createStrictBodyPart = Multipart.General.BodyPart.Strict, + createStrict = Multipart.General.Strict) - implicit def defaultMultipartByteRangesUnmarshaller(implicit refFactory: ActorRefFactory) = multipartByteRangesUnmarshaller(`UTF-8`) - def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartByteRanges] = - multipartPartsUnmarshaller[MultipartByteRanges](`multipart/byteranges`, - ContentTypes.`text/plain` withCharset defaultCharset)(MultipartByteRanges(_)) + implicit def multipartFormDataUnmarshaller(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.FormData] = + multipartUnmarshaller[Multipart.FormData, Multipart.FormData.BodyPart, Multipart.FormData.BodyPart.Strict]( + mediaRange = `multipart/form-data`, + defaultContentType = ContentTypes.`application/octet-stream`, + createBodyPart = (entity, headers) ⇒ Multipart.General.BodyPart(entity, headers).toFormDataBodyPart.get, + createStreamed = (_, parts) ⇒ Multipart.FormData(parts), + createStrictBodyPart = (entity, headers) ⇒ Multipart.General.BodyPart.Strict(entity, headers).toFormDataBodyPart.get, + createStrict = (_, parts) ⇒ Multipart.FormData.Strict(parts)) - def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Source[BodyPart] ⇒ T)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[T] = + implicit def defaultMultipartByteRangesUnmarshaller(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.ByteRanges] = + multipartByteRangesUnmarshaller(`UTF-8`) + def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[Multipart.ByteRanges] = + multipartUnmarshaller[Multipart.ByteRanges, Multipart.ByteRanges.BodyPart, Multipart.ByteRanges.BodyPart.Strict]( + mediaRange = `multipart/byteranges`, + defaultContentType = ContentTypes.`text/plain` withCharset defaultCharset, + createBodyPart = (entity, headers) ⇒ Multipart.General.BodyPart(entity, headers).toByteRangesBodyPart.get, + createStreamed = (_, parts) ⇒ Multipart.ByteRanges(parts), + createStrictBodyPart = (entity, headers) ⇒ Multipart.General.BodyPart.Strict(entity, headers).toByteRangesBodyPart.get, + createStrict = (_, parts) ⇒ Multipart.ByteRanges.Strict(parts)) + + def multipartUnmarshaller[T <: Multipart, BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](mediaRange: MediaRange, + defaultContentType: ContentType, + createBodyPart: (BodyPartEntity, List[HttpHeader]) ⇒ BP, + createStreamed: (MultipartMediaType, Source[BP]) ⇒ T, + createStrictBodyPart: (HttpEntity.Strict, List[HttpHeader]) ⇒ BPS, + createStrict: (MultipartMediaType, immutable.Seq[BPS]) ⇒ T)(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[T] = Unmarshaller { entity ⇒ - if (mediaRange matches entity.contentType.mediaType) { + if (entity.contentType.mediaType.isMultipart && mediaRange.matches(entity.contentType.mediaType)) { entity.contentType.mediaType.params.get("boundary") match { - case None ⇒ FastFuture.failed(UnmarshallingError.InvalidContent("Content-Type with a multipart media type must have a 'boundary' parameter")) + case None ⇒ + FastFuture.failed(UnmarshallingError.InvalidContent("Content-Type with a multipart media type must have a 'boundary' parameter")) case Some(boundary) ⇒ - val bodyParts = entity.dataBytes - .transform("bodyPart", () ⇒ new BodyPartParser(defaultContentType, boundary, actorSystem(refFactory).log)) - .splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart]) - .headAndTail - .collect { - case (BodyPartParser.BodyPartStart(headers, createEntity), entityParts) ⇒ - BodyPart(createEntity(entityParts), headers) - case (BodyPartParser.ParseError(errorInfo), _) ⇒ throw new ParsingException(errorInfo) + import BodyPartParser._ + val parser = new BodyPartParser(defaultContentType, boundary, log) + FastFuture.successful { + entity match { + case HttpEntity.Strict(ContentType(mediaType: MultipartMediaType, _), data) ⇒ + val builder = new VectorBuilder[BPS]() + (parser.onNext(data) ++ parser.onTermination(None)) foreach { + case BodyPartStart(headers, createEntity) ⇒ + val entity = createEntity(Source.empty()) match { + case x: HttpEntity.Strict ⇒ x + case x ⇒ throw new IllegalStateException("Unexpected entity type from strict BodyPartParser: " + x.getClass.getName) + } + builder += createStrictBodyPart(entity, headers) + case ParseError(errorInfo) ⇒ throw new ParsingException(errorInfo) + case x ⇒ throw new IllegalStateException(s"Unexpected BodyPartParser result `x` in strict case") + } + createStrict(mediaType, builder.result()) + case _ ⇒ + val bodyParts = entity.dataBytes + .transform("bodyPart", () ⇒ parser) + .splitWhen(_.isInstanceOf[BodyPartStart]) + .headAndTail + .collect { + case (BodyPartStart(headers, createEntity), entityParts) ⇒ createBodyPart(createEntity(entityParts), headers) + case (ParseError(errorInfo), _) ⇒ throw new ParsingException(errorInfo) + } + createStreamed(entity.contentType.mediaType.asInstanceOf[MultipartMediaType], bodyParts) } - FastFuture.successful(create(bodyParts)) + } } } else FastFuture.failed(UnmarshallingError.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil)) } - - implicit def defaultMultipartFormDataUnmarshaller(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = - multipartFormDataUnmarshaller(verifyIntegrity = true) - def multipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = - multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts ⇒ - def verify(part: BodyPart): BodyPart = part // TODO - val parts = if (verifyIntegrity) bodyParts.map(verify) else bodyParts - MultipartFormData(parts) - } - - implicit def defaultStrictMultipartFormDataUnmarshaller(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[StrictMultipartFormData] = - strictMultipartFormDataUnmarshaller(verifyIntegrity = true) - def strictMultipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[StrictMultipartFormData] = { - implicit val ec = actorSystem(refFactory).dispatcher - multipartFormDataUnmarshaller(verifyIntegrity) flatMap (mfd ⇒ mfd.toStrict()) - } - } object MultipartUnmarshallers extends MultipartUnmarshallers \ No newline at end of file