diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala new file mode 100644 index 0000000000..dba56ff2b2 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.http.scaladsl.server.directives + +import java.io.File + +import akka.http.scaladsl.model.{ MediaTypes, HttpEntity, Multipart, StatusCodes } +import akka.stream.io.Framing +import akka.util.ByteString +import docs.http.scaladsl.server.RoutingSpec + +import scala.concurrent.Future +import scala.util.{ Success, Failure } + +class FileUploadDirectivesExamplesSpec extends RoutingSpec { + + override def testConfigSource = "akka.actor.default-mailbox.mailbox-type = \"akka.dispatch.UnboundedMailbox\"" + + "uploadedFile" in { + + val route = + uploadedFile("csv") { + case (metadata, file) => + // do something with the file and file metadata ... + file.delete() + complete(StatusCodes.OK) + } + + // tests: + val multipartForm = + Multipart.FormData( + Multipart.FormData.BodyPart.Strict( + "csv", + HttpEntity(MediaTypes.`text/plain`, "1,5,7\n11,13,17"), + Map("filename" -> "data.csv"))) + + Post("/", multipartForm) ~> route ~> check { + status shouldEqual StatusCodes.OK + } + + } + + "fileUpload" in { + + // adding integers as a service ;) + val route = + extractRequestContext { ctx => + implicit val mat = ctx.materializer + implicit val ec = ctx.executionContext + + fileUpload("csv") { + case (metadata, byteSource) => + + val sumF: Future[Int] = + // sum the numbers as they arrive so that we can + // accept any size of file + byteSource.via(Framing.delimiter(ByteString("\n"), 1024)) + .mapConcat(_.utf8String.split(",").toVector) + .map(_.toInt) + .runFold(0) { (acc, n) => acc + n } + + onSuccess(sumF) { sum => complete(s"Sum: $sum") } + } + } + + // tests: + val multipartForm = + Multipart.FormData(Multipart.FormData.BodyPart.Strict( + "csv", + HttpEntity(MediaTypes.`text/plain`, "2,3,5\n7,11,13,17,23\n29,31,37\n"), + Map("filename" -> "primes.csv"))) + + Post("/", multipartForm) ~> route ~> check { + status shouldEqual StatusCodes.OK + responseAs[String] shouldEqual "Sum: 178" + } + + } + +} diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/alphabetically.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/alphabetically.rst index dbedb4cfc7..276f7dc681 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/directives/alphabetically.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/alphabetically.rst @@ -62,6 +62,7 @@ Directive Description :ref:`-extractUri-` Extracts the complete request URI :ref:`-failWith-` Bubbles the given error up the response chain where it is dealt with by the closest :ref:`-handleExceptions-` directive and its ``ExceptionHandler`` +:ref:`-fileUpload-` Provides a stream of an uploaded file from a multipart request :ref:`-formField-` Extracts an HTTP form field from the request :ref:`-formFields-` Extracts a number of HTTP form field from the request :ref:`-get-` Rejects all non-GET requests @@ -199,6 +200,7 @@ Directive Description :ref:`-setCookie-` Adds a ``Set-Cookie`` response header with the given cookies :ref:`-textract-` Extracts a number of values using a ``RequestContext ⇒ Tuple`` function :ref:`-tprovide-` Injects a given tuple of values into a directive +:ref:`-uploadedFile-` Streams one uploaded file from a multipart request to a file on disk :ref:`-validate-` Checks a given condition before running its inner route :ref:`-withExecutionContext-` Runs its inner route with the given alternative ``ExecutionContext`` :ref:`-withMaterializer-` Runs its inner route with the given alternative ``Materializer`` diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/by-trait.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/by-trait.rst index 6d2aff8a15..46552c0f73 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/directives/by-trait.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/by-trait.rst @@ -41,6 +41,9 @@ Directives filtering or extracting from the request :ref:`BasicDirectives` and :ref:`MiscDirectives` Directives handling request properties. +:ref:`FileUploadDirectives` + Handle file uploads. + .. _Response Directives: @@ -85,6 +88,7 @@ List of predefined directives by trait debugging-directives/index execution-directives/index file-and-resource-directives/index + file-upload-directives/index form-field-directives/index future-directives/index header-directives/index diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/fileUpload.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/fileUpload.rst new file mode 100644 index 0000000000..d0256781dd --- /dev/null +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/fileUpload.rst @@ -0,0 +1,25 @@ +.. _-fileUpload-: + +fileUpload +========== + +Signature +--------- + +.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala + :snippet: fileUpload + +Description +----------- +Simple access to the stream of bytes for a file uploaded as a multipart form together with metadata +about the upload as extracted value. + +If there is no field with the given name the request will be rejected, if there are multiple file parts +with the same name, the first one will be used and the subsequent ones ignored. + + +Example +------- + +.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala + :snippet: fileUpload diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/index.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/index.rst new file mode 100644 index 0000000000..1af52aaeab --- /dev/null +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/index.rst @@ -0,0 +1,10 @@ +.. _FileUploadDirectives: + +FileUploadDirectives +==================== + +.. toctree:: + :maxdepth: 1 + + uploadedFile + fileUpload diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/uploadedFile.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/uploadedFile.rst new file mode 100644 index 0000000000..99216da5ea --- /dev/null +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-upload-directives/uploadedFile.rst @@ -0,0 +1,31 @@ +.. _-uploadedFile-: + +uploadedFile +============ + +Signature +--------- + +.. includecode2:: /../../akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala + :snippet: uploadedFile + +Description +----------- +Streams the contents of a file uploaded as a multipart form into a temporary file on disk and provides the file and +metadata about the upload as extracted value. + +If there is an error writing to disk the request will be failed with the thrown exception, if there is no field +with the given name the request will be rejected, if there are multiple file parts with the same name, the first +one will be used and the subsequent ones ignored. + +.. note:: + This directive will stream contents of the request into a file, however one can not start processing these + until the file has been written completely. For streaming APIs it is preferred to use the :ref:`-fileUpload-` + directive, as it allows for streaming handling of the incoming data bytes. + + +Example +------- + +.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/FileUploadDirectivesExamplesSpec.scala + :snippet: uploadedFile diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/index.rst b/akka-docs-dev/rst/scala/http/routing-dsl/index.rst index d962292cee..f1e1ca5dc2 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/index.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/index.rst @@ -77,6 +77,7 @@ in the :ref:`exception-handling-scala` section of the documtnation. You can use File uploads ^^^^^^^^^^^^ +For high level directives to handle uploads see the :ref:`FileUploadDirectives`. Handling a simple file upload from for example a browser form with a `file` input can be done by accepting a `Multipart.FormData` entity, note that the body parts are `Source` rather than diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/FileUploadDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/FileUploadDirectivesSpec.scala new file mode 100644 index 0000000000..23f6f5d290 --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/FileUploadDirectivesSpec.scala @@ -0,0 +1,155 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.scaladsl.server.directives + +import java.io.{ FileInputStream, File } +import java.nio.charset.StandardCharsets + +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.{ MissingFormFieldRejection, RoutingSpec } +import akka.util.ByteString + +class FileUploadDirectivesSpec extends RoutingSpec { + + "the uploadedFile directive" should { + + "write a posted file to a temporary file on disk" in { + + val xml = "42" + + val simpleMultipartUpload = + Multipart.FormData(Multipart.FormData.BodyPart.Strict( + "fieldName", + HttpEntity(MediaTypes.`text/xml`, xml), + Map("filename" -> "age.xml"))) + + @volatile var file: Option[File] = None + + try { + Post("/", simpleMultipartUpload) ~> { + uploadedFile("fieldName") { + case (info, tmpFile) ⇒ + file = Some(tmpFile) + complete(info.toString) + } + } ~> check { + file.isDefined === true + responseAs[String] === FileInfo("fieldName", "age.xml", ContentTypes.`text/xml`).toString + read(file.get) === xml + } + } finally { + file.foreach(_.delete()) + } + } + } + + "the fileUpload directive" should { + + def echoAsAService = + extractRequestContext { ctx ⇒ + implicit val mat = ctx.materializer + + fileUpload("field1") { + case (info, bytes) ⇒ + // stream the bytes somewhere + val allBytesF = bytes.runFold(ByteString()) { (all, bytes) ⇒ all ++ bytes } + + // sum all individual file sizes + onSuccess(allBytesF) { allBytes ⇒ + complete(allBytes) + } + } + } + + "stream the file upload" in { + // byte count as a service ;) + val route = echoAsAService + + // tests: + val str1 = "some data" + val multipartForm = + Multipart.FormData(Multipart.FormData.BodyPart.Strict( + "field1", + HttpEntity(MediaTypes.`text/plain`, str1), + Map("filename" -> "data1.txt"))) + + Post("/", multipartForm) ~> route ~> check { + status shouldEqual StatusCodes.OK + responseAs[String] shouldEqual str1 + } + + } + + "stream the first file upload if multiple with the same name are posted" in { + // byte count as a service ;) + val route = echoAsAService + + // tests: + val str1 = "some data" + val str2 = "other data" + val multipartForm = + Multipart.FormData( + Multipart.FormData.BodyPart.Strict( + "field1", + HttpEntity(MediaTypes.`text/plain`, str1), + Map("filename" -> "data1.txt")), + Multipart.FormData.BodyPart.Strict( + "field1", + HttpEntity(MediaTypes.`text/plain`, str2), + Map("filename" -> "data2.txt"))) + + Post("/", multipartForm) ~> route ~> check { + status shouldEqual StatusCodes.OK + responseAs[String] shouldEqual str1 + } + + } + + "reject the file upload if the field name is missing" in { + // byte count as a service ;) + val route = + extractRequestContext { ctx ⇒ + implicit val mat = ctx.materializer + + fileUpload("missing") { + case (info, bytes) ⇒ + // stream the bytes somewhere + val allBytesF = bytes.runFold(ByteString()) { (all, bytes) ⇒ all ++ bytes } + + // sum all individual file sizes + onSuccess(allBytesF) { allBytes ⇒ + complete(allBytes) + } + } + } + + // tests: + val str1 = "some data" + val multipartForm = + Multipart.FormData(Multipart.FormData.BodyPart.Strict( + "field1", + HttpEntity(MediaTypes.`text/plain`, str1), + Map("filename" -> "data1.txt"))) + + Post("/", multipartForm) ~> route ~> check { + rejection === MissingFormFieldRejection("missing") + } + + } + + } + + private def read(file: File): String = { + val in = new FileInputStream(file) + try { + val buffer = new Array[Byte](1024) + in.read(buffer) + new String(buffer, StandardCharsets.UTF_8) + } finally { + in.close() + } + } + +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala index ece3426bbc..30a0c9affe 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala @@ -14,6 +14,7 @@ trait Directives extends RouteConcatenation with CodingDirectives with ExecutionDirectives with FileAndResourceDirectives + with FileUploadDirectives with FormFieldDirectives with FutureDirectives with HeaderDirectives diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala new file mode 100644 index 0000000000..8db4667f76 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.http.scaladsl.server.directives + +import java.io.File + +import akka.http.scaladsl.server.{ MissingFormFieldRejection, Directive1 } +import akka.http.scaladsl.model.{ ContentType, Multipart } +import akka.util.ByteString +import scala.concurrent.Future +import scala.util.{ Failure, Success, Try } +import akka.stream.scaladsl._ + +trait FileUploadDirectives { + + import BasicDirectives._ + import RouteDirectives._ + import FutureDirectives._ + import MarshallingDirectives._ + + /** + * Streams the bytes of the file submitted using multipart with the given file name into a temporary file on disk. + * If there is an error writing to disk the request will be failed with the thrown exception, if there is no such + * field the request will be rejected, if there are multiple file parts with the same name, the first one will be + * used and the subsequent ones ignored. + */ + def uploadedFile(fieldName: String): Directive1[(FileInfo, File)] = + extractRequestContext.flatMap { ctx ⇒ + import ctx.executionContext + import ctx.materializer + + fileUpload(fieldName).flatMap { + case (fileInfo, bytes) ⇒ + + val destination = File.createTempFile("akka-http-upload", ".tmp") + val uploadedF: Future[(FileInfo, File)] = bytes.runWith(Sink.file(destination)) + .map(_ ⇒ (fileInfo, destination)) + + onComplete[(FileInfo, File)](uploadedF).flatMap { + + case Success(uploaded) ⇒ + provide(uploaded) + + case Failure(ex) ⇒ + destination.delete() + failWith(ex) + + } + } + } + + /** + * Collects each body part that is a multipart file as a tuple containing metadata and a `Source` + * for streaming the file contents somewhere. If there is no such field the request will be rejected, + * if there are multiple file parts with the same name, the first one will be used and the subsequent + * ones ignored. + */ + def fileUpload(fieldName: String): Directive1[(FileInfo, Source[ByteString, Any])] = + entity(as[Multipart.FormData]).flatMap { formData ⇒ + extractRequestContext.flatMap { ctx ⇒ + implicit val mat = ctx.materializer + implicit val ec = ctx.executionContext + + val onePartSource: Source[(FileInfo, Source[ByteString, Any]), Any] = formData.parts + .filter(part ⇒ part.filename.isDefined && part.name == fieldName) + .map(part ⇒ (FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes)) + .take(1) + + val onePartF = onePartSource.runWith(Sink.headOption[(FileInfo, Source[ByteString, Any])]) + + onSuccess(onePartF) + } + + }.flatMap { + case Some(tuple) ⇒ provide(tuple) + case None ⇒ reject(MissingFormFieldRejection(fieldName)) + } + +} + +object FileUploadDirectives extends FileUploadDirectives + +/** + * Additional metadata about the file being uploaded/that was uploaded using the [[FileUploadDirectives]] + * + * @param fieldName Name of the form field the file was uploaded in + * @param fileName User specified name of the uploaded file + * @param contentType Content type of the file + */ +final case class FileInfo(fieldName: String, fileName: String, contentType: ContentType)