diff --git a/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala index 3ab41910a6..dc7ddff9dc 100644 --- a/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala @@ -18,15 +18,15 @@ sealed abstract class ModeledCompanion extends Renderable { val name = getClass.getSimpleName.replace("$minus", "-").dropRight(1) // trailing $ val lowercaseName = name.toRootLowerCase private[this] val nameBytes = name.asciiBytes - def render[R <: Rendering](r: R): r.type = r ~~ nameBytes ~~ ':' ~~ ' ' + final def render[R <: Rendering](r: R): r.type = r ~~ nameBytes ~~ ':' ~~ ' ' } sealed trait ModeledHeader extends HttpHeader with Serializable { def name: String = companion.name def value: String = renderValue(new StringRendering).get def lowercaseName: String = companion.lowercaseName - def render[R <: Rendering](r: R): r.type = renderValue(r ~~ companion) - def renderValue[R <: Rendering](r: R): r.type + final def render[R <: Rendering](r: R): r.type = renderValue(r ~~ companion) + protected[http] def renderValue[R <: Rendering](r: R): r.type protected def companion: ModeledCompanion } @@ -38,7 +38,7 @@ abstract class CustomHeader extends japi.headers.CustomHeader { def suppressRendering: Boolean = false def lowercaseName: String = name.toRootLowerCase - def render[R <: Rendering](r: R): r.type = r ~~ name ~~ ':' ~~ ' ' ~~ value + final def render[R <: Rendering](r: R): r.type = r ~~ name ~~ ':' ~~ ' ' ~~ value } import japi.JavaMapping.Implicits._ @@ -81,7 +81,7 @@ object Expect extends ModeledCompanion { val `100-continue` = new Expect() {} } sealed abstract case class Expect private () extends ModeledHeader { - def renderValue[R <: Rendering](r: R): r.type = r ~~ "100-continue" + final def renderValue[R <: Rendering](r: R): r.type = r ~~ "100-continue" protected def companion = Expect } diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 432d13740b..646b7f2ab4 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -4,17 +4,21 @@ package akka.http.util -import akka.http.model.RequestEntity -import akka.stream.impl.ErrorPublisher -import akka.stream.Transformer -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Source -import akka.util.ByteString -import org.reactivestreams.Publisher +import java.util.concurrent.atomic.AtomicBoolean +import java.io.InputStream + +import org.reactivestreams.{ Subscriber, Publisher } import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } -import scala.util.control.NonFatal + +import akka.actor.Props +import akka.util.ByteString + +import akka.stream.{ impl, Transformer, FlowMaterializer } +import akka.stream.scaladsl._ + +import akka.http.model.RequestEntity /** * INTERNAL API @@ -50,7 +54,7 @@ private[http] object StreamUtils { } def failedPublisher[T](ex: Throwable): Publisher[T] = - ErrorPublisher(ex).asInstanceOf[Publisher[T]] + impl.ErrorPublisher(ex).asInstanceOf[Publisher[T]] def mapErrorTransformer[T](f: Throwable ⇒ Throwable): Transformer[T, T] = new Transformer[T, T] { @@ -98,8 +102,90 @@ private[http] object StreamUtils { override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = currentState.onTermination(e) } + /** + * Applies a sequence of transformers on one source and returns a sequence of sources with the result. The input source + * will only be traversed once. + */ + def transformMultiple[T, U](input: Source[T], transformers: immutable.Seq[() ⇒ Transformer[T, U]])(implicit materializer: FlowMaterializer): immutable.Seq[Source[U]] = + transformers match { + case Nil ⇒ Nil + case Seq(one) ⇒ Vector(input.transform("transformMultipleElement", one)) + case multiple ⇒ + val results = Vector.fill(multiple.size)(Sink.publisher[U]) + val mat = + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + + val broadcast = Broadcast[T]("transformMultipleInputBroadcast") + input ~> broadcast + (multiple, results).zipped.foreach { (trans, sink) ⇒ + broadcast ~> Flow[T].transform("transformMultipleElement", trans) ~> sink + } + }.run() + results.map(s ⇒ Source(mat.get(s))) + } + def mapEntityError(f: Throwable ⇒ Throwable): RequestEntity ⇒ RequestEntity = _.transformDataBytes(() ⇒ mapErrorTransformer(f)) + + /** + * Simple blocking Source backed by an InputStream. + * + * FIXME: should be provided by akka-stream, see #15588 + */ + def fromInputStreamSource(inputStream: InputStream, defaultChunkSize: Int = 65536): Source[ByteString] = { + import akka.stream.impl._ + + def props(materializer: ActorBasedFlowMaterializer): Props = { + val iterator = new Iterator[ByteString] { + var finished = false + def hasNext: Boolean = !finished + def next(): ByteString = + if (!finished) { + val buffer = new Array[Byte](defaultChunkSize) + val read = inputStream.read(buffer) + if (read < 0) { + finished = true + inputStream.close() + ByteString.empty + } else ByteString.fromArray(buffer, 0, read) + } else ByteString.empty + } + + Props(new IteratorPublisherImpl(iterator, materializer.settings)).withDispatcher(materializer.settings.fileIODispatcher) + } + + new AtomicBoolean(false) with SimpleActorFlowSource[ByteString] { + override def attach(flowSubscriber: Subscriber[ByteString], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName)._1.subscribe(flowSubscriber) + + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[ByteString], Unit) = + if (!getAndSet(true)) { + val ref = materializer.actorOf(props(materializer), name = s"$flowName-0-InputStream-source") + val publisher = ActorPublisher[ByteString](ref) + ref ! ExposedPublisher(publisher.asInstanceOf[impl.ActorPublisher[Any]]) + + (publisher, ()) + } else (ErrorPublisher(new IllegalStateException("One time source can only be instantiated once")).asInstanceOf[Publisher[ByteString]], ()) + } + } + + /** + * Returns a source that can only be used once for testing purposes. + */ + def oneTimeSource[T](other: Source[T]): Source[T] = { + import akka.stream.impl._ + val original = other.asInstanceOf[ActorFlowSource[T]] + new AtomicBoolean(false) with SimpleActorFlowSource[T] { + override def attach(flowSubscriber: Subscriber[T], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName)._1.subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[T], Unit) = + if (!getAndSet(true)) (original.create(materializer, flowName)._1, ()) + else (ErrorPublisher(new IllegalStateException("One time source can only be instantiated once")).asInstanceOf[Publisher[T]], ()) + } + } } /** diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index 9e427bbcf1..7fb3787aa2 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -10,7 +10,9 @@ import java.nio.charset.Charset import com.typesafe.config.Config import akka.stream.{ FlowMaterializer, FlattenStrategy, Transformer } import akka.stream.scaladsl.{ Flow, Source } -import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.{ Await, Future } +import scala.util.{ Failure, Success } import scala.util.matching.Regex import akka.event.LoggingAdapter import akka.util.ByteString @@ -77,6 +79,17 @@ package object util { underlying.fold(Vector.empty[T])(_ :+ _) } + private[http] implicit class AddFutureAwaitResult[T](future: Future[T]) { + /** "Safe" Await.result that doesn't throw away half of the stacktrace */ + def awaitResult(atMost: Duration): T = { + Await.ready(future, atMost) + future.value.get match { + case Success(t) ⇒ t + case Failure(ex) ⇒ throw new RuntimeException("Trying to await result of failed Future, see the cause for the original problem.", ex) + } + } + } + private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = new Transformer[ByteString, ByteString] { def onNext(element: ByteString) = element :: Nil @@ -86,5 +99,14 @@ package object util { private[this] val _identityFunc: Any ⇒ Any = x ⇒ x /** Returns a constant identity function to avoid allocating the closure */ def identityFunc[T]: T ⇒ T = _identityFunc.asInstanceOf[T ⇒ T] + + def humanReadableByteCount(bytes: Long, si: Boolean): String = { + val unit = if (si) 1000 else 1024 + if (bytes >= unit) { + val exp = (math.log(bytes) / math.log(unit)).toInt + val pre = if (si) "kMGTPE".charAt(exp - 1).toString else "KMGTPE".charAt(exp - 1).toString + 'i' + "%.1f %sB" format (bytes / math.pow(unit, exp), pre) + } else bytes.toString + " B" + } } diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala index 26a0d520de..373c864821 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala @@ -7,7 +7,8 @@ package akka.http.testkit import java.util.concurrent.CountDownLatch import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.{ Await, ExecutionContext } +import scala.concurrent.ExecutionContext +import akka.http.util._ import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.http.model.HttpEntity.ChunkStreamPart @@ -95,6 +96,6 @@ trait RouteTestResultComponent { failTest("Request was neither completed nor rejected within " + timeout) private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] = - Await.result(data.grouped(Int.MaxValue).runWith(Sink.head), timeout) + data.collectAll.awaitResult(timeout) } } \ No newline at end of file diff --git a/akka-http-tests/src/test/resources/sample.html b/akka-http-tests/src/test/resources/sample.html new file mode 100644 index 0000000000..10dbdec8c5 --- /dev/null +++ b/akka-http-tests/src/test/resources/sample.html @@ -0,0 +1 @@ +

Lorem ipsum!

\ No newline at end of file diff --git a/akka-http-tests/src/test/resources/sample.xyz b/akka-http-tests/src/test/resources/sample.xyz new file mode 100644 index 0000000000..ce42064770 --- /dev/null +++ b/akka-http-tests/src/test/resources/sample.xyz @@ -0,0 +1 @@ +XyZ \ No newline at end of file diff --git a/akka-http-tests/src/test/resources/someDir/fileA.txt b/akka-http-tests/src/test/resources/someDir/fileA.txt new file mode 100644 index 0000000000..d800886d9c --- /dev/null +++ b/akka-http-tests/src/test/resources/someDir/fileA.txt @@ -0,0 +1 @@ +123 \ No newline at end of file diff --git a/akka-http-tests/src/test/resources/someDir/fileB.xml b/akka-http-tests/src/test/resources/someDir/fileB.xml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-http-tests/src/test/resources/someDir/sub/file.html b/akka-http-tests/src/test/resources/someDir/sub/file.html new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-http-tests/src/test/resources/subDirectory/empty.pdf b/akka-http-tests/src/test/resources/subDirectory/empty.pdf new file mode 100644 index 0000000000..d800886d9c --- /dev/null +++ b/akka-http-tests/src/test/resources/subDirectory/empty.pdf @@ -0,0 +1 @@ +123 \ No newline at end of file diff --git a/akka-http-tests/src/test/resources/subDirectory/fileA.txt b/akka-http-tests/src/test/resources/subDirectory/fileA.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/FileAndResourceDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/FileAndResourceDirectivesSpec.scala new file mode 100644 index 0000000000..0a96fc4de6 --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/FileAndResourceDirectivesSpec.scala @@ -0,0 +1,327 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.server +package directives + +import java.io.{ File, FileOutputStream } + +import akka.http.model.MediaTypes._ +import akka.http.model._ +import akka.http.model.headers._ +import akka.http.util._ +import org.scalatest.matchers.Matcher +import org.scalatest.{ Inside, Inspectors } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, ExecutionContext, Future } +import scala.util.Properties + +class FileAndResourceDirectivesSpec extends RoutingSpec with Inspectors with Inside { + + override def testConfigSource = + """akka.http.routing { + | file-chunking-threshold-size = 16 + | file-chunking-chunk-size = 8 + | range-coalescing-threshold = 1 + |}""".stripMargin + + "getFromFile" should { + "reject non-GET requests" in { + Put() ~> getFromFile("some") ~> check { handled shouldEqual (false) } + } + "reject requests to non-existing files" in { + Get() ~> getFromFile("nonExistentFile") ~> check { handled shouldEqual (false) } + } + "reject requests to directories" in { + Get() ~> getFromFile(Properties.javaHome) ~> check { handled shouldEqual (false) } + } + "return the file content with the MediaType matching the file extension" in { + val file = File.createTempFile("akkaHttpTest", ".PDF") + try { + writeAllText("This is PDF", file) + Get() ~> getFromFile(file.getPath) ~> check { + mediaType shouldEqual `application/pdf` + definedCharset shouldEqual None + responseAs[String] shouldEqual "This is PDF" + headers should contain(`Last-Modified`(DateTime(file.lastModified))) + } + } finally file.delete + } + "return the file content with MediaType 'application/octet-stream' on unknown file extensions" in { + val file = File.createTempFile("akkaHttpTest", null) + try { + writeAllText("Some content", file) + Get() ~> getFromFile(file) ~> check { + mediaType shouldEqual `application/octet-stream` + responseAs[String] shouldEqual "Some content" + } + } finally file.delete + } + + "return a single range from a file" in { + val file = File.createTempFile("partialTest", null) + try { + writeAllText("ABCDEFGHIJKLMNOPQRSTUVWXYZ", file) + Get() ~> addHeader(Range(ByteRange(0, 10))) ~> getFromFile(file) ~> check { + status shouldEqual StatusCodes.PartialContent + headers should contain(`Content-Range`(ContentRange(0, 10, 26))) + responseAs[String] shouldEqual "ABCDEFGHIJK" + } + } finally file.delete + } + + "return multiple ranges from a file at once" in { + val file = File.createTempFile("partialTest", null) + try { + writeAllText("ABCDEFGHIJKLMNOPQRSTUVWXYZ", file) + val rangeHeader = Range(ByteRange(1, 10), ByteRange.suffix(10)) + Get() ~> addHeader(rangeHeader) ~> getFromFile(file, ContentTypes.`text/plain`) ~> check { + status shouldEqual StatusCodes.PartialContent + header[`Content-Range`] shouldEqual None + mediaType.withParams(Map.empty) shouldEqual `multipart/byteranges` + + val parts = responseAs[Multipart.ByteRanges].toStrict(100.millis).awaitResult(100.millis).strictParts + parts.size shouldEqual 2 + parts(0).entity.data.utf8String shouldEqual "BCDEFGHIJK" + parts(1).entity.data.utf8String shouldEqual "QRSTUVWXYZ" + } + } finally file.delete + } + } + + "getFromResource" should { + "reject non-GET requests" in { + Put() ~> getFromResource("some") ~> check { handled shouldEqual (false) } + } + "reject requests to non-existing resources" in { + Get() ~> getFromResource("nonExistingResource") ~> check { handled shouldEqual (false) } + } + "return the resource content with the MediaType matching the file extension" in { + val route = getFromResource("sample.html") + + def runCheck() = + Get() ~> route ~> check { + mediaType shouldEqual `text/html` + forAtLeast(1, headers) { h ⇒ + inside(h) { + case `Last-Modified`(dt) ⇒ + DateTime(2011, 7, 1) should be < dt + dt.clicks should be < System.currentTimeMillis() + } + } + responseAs[String] shouldEqual "

Lorem ipsum!

" + } + + runCheck() + runCheck() // additional test to check that no internal state is kept + } + "return the file content with MediaType 'application/octet-stream' on unknown file extensions" in { + Get() ~> getFromResource("sample.xyz") ~> check { + mediaType shouldEqual `application/octet-stream` + responseAs[String] shouldEqual "XyZ" + } + } + } + + "getFromResourceDirectory" should { + "reject requests to non-existing resources" in { + Get("not/found") ~> getFromResourceDirectory("subDirectory") ~> check { handled shouldEqual (false) } + } + val verify = check { + mediaType shouldEqual `application/pdf` + responseAs[String] shouldEqual "123" + } + "return the resource content with the MediaType matching the file extension - example 1" in { Get("empty.pdf") ~> getFromResourceDirectory("subDirectory") ~> verify } + "return the resource content with the MediaType matching the file extension - example 2" in { Get("empty.pdf") ~> getFromResourceDirectory("subDirectory/") ~> verify } + "return the resource content with the MediaType matching the file extension - example 3" in { Get("subDirectory/empty.pdf") ~> getFromResourceDirectory("") ~> verify } + "reject requests to directory resources" in { + Get() ~> getFromResourceDirectory("subDirectory") ~> check { handled shouldEqual (false) } + } + } + + "listDirectoryContents" should { + val base = new File(getClass.getClassLoader.getResource("").toURI).getPath + new File(base, "subDirectory/emptySub").mkdir() + def eraseDateTime(s: String) = s.replaceAll("""\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d""", "xxxx-xx-xx xx:xx:xx") + implicit val settings = RoutingSettings.default.copy(renderVanityFooter = false) + + "properly render a simple directory" in { + Get() ~> listDirectoryContents(base + "/someDir") ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of / + | + |

Index of /

+ |
+ |
+            |sub/             xxxx-xx-xx xx:xx:xx
+            |fileA.txt        xxxx-xx-xx xx:xx:xx            3  B
+            |fileB.xml        xxxx-xx-xx xx:xx:xx            0  B
+            |
+ |
+ | + | + |""" + } + } + } + "properly render a sub directory" in { + Get("/sub/") ~> listDirectoryContents(base + "/someDir") ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of /sub/ + | + |

Index of /sub/

+ |
+ |
+            |../
+            |file.html        xxxx-xx-xx xx:xx:xx            0  B
+            |
+ |
+ | + | + |""" + } + } + } + "properly render the union of several directories" in { + Get() ~> listDirectoryContents(base + "/someDir", base + "/subDirectory") ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of / + | + |

Index of /

+ |
+ |
+            |emptySub/        xxxx-xx-xx xx:xx:xx
+            |sub/             xxxx-xx-xx xx:xx:xx
+            |empty.pdf        xxxx-xx-xx xx:xx:xx            3  B
+            |fileA.txt        xxxx-xx-xx xx:xx:xx            3  B
+            |fileB.xml        xxxx-xx-xx xx:xx:xx            0  B
+            |
+ |
+ | + | + |""" + } + } + } + "properly render an empty sub directory with vanity footer" in { + val settings = 0 // shadow implicit + Get("/emptySub/") ~> listDirectoryContents(base + "/subDirectory") ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of /emptySub/ + | + |

Index of /emptySub/

+ |
+ |
+            |../
+            |
+ |
+ |
+ |rendered by Akka Http on xxxx-xx-xx xx:xx:xx + |
+ | + | + |""" + } + } + } + "properly render an empty top-level directory" in { + Get() ~> listDirectoryContents(base + "/subDirectory/emptySub") ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of / + | + |

Index of /

+ |
+ |
+            |(no files)
+            |
+ |
+ | + | + |""" + } + } + } + "properly render a simple directory with a path prefix" in { + Get("/files/") ~> pathPrefix("files")(listDirectoryContents(base + "/someDir")) ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of /files/ + | + |

Index of /files/

+ |
+ |
+            |sub/             xxxx-xx-xx xx:xx:xx
+            |fileA.txt        xxxx-xx-xx xx:xx:xx            3  B
+            |fileB.xml        xxxx-xx-xx xx:xx:xx            0  B
+            |
+ |
+ | + | + |""" + } + } + } + "properly render a sub directory with a path prefix" in { + Get("/files/sub/") ~> pathPrefix("files")(listDirectoryContents(base + "/someDir")) ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of /files/sub/ + | + |

Index of /files/sub/

+ |
+ |
+            |../
+            |file.html        xxxx-xx-xx xx:xx:xx            0  B
+            |
+ |
+ | + | + |""" + } + } + } + "properly render an empty top-level directory with a path prefix" in { + Get("/files/") ~> pathPrefix("files")(listDirectoryContents(base + "/subDirectory/emptySub")) ~> check { + eraseDateTime(responseAs[String]) shouldEqual prep { + """ + |Index of /files/ + | + |

Index of /files/

+ |
+ |
+            |(no files)
+            |
+ |
+ | + | + |""" + } + } + } + "reject requests to file resources" in { + Get() ~> listDirectoryContents(base + "subDirectory/empty.pdf") ~> check { handled shouldEqual (false) } + } + } + + def prep(s: String) = s.stripMarginWithNewline("\n") + + def writeAllText(text: String, file: File): Unit = { + val fos = new FileOutputStream(file) + try { + fos.write(text.getBytes("UTF-8")) + } finally fos.close() + } + + def evaluateTo[T](t: T, atMost: Duration = 100.millis)(implicit ec: ExecutionContext): Matcher[Future[T]] = + be(t).compose[Future[T]] { fut ⇒ + import scala.concurrent.Await + fut.awaitResult(atMost) + } +} diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala index 809133ddf9..643fb23fd0 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala @@ -9,6 +9,8 @@ import akka.http.model.StatusCodes._ import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ +import akka.stream.scaladsl.Source +import akka.util.ByteString import org.scalatest.{ Inside, Inspectors } import scala.concurrent.Await @@ -93,7 +95,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { } } - "return a 'multipart/byteranges' for a ranged request with multiple coalesced ranges with preserved order" in { + "return a 'multipart/byteranges' for a ranged request with multiple coalesced ranges and expect ranges in ascending order" in { Get() ~> addHeader(Range(ByteRange(5, 10), ByteRange(0, 1), ByteRange(1, 2))) ~> { wrs { complete("Some random and not super short entity.") } } ~> check { @@ -101,17 +103,30 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second) parts.size shouldEqual 2 inside(parts(0)) { - case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ - range shouldEqual ContentRange.Default(5, 10, Some(39)) - unit shouldEqual RangeUnits.Bytes - Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "random" - } - inside(parts(1)) { case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ range shouldEqual ContentRange.Default(0, 2, Some(39)) unit shouldEqual RangeUnits.Bytes Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "Som" } + inside(parts(1)) { + case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ + range shouldEqual ContentRange.Default(5, 10, Some(39)) + unit shouldEqual RangeUnits.Bytes + Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "random" + } + } + } + + "return a 'multipart/byteranges' for a ranged request with multiple ranges if entity data source isn't reusable" in { + val content = "Some random and not super short entity." + def entityData() = StreamUtils.oneTimeSource(Source.singleton(ByteString(content))) + + Get() ~> addHeader(Range(ByteRange(5, 10), ByteRange(0, 1), ByteRange(1, 2))) ~> { + wrs { complete(HttpEntity.Default(MediaTypes.`text/plain`, content.length, entityData())) } + } ~> check { + header[`Content-Range`] should be(None) + val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second) + parts.size shouldEqual 2 } } diff --git a/akka-http/src/main/resources/reference.conf b/akka-http/src/main/resources/reference.conf index 927aa7a5c9..7c8a4d5f36 100644 --- a/akka-http/src/main/resources/reference.conf +++ b/akka-http/src/main/resources/reference.conf @@ -13,6 +13,12 @@ akka.http.routing { # (Note that akka-http will always produce log messages containing the full error details) verbose-error-messages = off + # Enables/disables ETag and `If-Modified-Since` support for FileAndResourceDirectives + file-get-conditional = on + + # Enables/disables the rendering of the "rendered by" footer in directory listings + render-vanity-footer = yes + # The maximum size between two requested ranges. Ranges with less space in between will be coalesced. # # When multiple ranges are requested, a server may coalesce any of the ranges that overlap or that are separated diff --git a/akka-http/src/main/scala/akka/http/server/Directives.scala b/akka-http/src/main/scala/akka/http/server/Directives.scala index a025af73ab..9306135e1d 100644 --- a/akka-http/src/main/scala/akka/http/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/server/Directives.scala @@ -17,7 +17,7 @@ trait Directives extends RouteConcatenation with DebuggingDirectives with CodingDirectives with ExecutionDirectives - //with FileAndResourceDirectives + with FileAndResourceDirectives //with FormFieldDirectives with FutureDirectives with HeaderDirectives diff --git a/akka-http/src/main/scala/akka/http/server/RequestContext.scala b/akka-http/src/main/scala/akka/http/server/RequestContext.scala index e650e77ab8..6973e5b656 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContext.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContext.scala @@ -4,6 +4,8 @@ package akka.http.server +import akka.stream.FlowMaterializer + import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter import akka.http.marshalling.ToResponseMarshallable @@ -26,6 +28,11 @@ trait RequestContext { */ implicit def executionContext: ExecutionContext + /** + * The default FlowMaterializer. + */ + implicit def flowMaterializer: FlowMaterializer + /** * The default LoggingAdapter to be used for logging messages related to this request. */ @@ -41,6 +48,7 @@ trait RequestContext { */ def reconfigure( executionContext: ExecutionContext = executionContext, + flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, settings: RoutingSettings = settings): RequestContext @@ -71,6 +79,11 @@ trait RequestContext { */ def withExecutionContext(ec: ExecutionContext): RequestContext + /** + * Returns a copy of this context with the new HttpRequest. + */ + def withFlowMaterializer(materializer: FlowMaterializer): RequestContext + /** * Returns a copy of this context with the new LoggingAdapter. */ diff --git a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala index ba4660dcc3..7d90e7de33 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala @@ -4,6 +4,8 @@ package akka.http.server +import akka.stream.FlowMaterializer + import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter import akka.http.marshalling.ToResponseMarshallable @@ -18,14 +20,15 @@ private[http] class RequestContextImpl( val request: HttpRequest, val unmatchedPath: Uri.Path, val executionContext: ExecutionContext, + val flowMaterializer: FlowMaterializer, val log: LoggingAdapter, val settings: RoutingSettings) extends RequestContext { - def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext) = - this(request, request.uri.path, ec, log, settings) + def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: FlowMaterializer) = + this(request, request.uri.path, ec, materializer, log, settings) - def reconfigure(executionContext: ExecutionContext, log: LoggingAdapter, settings: RoutingSettings): RequestContext = - copy(executionContext = executionContext, log = log, settings = settings) + def reconfigure(executionContext: ExecutionContext, flowMaterializer: FlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext = + copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log, settings = settings) override def complete(trm: ToResponseMarshallable): Future[RouteResult] = trm(request)(executionContext) @@ -44,6 +47,9 @@ private[http] class RequestContextImpl( override def withExecutionContext(ec: ExecutionContext): RequestContext = copy(executionContext = ec) + override def withFlowMaterializer(materializer: FlowMaterializer): RequestContext = + copy(flowMaterializer = materializer) + override def withLog(log: LoggingAdapter): RequestContext = copy(log = log) @@ -65,7 +71,8 @@ private[http] class RequestContextImpl( private def copy(request: HttpRequest = request, unmatchedPath: Uri.Path = unmatchedPath, executionContext: ExecutionContext = executionContext, + flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, settings: RoutingSettings = settings) = - new RequestContextImpl(request, unmatchedPath, executionContext, log, settings) + new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, settings) } diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala b/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala index e65a2baea0..bf500898ff 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala @@ -10,12 +10,16 @@ import akka.http.util._ case class RoutingSettings( verboseErrorMessages: Boolean, + fileGetConditional: Boolean, + renderVanityFooter: Boolean, rangeCountLimit: Int, rangeCoalescingThreshold: Long) object RoutingSettings extends SettingsCompanion[RoutingSettings]("akka.http.routing") { def fromSubConfig(c: Config) = apply( c getBoolean "verbose-error-messages", + c getBoolean "file-get-conditional", + c getBoolean "render-vanity-footer", c getInt "range-count-limit", c getBytes "range-coalescing-threshold") diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala index 9708f9a398..3e61358f4e 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala @@ -4,6 +4,8 @@ package akka.http.server +import akka.stream.FlowMaterializer + import scala.concurrent.ExecutionContext import akka.actor.{ ActorSystem, ActorContext } import akka.event.LoggingAdapter @@ -33,10 +35,12 @@ class RoutingSetup( val exceptionHandler: ExceptionHandler, val rejectionHandler: RejectionHandler, val executionContext: ExecutionContext, + val flowMaterializer: FlowMaterializer, val routingLog: RoutingLog) { // enable `import setup._` to properly bring implicits in scope implicit def executor: ExecutionContext = executionContext + implicit def materializer: FlowMaterializer = flowMaterializer } object RoutingSetup { @@ -44,12 +48,14 @@ object RoutingSetup { exceptionHandler: ExceptionHandler = null, rejectionHandler: RejectionHandler = null, executionContext: ExecutionContext, + flowMaterializer: FlowMaterializer, routingLog: RoutingLog): RoutingSetup = new RoutingSetup( routingSettings, if (exceptionHandler ne null) exceptionHandler else ExceptionHandler.default(routingSettings), if (rejectionHandler ne null) rejectionHandler else RejectionHandler.default(executionContext), executionContext, + flowMaterializer, routingLog) } diff --git a/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala index 384d04ae74..cb072cee84 100644 --- a/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala @@ -6,6 +6,7 @@ package akka.http.server package directives import akka.event.LoggingAdapter +import akka.stream.FlowMaterializer import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable @@ -140,6 +141,17 @@ trait BasicDirectives { */ def extractExecutionContext: Directive1[ExecutionContext] = BasicDirectives._extractExecutionContext + /** + * Runs its inner route with the given alternative [[FlowMaterializer]]. + */ + def withFlowMaterializer(materializer: FlowMaterializer): Directive0 = + mapRequestContext(_ withFlowMaterializer materializer) + + /** + * Extracts the [[ExecutionContext]] from the [[RequestContext]]. + */ + def extractFlowMaterializer: Directive1[FlowMaterializer] = BasicDirectives._extractFlowMaterializer + /** * Runs its inner route with the given alternative [[LoggingAdapter]]. */ @@ -181,6 +193,7 @@ object BasicDirectives extends BasicDirectives { private val _extractRequest: Directive1[HttpRequest] = extract(_.request) private val _extractUri: Directive1[Uri] = extract(_.request.uri) private val _extractExecutionContext: Directive1[ExecutionContext] = extract(_.executionContext) + private val _extractFlowMaterializer: Directive1[FlowMaterializer] = extract(_.flowMaterializer) private val _extractLog: Directive1[LoggingAdapter] = extract(_.log) private val _extractSettings: Directive1[RoutingSettings] = extract(_.settings) private val _extractRequestContext: Directive1[RequestContext] = extract(akka.http.util.identityFunc) diff --git a/akka-http/src/main/scala/akka/http/server/directives/FileAndResourceDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/FileAndResourceDirectives.scala new file mode 100644 index 0000000000..b36d742949 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/server/directives/FileAndResourceDirectives.scala @@ -0,0 +1,311 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.server +package directives + +import java.io.{ File, FileInputStream } + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter +import akka.http.marshalling.{ Marshaller, ToEntityMarshaller } +import akka.http.model._ +import akka.http.model.headers._ +import akka.http.util._ + +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext + +trait FileAndResourceDirectives { + + import CacheConditionDirectives._ + import MethodDirectives._ + import FileAndResourceDirectives._ + import RouteDirectives._ + import BasicDirectives._ + import RouteConcatenation._ + import RangeDirectives._ + + /** + * Completes GET requests with the content of the given file. The actual I/O operation is + * running detached in a `Future`, so it doesn't block the current thread (but potentially + * some other thread !). If the file cannot be found or read the request is rejected. + */ + def getFromFile(fileName: String)(implicit resolver: ContentTypeResolver): Route = + getFromFile(new File(fileName)) + + /** + * Completes GET requests with the content of the given file. The actual I/O operation is + * running detached in a `Future`, so it doesn't block the current thread (but potentially + * some other thread !). If the file cannot be found or read the request is rejected. + */ + def getFromFile(file: File)(implicit resolver: ContentTypeResolver): Route = + getFromFile(file, resolver(file.getName)) + + /** + * Completes GET requests with the content of the given file. The actual I/O operation is + * running detached in a `Future`, so it doesn't block the current thread (but potentially + * some other thread !). If the file cannot be found or read the request is rejected. + */ + def getFromFile(file: File, contentType: ContentType): Route = + get { + if (file.isFile && file.canRead) + conditionalFor(file.length, file.lastModified).apply { + withRangeSupport { + extractExecutionContext { implicit ec ⇒ + complete(HttpEntity.Default(contentType, file.length, StreamUtils.fromInputStreamSource(new FileInputStream(file)))) + } + } + } + else reject + } + + private def conditionalFor(length: Long, lastModified: Long): Directive0 = + extractSettings.flatMap(settings ⇒ + if (settings.fileGetConditional) { + val tag = java.lang.Long.toHexString(lastModified ^ java.lang.Long.reverse(length)) + val lastModifiedDateTime = DateTime(math.min(lastModified, System.currentTimeMillis)) + conditional(EntityTag(tag), lastModifiedDateTime) + } else pass) + + /** + * Completes GET requests with the content of the given resource. The actual I/O operation is + * running detached in a `Future`, so it doesn't block the current thread (but potentially + * some other thread !). + * If the resource cannot be found or read the Route rejects the request. + */ + def getFromResource(resourceName: String)(implicit resolver: ContentTypeResolver): Route = + getFromResource(resourceName, resolver(resourceName)) + + /** + * Completes GET requests with the content of the given resource. The actual I/O operation is + * running detached in a `Future`, so it doesn't block the current thread (but potentially + * some other thread !). + * If the resource cannot be found or read the Route rejects the request. + */ + def getFromResource(resourceName: String, contentType: ContentType, theClassLoader: ClassLoader = classOf[ActorSystem].getClassLoader): Route = + if (!resourceName.endsWith("/")) + get { + theClassLoader.getResource(resourceName) match { + case null ⇒ reject + case url ⇒ + val (length, lastModified) = { + val conn = url.openConnection() + try { + conn.setUseCaches(false) // otherwise the JDK will keep the JAR file open when we close! + val len = conn.getContentLength + val lm = conn.getLastModified + len -> lm + } finally conn.getInputStream.close() + } + conditionalFor(length, lastModified).apply { + withRangeSupport { + extractExecutionContext { implicit ec ⇒ + complete { + HttpEntity.Default(contentType, length, StreamUtils.fromInputStreamSource(url.openStream())) + } + } + } + } + } + } + else reject // don't serve the content of resource "directories" + + /** + * Completes GET requests with the content of a file underneath the given directory. + * If the file cannot be read the Route rejects the request. + */ + def getFromDirectory(directoryName: String)(implicit resolver: ContentTypeResolver): Route = { + val base = withTrailingSlash(directoryName) + extractUnmatchedPath { path ⇒ + extractLog { log ⇒ + fileSystemPath(base, path, log) match { + case "" ⇒ reject + case fileName ⇒ getFromFile(fileName) + } + } + } + } + + /** + * Completes GET requests with a unified listing of the contents of all given directories. + * The actual rendering of the directory contents is performed by the in-scope `Marshaller[DirectoryListing]`. + */ + def listDirectoryContents(directories: String*)(implicit renderer: DirectoryRenderer): Route = + get { + extractRequestContext { ctx ⇒ + val path = ctx.unmatchedPath + val fullPath = ctx.request.uri.path.toString + val matchedLength = fullPath.lastIndexOf(path.toString) + require(matchedLength >= 0) + val pathPrefix = fullPath.substring(0, matchedLength) + val pathString = withTrailingSlash(fileSystemPath("/", path, ctx.log, '/')) + val dirs = directories flatMap { dir ⇒ + fileSystemPath(withTrailingSlash(dir), path, ctx.log) match { + case "" ⇒ None + case fileName ⇒ + val file = new File(fileName) + if (file.isDirectory && file.canRead) Some(file) else None + } + } + import ctx.executionContext + implicit val marshaller: ToEntityMarshaller[DirectoryListing] = renderer.marshaller(ctx.settings.renderVanityFooter) + + if (dirs.isEmpty) reject + else complete(DirectoryListing(pathPrefix + pathString, isRoot = pathString == "/", dirs.flatMap(_.listFiles))) + } + } + + /** + * Same as `getFromBrowseableDirectories` with only one directory. + */ + def getFromBrowseableDirectory(directory: String)(implicit renderer: DirectoryRenderer, resolver: ContentTypeResolver): Route = + getFromBrowseableDirectories(directory) + + /** + * Serves the content of the given directories as a file system browser, i.e. files are sent and directories + * served as browseable listings. + */ + def getFromBrowseableDirectories(directories: String*)(implicit renderer: DirectoryRenderer, resolver: ContentTypeResolver): Route = { + directories.map(getFromDirectory).reduceLeft(_ ~ _) ~ listDirectoryContents(directories: _*) + } + + /** + * Same as "getFromDirectory" except that the file is not fetched from the file system but rather from a + * "resource directory". + */ + def getFromResourceDirectory(directoryName: String)(implicit resolver: ContentTypeResolver): Route = { + val base = if (directoryName.isEmpty) "" else withTrailingSlash(directoryName) + + extractUnmatchedPath { path ⇒ + extractLog { log ⇒ + fileSystemPath(base, path, log, separator = '/') match { + case "" ⇒ reject + case resourceName ⇒ getFromResource(resourceName) + } + } + } + } +} + +object FileAndResourceDirectives extends FileAndResourceDirectives { + private def withTrailingSlash(path: String): String = if (path endsWith "/") path else path + '/' + private def fileSystemPath(base: String, path: Uri.Path, log: LoggingAdapter, separator: Char = File.separatorChar): String = { + import java.lang.StringBuilder + @tailrec def rec(p: Uri.Path, result: StringBuilder = new StringBuilder(base)): String = + p match { + case Uri.Path.Empty ⇒ result.toString + case Uri.Path.Slash(tail) ⇒ rec(tail, result.append(separator)) + case Uri.Path.Segment(head, tail) ⇒ + if (head.indexOf('/') >= 0 || head == "..") { + log.warning("File-system path for base [{}] and Uri.Path [{}] contains suspicious path segment [{}], " + + "GET access was disallowed", base, path, head) + "" + } else rec(tail, result.append(head)) + } + rec(if (path.startsWithSlash) path.tail else path) + } + + trait DirectoryRenderer { + def marshaller(renderVanityFooter: Boolean): ToEntityMarshaller[DirectoryListing] + } + trait LowLevelDirectoryRenderer { + implicit def defaultDirectoryRenderer(implicit ec: ExecutionContext): DirectoryRenderer = + new DirectoryRenderer { + def marshaller(renderVanityFooter: Boolean): ToEntityMarshaller[DirectoryListing] = + DirectoryListing.directoryMarshaller(renderVanityFooter) + } + } + object DirectoryRenderer extends LowLevelDirectoryRenderer { + implicit def liftMarshaller(implicit _marshaller: ToEntityMarshaller[DirectoryListing]): DirectoryRenderer = + new DirectoryRenderer { + def marshaller(renderVanityFooter: Boolean): ToEntityMarshaller[DirectoryListing] = _marshaller + } + } +} + +trait ContentTypeResolver { + def apply(fileName: String): ContentType +} + +object ContentTypeResolver { + + /** + * The default way of resolving a filename to a ContentType is by looking up the file extension in the + * registry of all defined media-types. By default all non-binary file content is assumed to be UTF-8 encoded. + */ + implicit val Default = withDefaultCharset(HttpCharsets.`UTF-8`) + + def withDefaultCharset(charset: HttpCharset): ContentTypeResolver = + new ContentTypeResolver { + def apply(fileName: String) = { + val ext = fileName.lastIndexOf('.') match { + case -1 ⇒ "" + case x ⇒ fileName.substring(x + 1) + } + val mediaType = MediaTypes.forExtension(ext) getOrElse MediaTypes.`application/octet-stream` + mediaType match { + case x if !x.binary ⇒ ContentType(x, charset) + case x ⇒ ContentType(x) + } + } + } +} + +case class DirectoryListing(path: String, isRoot: Boolean, files: Seq[File]) + +object DirectoryListing { + + private val html = + """ + |Index of $ + | + |

Index of $

+ |
+ |
+      |$
+ |
$ + |
+ |rendered by Akka Http on $ + |
$ + | + | + |""".stripMarginWithNewline("\n") split '$' + + def directoryMarshaller(renderVanityFooter: Boolean)(implicit ec: ExecutionContext): ToEntityMarshaller[DirectoryListing] = + Marshaller.StringMarshaller.wrap(MediaTypes.`text/html`) { listing ⇒ + val DirectoryListing(path, isRoot, files) = listing + val filesAndNames = files.map(file ⇒ file -> file.getName).sortBy(_._2) + val deduped = filesAndNames.zipWithIndex.flatMap { + case (fan @ (file, name), ix) ⇒ + if (ix == 0 || filesAndNames(ix - 1)._2 != name) Some(fan) else None + } + val (directoryFilesAndNames, fileFilesAndNames) = deduped.partition(_._1.isDirectory) + def maxNameLength(seq: Seq[(File, String)]) = if (seq.isEmpty) 0 else seq.map(_._2.length).max + val maxNameLen = math.max(maxNameLength(directoryFilesAndNames) + 1, maxNameLength(fileFilesAndNames)) + val sb = new java.lang.StringBuilder + sb.append(html(0)).append(path).append(html(1)).append(path).append(html(2)) + if (!isRoot) { + val secondToLastSlash = path.lastIndexOf('/', path.lastIndexOf('/', path.length - 1) - 1) + sb.append("../\n" format path.substring(0, secondToLastSlash)) + } + def lastModified(file: File) = DateTime(file.lastModified).toIsoLikeDateTimeString + def start(name: String) = + sb.append("").append(name).append("") + .append(" " * (maxNameLen - name.length)) + def renderDirectory(file: File, name: String) = + start(name + '/').append(" ").append(lastModified(file)).append('\n') + def renderFile(file: File, name: String) = { + val size = akka.http.util.humanReadableByteCount(file.length, si = true) + start(name).append(" ").append(lastModified(file)) + sb.append(" ".substring(size.length)).append(size).append('\n') + } + for ((file, name) ← directoryFilesAndNames) renderDirectory(file, name) + for ((file, name) ← fileFilesAndNames) renderFile(file, name) + if (isRoot && files.isEmpty) sb.append("(no files)\n") + sb.append(html(3)) + if (renderVanityFooter) sb.append(html(4)).append(DateTime.now.toIsoLikeDateTimeString).append(html(5)) + sb.append(html(6)).toString + } +} diff --git a/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala index 4990f0f083..e38c584122 100644 --- a/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala +++ b/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala @@ -34,6 +34,7 @@ trait RangeDirectives { */ def withRangeSupport: Directive0 = extractRequestContext.flatMap { ctx ⇒ + import ctx.flowMaterializer val settings = ctx.settings implicit val log = ctx.log import settings.{ rangeCountLimit, rangeCoalescingThreshold } @@ -66,7 +67,17 @@ trait RangeDirectives { def multipartRanges(ranges: Seq[ByteRange], entity: UniversalEntity): Multipart.ByteRanges = { val length = entity.contentLength val iRanges: Seq[IndexRange] = ranges.map(indexRange(length)) - val bodyParts = coalesceRanges(iRanges).map(ir ⇒ Multipart.ByteRanges.BodyPart(ir.contentRange(length), ir(entity))) + + // It's only possible to run once over the input entity data stream because it's not known if the + // source is reusable. + // Therefore, ranges need to be sorted to prevent that some selected ranges already start to accumulate data + // but cannot be sent out because another range is blocking the queue. + val coalescedRanges = coalesceRanges(iRanges).sortBy(_.start) + val bodyPartTransformers = coalescedRanges.map(ir ⇒ () ⇒ StreamUtils.sliceBytesTransformer(ir.start, ir.length)).toVector + val bodyPartByteStreams = StreamUtils.transformMultiple(entity.dataBytes, bodyPartTransformers) + val bodyParts = (coalescedRanges, bodyPartByteStreams).zipped.map { (range, bytes) ⇒ + Multipart.ByteRanges.BodyPart(range.contentRange(length), HttpEntity(entity.contentType, range.length, bytes)) + } Multipart.ByteRanges(Source(bodyParts.toVector)) } diff --git a/akka-parsing/src/main/scala/akka/shapeless/package.scala b/akka-parsing/src/main/scala/akka/shapeless/package.scala index 19e484eea9..efb78f6763 100644 --- a/akka-parsing/src/main/scala/akka/shapeless/package.scala +++ b/akka-parsing/src/main/scala/akka/shapeless/package.scala @@ -14,18 +14,16 @@ * limitations under the License. */ -package akka +package akka.shapeless -package object shapeless { - /** Dependent unary function type. */ - trait DepFn1[T] { - type Out - def apply(t: T): Out - } - - /** Dependent binary function type. */ - trait DepFn2[T, U] { - type Out - def apply(t: T, u: U): Out - } +/** Dependent unary function type. */ +trait DepFn1[T] { + type Out + def apply(t: T): Out +} + +/** Dependent binary function type. */ +trait DepFn2[T, U] { + type Out + def apply(t: T, u: U): Out } diff --git a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java new file mode 100644 index 0000000000..b09f2b04bb --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.stream; + +import akka.actor.ActorSystem; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; + +public abstract class StreamTest { + final protected ActorSystem system; + final protected FlowMaterializer materializer; + + protected StreamTest(AkkaJUnitActorSystemResource actorSystemResource) { + system = actorSystemResource.getSystem(); + MaterializerSettings settings = MaterializerSettings.create(system); + materializer = FlowMaterializer.create(settings, system); + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java index 341f477194..9682d539b9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -1,10 +1,8 @@ package akka.stream.actor; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Props; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; +import akka.stream.StreamTest; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Source; import akka.stream.testkit.AkkaSpec; @@ -15,12 +13,15 @@ import org.reactivestreams.Publisher; import static akka.stream.actor.ActorPublisherMessage.Request; -public class ActorPublisherTest { +public class ActorPublisherTest extends StreamTest { + public ActorPublisherTest() { + super(actorSystemResource); + } @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorPublisherTest", AkkaSpec.testConf()); - public static class TestPublisher extends UntypedActorPublisher { + public static class TestPublisher extends UntypedActorPublisher { @Override public void onReceive(Object msg) { @@ -35,11 +36,6 @@ public class ActorPublisherTest { } } - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustHaveJavaAPI() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java index 2340385719..de5da1c7ec 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -1,10 +1,8 @@ package akka.stream.actor; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Props; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; +import akka.stream.StreamTest; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; @@ -19,7 +17,10 @@ import java.util.Arrays; import static akka.stream.actor.ActorSubscriberMessage.OnError; import static akka.stream.actor.ActorSubscriberMessage.OnNext; -public class ActorSubscriberTest { +public class ActorSubscriberTest extends StreamTest { + public ActorSubscriberTest() { + super(actorSystemResource); + } @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); @@ -55,11 +56,6 @@ public class ActorSubscriberTest { } } - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustHaveJavaAPI() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 2ed90f4041..de8b10fcfd 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -1,15 +1,13 @@ package akka.stream.javadsl; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.dispatch.Foreach; import akka.dispatch.Futures; import akka.dispatch.OnSuccess; import akka.japi.Pair; import akka.japi.Util; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; import akka.stream.OverflowStrategy; +import akka.stream.StreamTest; import akka.stream.Transformer; import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; @@ -32,17 +30,15 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; -public class FlowTest { +public class FlowTest extends StreamTest { + public FlowTest() { + super(actorSystemResource); + } - @ClassRule + @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustBeAbleToUseSimpleOperators() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 4a2666c4e9..25b4e7479d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -3,9 +3,10 @@ */ package akka.stream.javadsl; -import akka.actor.ActorSystem; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; +import java.util.ArrayList; +import java.util.List; + +import akka.stream.StreamTest; import akka.stream.javadsl.japi.Function2; import akka.stream.testkit.AkkaSpec; import org.junit.ClassRule; @@ -18,17 +19,15 @@ import scala.concurrent.duration.Duration; import java.util.ArrayList; import java.util.List; -public class SinkTest { +public class SinkTest extends StreamTest { + public SinkTest() { + super(actorSystemResource); + } @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustBeAbleToUseFanoutPublisher() throws Exception { final KeyedSink> pubSink = Sink.fanoutPublisher(2, 2); diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 9d6451b380..7a17eaa3fb 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -35,6 +35,9 @@ akka { timeout = 5s } + # Fully qualified config path which holds the dispatcher configuration + # to be used by FlowMaterialiser when creating Actors for IO operations. + file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher} } } diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 2ec2704f78..8d2735beda 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -184,7 +184,8 @@ object MaterializerSettings { config.getInt("initial-fan-out-buffer-size"), config.getInt("max-fan-out-buffer-size"), config.getString("dispatcher"), - StreamSubscriptionTimeoutSettings(config)) + StreamSubscriptionTimeoutSettings(config), + config.getString("file-io-dispatcher")) /** * Java API @@ -223,7 +224,8 @@ final case class MaterializerSettings( initialFanOutBufferSize: Int, maxFanOutBufferSize: Int, dispatcher: String, - subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings) { + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + fileIODispatcher: String) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")