diff --git a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala
index 34c04db61c..914a9dd1d7 100644
--- a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala
+++ b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala
@@ -5,11 +5,15 @@
package akka.http.model
import java.io.File
+import akka.stream.FlowMaterializer
+import akka.stream.scaladsl.Flow
import org.reactivestreams.Publisher
import akka.stream.impl.SynchronousPublisherFromIterable
import scala.collection.immutable
import headers._
+import scala.concurrent.{ ExecutionContext, Future }
+
trait MultipartParts {
def parts: Publisher[BodyPart]
}
@@ -47,8 +51,26 @@ object MultipartByteRanges {
* All parts must contain a Content-Disposition header with a type form-data
* and a name parameter that is unique.
*/
-final case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts {
- // def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName))
+case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts {
+ /**
+ * Turns this instance into its strict specialization using the given `maxFieldCount` as the field number cut-off
+ * hint.
+ */
+ def toStrict(materializer: FlowMaterializer, maxFieldCount: Int = 1000)(implicit ec: ExecutionContext): Future[StrictMultipartFormData] =
+ Flow(parts).grouped(maxFieldCount).toFuture(materializer).map(new StrictMultipartFormData(_))
+}
+
+/**
+ * A specialized `MultipartFormData` that allows full random access to its parts.
+ */
+class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends MultipartFormData(SynchronousPublisherFromIterable(fields)) {
+ /**
+ * Returns the BodyPart with the given name, if found.
+ */
+ def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName))
+
+ override def toStrict(materializer: FlowMaterializer, maxFieldCount: Int)(implicit ec: ExecutionContext): Future[StrictMultipartFormData] =
+ Future.successful(this)
}
object MultipartFormData {
diff --git a/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala b/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala
index 9a2e9ca938..8f2686a6a1 100644
--- a/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala
+++ b/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala
@@ -37,12 +37,15 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
sealed trait StateResult // phantom type for ensuring soundness of our parsing method setup
- private[this] val needle = new Array[Byte](boundary.length + 4)
- needle(0) = '\r'.toByte
- needle(1) = '\n'.toByte
- needle(2) = '-'.toByte
- needle(3) = '-'.toByte
- boundary.getAsciiBytes(needle, 4)
+ private[this] val needle: Array[Byte] = {
+ val array = new Array[Byte](boundary.length + 4)
+ array(0) = '\r'.toByte
+ array(1) = '\n'.toByte
+ array(2) = '-'.toByte
+ array(3) = '-'.toByte
+ boundary.getAsciiBytes(array, 4)
+ array
+ }
// we use the Boyer-Moore string search algorithm for finding the boundaries in the multipart entity,
// TODO: evaluate whether an upgrade to the more efficient FJS is worth the implementation cost
@@ -64,13 +67,14 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
try state(input)
catch {
case e: ParsingException ⇒ fail(e.info)
- case NotEnoughDataException ⇒ throw new IllegalStateException // we are missing a try/catch{continue} wrapper somewhere
+ case NotEnoughDataException ⇒ throw new IllegalStateException(NotEnoughDataException) // we are missing a try/catch{continue} wrapper somewhere
}
result.toList
}
def tryParseInitialBoundary(input: ByteString): StateResult = {
- // we don't use boyerMoore here because we are looking for the boundary *without* a preceding CRLF
+ // we don't use boyerMoore here because we are testing for the boundary *without* a
+ // preceding CRLF and at a known location (the very beginning of the entity)
try {
@tailrec def rec(ix: Int): StateResult =
if (ix < needle.length) {
diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala
index 6f84d3104b..52821c8b49 100644
--- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala
+++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala
@@ -4,7 +4,8 @@
package akka.http.marshalling
-import java.util.Random
+import scala.concurrent.ExecutionContext
+import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.ActorRefFactory
import akka.parboiled2.util.Base64
import akka.stream.{ FlattenStrategy, FlowMaterializer }
@@ -14,10 +15,8 @@ import akka.http.util.actorSystem
import akka.http.model._
import MediaTypes._
-import scala.concurrent.ExecutionContext
-
trait MultipartMarshallers {
- protected val multipartBoundaryRandom = new Random
+ protected val multipartBoundaryRandom: java.util.Random = ThreadLocalRandom.current()
/**
* Creates a new random 144-bit number and base64 encodes it (using a custom "safe" alphabet, yielding 24 characters).
diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala
index e08f718ae7..39ea9e7c49 100644
--- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala
+++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala
@@ -56,14 +56,30 @@ trait MultipartUnmarshallers {
implicit def defaultMultipartFormDataUnmarshaller(implicit fm: FlowMaterializer,
refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
- multipartFormDataUnmarshaller(strict = true)
- def multipartFormDataUnmarshaller(strict: Boolean = true)(implicit fm: FlowMaterializer,
- refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
+ multipartFormDataUnmarshaller(verifyIntegrity = true)
+ def multipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit fm: FlowMaterializer,
+ refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts ⇒
def verify(part: BodyPart): BodyPart = part // TODO
- val parts = if (strict) Flow(bodyParts).map(verify).toPublisher(fm) else bodyParts
+ val parts = if (verifyIntegrity) Flow(bodyParts).map(verify).toPublisher(fm) else bodyParts
MultipartFormData(parts)
}
+
+ implicit def defaultStrictMultipartFormDataUnmarshaller(implicit fm: FlowMaterializer,
+ refFactory: ActorRefFactory): FromEntityUnmarshaller[StrictMultipartFormData] =
+ strictMultipartFormDataUnmarshaller(verifyIntegrity = true)
+ def strictMultipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit fm: FlowMaterializer,
+ refFactory: ActorRefFactory): FromEntityUnmarshaller[StrictMultipartFormData] = {
+ implicit val ec = actorSystem(refFactory).dispatcher
+ val m = multipartFormDataUnmarshaller(verifyIntegrity)
+ Unmarshaller {
+ m(_) flatMap {
+ case Unmarshalling.Success(mfd) ⇒ mfd.toStrict(fm).map(Unmarshalling.Success.apply)
+ case e: Unmarshalling.Failure ⇒ Future.successful(e)
+ }
+ }
+ }
+
}
object MultipartUnmarshallers extends MultipartUnmarshallers
\ No newline at end of file
diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala
index f253332eb1..8123b437f2 100644
--- a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala
+++ b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala
@@ -42,6 +42,7 @@ object Unmarshaller
sealed trait Unmarshalling[+A] {
def isSuccess: Boolean
def isFailure: Boolean
+ def value: A
def map[B](f: A ⇒ B): Unmarshalling[B]
def flatMap[B](f: A ⇒ Unmarshalling[B]): Unmarshalling[B]
def recover[AA >: A](f: PartialFunction[Unmarshalling.Failure, AA]): Unmarshalling[AA]
@@ -59,6 +60,7 @@ object Unmarshalling {
sealed abstract class Failure extends Unmarshalling[Nothing] {
def isSuccess = false
def isFailure = true
+ def value = sys.error("Expected Unmarshalling.Success but got " + this)
def map[B](f: Nothing ⇒ B) = this
def flatMap[B](f: Nothing ⇒ Unmarshalling[B]) = this
def recover[AA >: Nothing](f: PartialFunction[Unmarshalling.Failure, AA]) =
diff --git a/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala b/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala
index 17a13ae270..88df8fbcee 100644
--- a/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala
+++ b/akka-http/src/test/scala/akka/http/UnmarshallingSpec.scala
@@ -4,6 +4,7 @@
package akka.http
+import scala.xml.NodeSeq
import scala.concurrent.duration._
import scala.concurrent.{ Future, Await }
import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
@@ -30,9 +31,9 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
"charArrayUnmarshaller should unmarshal `text/plain` content in UTF-8 to char arrays" in {
Unmarshal(HttpEntity("árvíztűrő ütvefúrógép")).to[Array[Char]] should evaluateTo("árvíztűrő ütvefúrógép".toCharArray)
}
- // "nodeSeqUnmarshaller should unmarshal `text/xml` content in UTF-8 to NodeSeqs" in {
- // Unmarshal(HttpEntity(`text/xml`, "Hällö")).to[NodeSeq].map(_.map(_.text)) shouldEqual "Hällö"
- // }
+ "nodeSeqUnmarshaller should unmarshal `text/xml` content in UTF-8 to NodeSeqs" in {
+ Unmarshal(HttpEntity(`text/xml`, "Hällö")).to[NodeSeq].map(_.map(_.text)) should evaluateTo("Hällö")
+ }
}
"The MultipartUnmarshallers." - {
@@ -116,26 +117,31 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
|test@there.com
|--XYZABC--""".stripMarginWithNewline("\r\n"))).to[MultipartFormData] should haveFormData(
- "email" -> BodyPart(
- HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"), "email"))
+ "email" -> BodyPart(HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"), "email"))
}
- // "with a file" in {
- // HttpEntity(`multipart/form-data` withBoundary "XYZABC",
- // """|--XYZABC
- // |Content-Disposition: form-data; name="email"
- // |
- // |test@there.com
- // |--XYZABC
- // |Content-Disposition: form-data; name="userfile"; filename="test.dat"
- // |Content-Type: application/octet-stream
- // |Content-Transfer-Encoding: binary
- // |
- // |filecontent
- // |--XYZABC--""".stripMargin).as[MultipartFormData].get.fields.map {
- // case part @ BodyPart(entity, _) ⇒
- // part.name.get + ": " + entity.as[String].get + part.filename.map(",filename: " + _).getOrElse("")
- // }.mkString("|") === "email: test@there.com|userfile: filecontent,filename: test.dat"
- // }
+ "with a file" in {
+ Unmarshal(HttpEntity(`multipart/form-data` withBoundary "XYZABC",
+ """--XYZABC
+ |Content-Disposition: form-data; name="email"
+ |
+ |test@there.com
+ |--XYZABC
+ |Content-Disposition: form-data; name="userfile"; filename="test.dat"
+ |Content-Type: application/pdf
+ |Content-Transfer-Encoding: binary
+ |
+ |filecontent
+ |--XYZABC--""".stripMarginWithNewline("\r\n"))).to[StrictMultipartFormData] should haveFormData(
+ "email" -> BodyPart(
+ HttpEntity(ContentTypes.`application/octet-stream`, "test@there.com"),
+ List(`Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "email")))),
+ "userfile" -> BodyPart(
+ HttpEntity(MediaTypes.`application/pdf`, "filecontent"),
+ List(RawHeader("Content-Transfer-Encoding", "binary"),
+ `Content-Disposition`(ContentDispositionTypes.`form-data`, Map("name" -> "userfile", "filename" -> "test.dat")))))
+ }
+ // TODO: reactivate after multipart/form-data unmarshalling integrity verification is implemented
+ //
// "reject illegal multipart content" in {
// val Left(MalformedContent(msg, _)) = HttpEntity(`multipart/form-data` withBoundary "XYZABC", "--noboundary--").as[MultipartFormData]
// msg === "Missing start boundary"