Merge pull request #16196 from spray/w/15922-FileAndResourceDirectives

+htp #15922 import FileAndResourceDirectives from spray
This commit is contained in:
Martynas Mickevičius 2014-11-12 14:36:45 +02:00
commit 7bf97c768b
30 changed files with 919 additions and 84 deletions

View file

@ -18,15 +18,15 @@ sealed abstract class ModeledCompanion extends Renderable {
val name = getClass.getSimpleName.replace("$minus", "-").dropRight(1) // trailing $ val name = getClass.getSimpleName.replace("$minus", "-").dropRight(1) // trailing $
val lowercaseName = name.toRootLowerCase val lowercaseName = name.toRootLowerCase
private[this] val nameBytes = name.asciiBytes private[this] val nameBytes = name.asciiBytes
def render[R <: Rendering](r: R): r.type = r ~~ nameBytes ~~ ':' ~~ ' ' final def render[R <: Rendering](r: R): r.type = r ~~ nameBytes ~~ ':' ~~ ' '
} }
sealed trait ModeledHeader extends HttpHeader with Serializable { sealed trait ModeledHeader extends HttpHeader with Serializable {
def name: String = companion.name def name: String = companion.name
def value: String = renderValue(new StringRendering).get def value: String = renderValue(new StringRendering).get
def lowercaseName: String = companion.lowercaseName def lowercaseName: String = companion.lowercaseName
def render[R <: Rendering](r: R): r.type = renderValue(r ~~ companion) final def render[R <: Rendering](r: R): r.type = renderValue(r ~~ companion)
def renderValue[R <: Rendering](r: R): r.type protected[http] def renderValue[R <: Rendering](r: R): r.type
protected def companion: ModeledCompanion protected def companion: ModeledCompanion
} }
@ -38,7 +38,7 @@ abstract class CustomHeader extends japi.headers.CustomHeader {
def suppressRendering: Boolean = false def suppressRendering: Boolean = false
def lowercaseName: String = name.toRootLowerCase def lowercaseName: String = name.toRootLowerCase
def render[R <: Rendering](r: R): r.type = r ~~ name ~~ ':' ~~ ' ' ~~ value final def render[R <: Rendering](r: R): r.type = r ~~ name ~~ ':' ~~ ' ' ~~ value
} }
import japi.JavaMapping.Implicits._ import japi.JavaMapping.Implicits._
@ -81,7 +81,7 @@ object Expect extends ModeledCompanion {
val `100-continue` = new Expect() {} val `100-continue` = new Expect() {}
} }
sealed abstract case class Expect private () extends ModeledHeader { sealed abstract case class Expect private () extends ModeledHeader {
def renderValue[R <: Rendering](r: R): r.type = r ~~ "100-continue" final def renderValue[R <: Rendering](r: R): r.type = r ~~ "100-continue"
protected def companion = Expect protected def companion = Expect
} }

View file

@ -4,17 +4,21 @@
package akka.http.util package akka.http.util
import akka.http.model.RequestEntity import java.util.concurrent.atomic.AtomicBoolean
import akka.stream.impl.ErrorPublisher import java.io.InputStream
import akka.stream.Transformer
import akka.stream.FlowMaterializer import org.reactivestreams.{ Subscriber, Publisher }
import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.reactivestreams.Publisher
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import akka.actor.Props
import akka.util.ByteString
import akka.stream.{ impl, Transformer, FlowMaterializer }
import akka.stream.scaladsl._
import akka.http.model.RequestEntity
/** /**
* INTERNAL API * INTERNAL API
@ -50,7 +54,7 @@ private[http] object StreamUtils {
} }
def failedPublisher[T](ex: Throwable): Publisher[T] = def failedPublisher[T](ex: Throwable): Publisher[T] =
ErrorPublisher(ex).asInstanceOf[Publisher[T]] impl.ErrorPublisher(ex).asInstanceOf[Publisher[T]]
def mapErrorTransformer[T](f: Throwable Throwable): Transformer[T, T] = def mapErrorTransformer[T](f: Throwable Throwable): Transformer[T, T] =
new Transformer[T, T] { new Transformer[T, T] {
@ -98,8 +102,90 @@ private[http] object StreamUtils {
override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = currentState.onTermination(e) override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = currentState.onTermination(e)
} }
/**
* Applies a sequence of transformers on one source and returns a sequence of sources with the result. The input source
* will only be traversed once.
*/
def transformMultiple[T, U](input: Source[T], transformers: immutable.Seq[() Transformer[T, U]])(implicit materializer: FlowMaterializer): immutable.Seq[Source[U]] =
transformers match {
case Nil Nil
case Seq(one) Vector(input.transform("transformMultipleElement", one))
case multiple
val results = Vector.fill(multiple.size)(Sink.publisher[U])
val mat =
FlowGraph { implicit b
import FlowGraphImplicits._
val broadcast = Broadcast[T]("transformMultipleInputBroadcast")
input ~> broadcast
(multiple, results).zipped.foreach { (trans, sink)
broadcast ~> Flow[T].transform("transformMultipleElement", trans) ~> sink
}
}.run()
results.map(s Source(mat.get(s)))
}
def mapEntityError(f: Throwable Throwable): RequestEntity RequestEntity = def mapEntityError(f: Throwable Throwable): RequestEntity RequestEntity =
_.transformDataBytes(() mapErrorTransformer(f)) _.transformDataBytes(() mapErrorTransformer(f))
/**
* Simple blocking Source backed by an InputStream.
*
* FIXME: should be provided by akka-stream, see #15588
*/
def fromInputStreamSource(inputStream: InputStream, defaultChunkSize: Int = 65536): Source[ByteString] = {
import akka.stream.impl._
def props(materializer: ActorBasedFlowMaterializer): Props = {
val iterator = new Iterator[ByteString] {
var finished = false
def hasNext: Boolean = !finished
def next(): ByteString =
if (!finished) {
val buffer = new Array[Byte](defaultChunkSize)
val read = inputStream.read(buffer)
if (read < 0) {
finished = true
inputStream.close()
ByteString.empty
} else ByteString.fromArray(buffer, 0, read)
} else ByteString.empty
}
Props(new IteratorPublisherImpl(iterator, materializer.settings)).withDispatcher(materializer.settings.fileIODispatcher)
}
new AtomicBoolean(false) with SimpleActorFlowSource[ByteString] {
override def attach(flowSubscriber: Subscriber[ByteString], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[ByteString], Unit) =
if (!getAndSet(true)) {
val ref = materializer.actorOf(props(materializer), name = s"$flowName-0-InputStream-source")
val publisher = ActorPublisher[ByteString](ref)
ref ! ExposedPublisher(publisher.asInstanceOf[impl.ActorPublisher[Any]])
(publisher, ())
} else (ErrorPublisher(new IllegalStateException("One time source can only be instantiated once")).asInstanceOf[Publisher[ByteString]], ())
}
}
/**
* Returns a source that can only be used once for testing purposes.
*/
def oneTimeSource[T](other: Source[T]): Source[T] = {
import akka.stream.impl._
val original = other.asInstanceOf[ActorFlowSource[T]]
new AtomicBoolean(false) with SimpleActorFlowSource[T] {
override def attach(flowSubscriber: Subscriber[T], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[T], Unit) =
if (!getAndSet(true)) (original.create(materializer, flowName)._1, ())
else (ErrorPublisher(new IllegalStateException("One time source can only be instantiated once")).asInstanceOf[Publisher[T]], ())
}
}
} }
/** /**

View file

@ -10,7 +10,9 @@ import java.nio.charset.Charset
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.stream.{ FlowMaterializer, FlattenStrategy, Transformer } import akka.stream.{ FlowMaterializer, FlattenStrategy, Transformer }
import akka.stream.scaladsl.{ Flow, Source } import akka.stream.scaladsl.{ Flow, Source }
import scala.concurrent.Future import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.util.{ Failure, Success }
import scala.util.matching.Regex import scala.util.matching.Regex
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.util.ByteString import akka.util.ByteString
@ -77,6 +79,17 @@ package object util {
underlying.fold(Vector.empty[T])(_ :+ _) underlying.fold(Vector.empty[T])(_ :+ _)
} }
private[http] implicit class AddFutureAwaitResult[T](future: Future[T]) {
/** "Safe" Await.result that doesn't throw away half of the stacktrace */
def awaitResult(atMost: Duration): T = {
Await.ready(future, atMost)
future.value.get match {
case Success(t) t
case Failure(ex) throw new RuntimeException("Trying to await result of failed Future, see the cause for the original problem.", ex)
}
}
}
private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] =
new Transformer[ByteString, ByteString] { new Transformer[ByteString, ByteString] {
def onNext(element: ByteString) = element :: Nil def onNext(element: ByteString) = element :: Nil
@ -86,5 +99,14 @@ package object util {
private[this] val _identityFunc: Any Any = x x private[this] val _identityFunc: Any Any = x x
/** Returns a constant identity function to avoid allocating the closure */ /** Returns a constant identity function to avoid allocating the closure */
def identityFunc[T]: T T = _identityFunc.asInstanceOf[T T] def identityFunc[T]: T T = _identityFunc.asInstanceOf[T T]
def humanReadableByteCount(bytes: Long, si: Boolean): String = {
val unit = if (si) 1000 else 1024
if (bytes >= unit) {
val exp = (math.log(bytes) / math.log(unit)).toInt
val pre = if (si) "kMGTPE".charAt(exp - 1).toString else "KMGTPE".charAt(exp - 1).toString + 'i'
"%.1f %sB" format (bytes / math.pow(unit, exp), pre)
} else bytes.toString + " B"
}
} }

View file

@ -7,7 +7,8 @@ package akka.http.testkit
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext } import scala.concurrent.ExecutionContext
import akka.http.util._
import akka.stream.FlowMaterializer import akka.stream.FlowMaterializer
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.http.model.HttpEntity.ChunkStreamPart import akka.http.model.HttpEntity.ChunkStreamPart
@ -95,6 +96,6 @@ trait RouteTestResultComponent {
failTest("Request was neither completed nor rejected within " + timeout) failTest("Request was neither completed nor rejected within " + timeout)
private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] = private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] =
Await.result(data.grouped(Int.MaxValue).runWith(Sink.head), timeout) data.collectAll.awaitResult(timeout)
} }
} }

View file

@ -0,0 +1 @@
<p>Lorem ipsum!</p>

View file

@ -0,0 +1 @@
XyZ

View file

@ -0,0 +1 @@
123

View file

@ -0,0 +1 @@
123

View file

@ -0,0 +1,327 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.server
package directives
import java.io.{ File, FileOutputStream }
import akka.http.model.MediaTypes._
import akka.http.model._
import akka.http.model.headers._
import akka.http.util._
import org.scalatest.matchers.Matcher
import org.scalatest.{ Inside, Inspectors }
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.util.Properties
class FileAndResourceDirectivesSpec extends RoutingSpec with Inspectors with Inside {
override def testConfigSource =
"""akka.http.routing {
| file-chunking-threshold-size = 16
| file-chunking-chunk-size = 8
| range-coalescing-threshold = 1
|}""".stripMargin
"getFromFile" should {
"reject non-GET requests" in {
Put() ~> getFromFile("some") ~> check { handled shouldEqual (false) }
}
"reject requests to non-existing files" in {
Get() ~> getFromFile("nonExistentFile") ~> check { handled shouldEqual (false) }
}
"reject requests to directories" in {
Get() ~> getFromFile(Properties.javaHome) ~> check { handled shouldEqual (false) }
}
"return the file content with the MediaType matching the file extension" in {
val file = File.createTempFile("akkaHttpTest", ".PDF")
try {
writeAllText("This is PDF", file)
Get() ~> getFromFile(file.getPath) ~> check {
mediaType shouldEqual `application/pdf`
definedCharset shouldEqual None
responseAs[String] shouldEqual "This is PDF"
headers should contain(`Last-Modified`(DateTime(file.lastModified)))
}
} finally file.delete
}
"return the file content with MediaType 'application/octet-stream' on unknown file extensions" in {
val file = File.createTempFile("akkaHttpTest", null)
try {
writeAllText("Some content", file)
Get() ~> getFromFile(file) ~> check {
mediaType shouldEqual `application/octet-stream`
responseAs[String] shouldEqual "Some content"
}
} finally file.delete
}
"return a single range from a file" in {
val file = File.createTempFile("partialTest", null)
try {
writeAllText("ABCDEFGHIJKLMNOPQRSTUVWXYZ", file)
Get() ~> addHeader(Range(ByteRange(0, 10))) ~> getFromFile(file) ~> check {
status shouldEqual StatusCodes.PartialContent
headers should contain(`Content-Range`(ContentRange(0, 10, 26)))
responseAs[String] shouldEqual "ABCDEFGHIJK"
}
} finally file.delete
}
"return multiple ranges from a file at once" in {
val file = File.createTempFile("partialTest", null)
try {
writeAllText("ABCDEFGHIJKLMNOPQRSTUVWXYZ", file)
val rangeHeader = Range(ByteRange(1, 10), ByteRange.suffix(10))
Get() ~> addHeader(rangeHeader) ~> getFromFile(file, ContentTypes.`text/plain`) ~> check {
status shouldEqual StatusCodes.PartialContent
header[`Content-Range`] shouldEqual None
mediaType.withParams(Map.empty) shouldEqual `multipart/byteranges`
val parts = responseAs[Multipart.ByteRanges].toStrict(100.millis).awaitResult(100.millis).strictParts
parts.size shouldEqual 2
parts(0).entity.data.utf8String shouldEqual "BCDEFGHIJK"
parts(1).entity.data.utf8String shouldEqual "QRSTUVWXYZ"
}
} finally file.delete
}
}
"getFromResource" should {
"reject non-GET requests" in {
Put() ~> getFromResource("some") ~> check { handled shouldEqual (false) }
}
"reject requests to non-existing resources" in {
Get() ~> getFromResource("nonExistingResource") ~> check { handled shouldEqual (false) }
}
"return the resource content with the MediaType matching the file extension" in {
val route = getFromResource("sample.html")
def runCheck() =
Get() ~> route ~> check {
mediaType shouldEqual `text/html`
forAtLeast(1, headers) { h
inside(h) {
case `Last-Modified`(dt)
DateTime(2011, 7, 1) should be < dt
dt.clicks should be < System.currentTimeMillis()
}
}
responseAs[String] shouldEqual "<p>Lorem ipsum!</p>"
}
runCheck()
runCheck() // additional test to check that no internal state is kept
}
"return the file content with MediaType 'application/octet-stream' on unknown file extensions" in {
Get() ~> getFromResource("sample.xyz") ~> check {
mediaType shouldEqual `application/octet-stream`
responseAs[String] shouldEqual "XyZ"
}
}
}
"getFromResourceDirectory" should {
"reject requests to non-existing resources" in {
Get("not/found") ~> getFromResourceDirectory("subDirectory") ~> check { handled shouldEqual (false) }
}
val verify = check {
mediaType shouldEqual `application/pdf`
responseAs[String] shouldEqual "123"
}
"return the resource content with the MediaType matching the file extension - example 1" in { Get("empty.pdf") ~> getFromResourceDirectory("subDirectory") ~> verify }
"return the resource content with the MediaType matching the file extension - example 2" in { Get("empty.pdf") ~> getFromResourceDirectory("subDirectory/") ~> verify }
"return the resource content with the MediaType matching the file extension - example 3" in { Get("subDirectory/empty.pdf") ~> getFromResourceDirectory("") ~> verify }
"reject requests to directory resources" in {
Get() ~> getFromResourceDirectory("subDirectory") ~> check { handled shouldEqual (false) }
}
}
"listDirectoryContents" should {
val base = new File(getClass.getClassLoader.getResource("").toURI).getPath
new File(base, "subDirectory/emptySub").mkdir()
def eraseDateTime(s: String) = s.replaceAll("""\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d""", "xxxx-xx-xx xx:xx:xx")
implicit val settings = RoutingSettings.default.copy(renderVanityFooter = false)
"properly render a simple directory" in {
Get() ~> listDirectoryContents(base + "/someDir") ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /</title></head>
|<body>
|<h1>Index of /</h1>
|<hr>
|<pre>
|<a href="/sub/">sub/</a> xxxx-xx-xx xx:xx:xx
|<a href="/fileA.txt">fileA.txt</a> xxxx-xx-xx xx:xx:xx 3 B
|<a href="/fileB.xml">fileB.xml</a> xxxx-xx-xx xx:xx:xx 0 B
|</pre>
|<hr>
|</body>
|</html>
|"""
}
}
}
"properly render a sub directory" in {
Get("/sub/") ~> listDirectoryContents(base + "/someDir") ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /sub/</title></head>
|<body>
|<h1>Index of /sub/</h1>
|<hr>
|<pre>
|<a href="/">../</a>
|<a href="/sub/file.html">file.html</a> xxxx-xx-xx xx:xx:xx 0 B
|</pre>
|<hr>
|</body>
|</html>
|"""
}
}
}
"properly render the union of several directories" in {
Get() ~> listDirectoryContents(base + "/someDir", base + "/subDirectory") ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /</title></head>
|<body>
|<h1>Index of /</h1>
|<hr>
|<pre>
|<a href="/emptySub/">emptySub/</a> xxxx-xx-xx xx:xx:xx
|<a href="/sub/">sub/</a> xxxx-xx-xx xx:xx:xx
|<a href="/empty.pdf">empty.pdf</a> xxxx-xx-xx xx:xx:xx 3 B
|<a href="/fileA.txt">fileA.txt</a> xxxx-xx-xx xx:xx:xx 3 B
|<a href="/fileB.xml">fileB.xml</a> xxxx-xx-xx xx:xx:xx 0 B
|</pre>
|<hr>
|</body>
|</html>
|"""
}
}
}
"properly render an empty sub directory with vanity footer" in {
val settings = 0 // shadow implicit
Get("/emptySub/") ~> listDirectoryContents(base + "/subDirectory") ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /emptySub/</title></head>
|<body>
|<h1>Index of /emptySub/</h1>
|<hr>
|<pre>
|<a href="/">../</a>
|</pre>
|<hr>
|<div style="width:100%;text-align:right;color:gray">
|<small>rendered by <a href="http://akka.io">Akka Http</a> on xxxx-xx-xx xx:xx:xx</small>
|</div>
|</body>
|</html>
|"""
}
}
}
"properly render an empty top-level directory" in {
Get() ~> listDirectoryContents(base + "/subDirectory/emptySub") ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /</title></head>
|<body>
|<h1>Index of /</h1>
|<hr>
|<pre>
|(no files)
|</pre>
|<hr>
|</body>
|</html>
|"""
}
}
}
"properly render a simple directory with a path prefix" in {
Get("/files/") ~> pathPrefix("files")(listDirectoryContents(base + "/someDir")) ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /files/</title></head>
|<body>
|<h1>Index of /files/</h1>
|<hr>
|<pre>
|<a href="/files/sub/">sub/</a> xxxx-xx-xx xx:xx:xx
|<a href="/files/fileA.txt">fileA.txt</a> xxxx-xx-xx xx:xx:xx 3 B
|<a href="/files/fileB.xml">fileB.xml</a> xxxx-xx-xx xx:xx:xx 0 B
|</pre>
|<hr>
|</body>
|</html>
|"""
}
}
}
"properly render a sub directory with a path prefix" in {
Get("/files/sub/") ~> pathPrefix("files")(listDirectoryContents(base + "/someDir")) ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /files/sub/</title></head>
|<body>
|<h1>Index of /files/sub/</h1>
|<hr>
|<pre>
|<a href="/files/">../</a>
|<a href="/files/sub/file.html">file.html</a> xxxx-xx-xx xx:xx:xx 0 B
|</pre>
|<hr>
|</body>
|</html>
|"""
}
}
}
"properly render an empty top-level directory with a path prefix" in {
Get("/files/") ~> pathPrefix("files")(listDirectoryContents(base + "/subDirectory/emptySub")) ~> check {
eraseDateTime(responseAs[String]) shouldEqual prep {
"""<html>
|<head><title>Index of /files/</title></head>
|<body>
|<h1>Index of /files/</h1>
|<hr>
|<pre>
|(no files)
|</pre>
|<hr>
|</body>
|</html>
|"""
}
}
}
"reject requests to file resources" in {
Get() ~> listDirectoryContents(base + "subDirectory/empty.pdf") ~> check { handled shouldEqual (false) }
}
}
def prep(s: String) = s.stripMarginWithNewline("\n")
def writeAllText(text: String, file: File): Unit = {
val fos = new FileOutputStream(file)
try {
fos.write(text.getBytes("UTF-8"))
} finally fos.close()
}
def evaluateTo[T](t: T, atMost: Duration = 100.millis)(implicit ec: ExecutionContext): Matcher[Future[T]] =
be(t).compose[Future[T]] { fut
import scala.concurrent.Await
fut.awaitResult(atMost)
}
}

View file

@ -9,6 +9,8 @@ import akka.http.model.StatusCodes._
import akka.http.model._ import akka.http.model._
import akka.http.model.headers._ import akka.http.model.headers._
import akka.http.util._ import akka.http.util._
import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.scalatest.{ Inside, Inspectors } import org.scalatest.{ Inside, Inspectors }
import scala.concurrent.Await import scala.concurrent.Await
@ -93,7 +95,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
} }
} }
"return a 'multipart/byteranges' for a ranged request with multiple coalesced ranges with preserved order" in { "return a 'multipart/byteranges' for a ranged request with multiple coalesced ranges and expect ranges in ascending order" in {
Get() ~> addHeader(Range(ByteRange(5, 10), ByteRange(0, 1), ByteRange(1, 2))) ~> { Get() ~> addHeader(Range(ByteRange(5, 10), ByteRange(0, 1), ByteRange(1, 2))) ~> {
wrs { complete("Some random and not super short entity.") } wrs { complete("Some random and not super short entity.") }
} ~> check { } ~> check {
@ -101,17 +103,30 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second) val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second)
parts.size shouldEqual 2 parts.size shouldEqual 2
inside(parts(0)) { inside(parts(0)) {
case Multipart.ByteRanges.BodyPart(range, entity, unit, headers)
range shouldEqual ContentRange.Default(5, 10, Some(39))
unit shouldEqual RangeUnits.Bytes
Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "random"
}
inside(parts(1)) {
case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) case Multipart.ByteRanges.BodyPart(range, entity, unit, headers)
range shouldEqual ContentRange.Default(0, 2, Some(39)) range shouldEqual ContentRange.Default(0, 2, Some(39))
unit shouldEqual RangeUnits.Bytes unit shouldEqual RangeUnits.Bytes
Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "Som" Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "Som"
} }
inside(parts(1)) {
case Multipart.ByteRanges.BodyPart(range, entity, unit, headers)
range shouldEqual ContentRange.Default(5, 10, Some(39))
unit shouldEqual RangeUnits.Bytes
Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "random"
}
}
}
"return a 'multipart/byteranges' for a ranged request with multiple ranges if entity data source isn't reusable" in {
val content = "Some random and not super short entity."
def entityData() = StreamUtils.oneTimeSource(Source.singleton(ByteString(content)))
Get() ~> addHeader(Range(ByteRange(5, 10), ByteRange(0, 1), ByteRange(1, 2))) ~> {
wrs { complete(HttpEntity.Default(MediaTypes.`text/plain`, content.length, entityData())) }
} ~> check {
header[`Content-Range`] should be(None)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second)
parts.size shouldEqual 2
} }
} }

View file

@ -13,6 +13,12 @@ akka.http.routing {
# (Note that akka-http will always produce log messages containing the full error details) # (Note that akka-http will always produce log messages containing the full error details)
verbose-error-messages = off verbose-error-messages = off
# Enables/disables ETag and `If-Modified-Since` support for FileAndResourceDirectives
file-get-conditional = on
# Enables/disables the rendering of the "rendered by" footer in directory listings
render-vanity-footer = yes
# The maximum size between two requested ranges. Ranges with less space in between will be coalesced. # The maximum size between two requested ranges. Ranges with less space in between will be coalesced.
# #
# When multiple ranges are requested, a server may coalesce any of the ranges that overlap or that are separated # When multiple ranges are requested, a server may coalesce any of the ranges that overlap or that are separated

View file

@ -17,7 +17,7 @@ trait Directives extends RouteConcatenation
with DebuggingDirectives with DebuggingDirectives
with CodingDirectives with CodingDirectives
with ExecutionDirectives with ExecutionDirectives
//with FileAndResourceDirectives with FileAndResourceDirectives
//with FormFieldDirectives //with FormFieldDirectives
with FutureDirectives with FutureDirectives
with HeaderDirectives with HeaderDirectives

View file

@ -4,6 +4,8 @@
package akka.http.server package akka.http.server
import akka.stream.FlowMaterializer
import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.{ Future, ExecutionContext }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.http.marshalling.ToResponseMarshallable import akka.http.marshalling.ToResponseMarshallable
@ -26,6 +28,11 @@ trait RequestContext {
*/ */
implicit def executionContext: ExecutionContext implicit def executionContext: ExecutionContext
/**
* The default FlowMaterializer.
*/
implicit def flowMaterializer: FlowMaterializer
/** /**
* The default LoggingAdapter to be used for logging messages related to this request. * The default LoggingAdapter to be used for logging messages related to this request.
*/ */
@ -41,6 +48,7 @@ trait RequestContext {
*/ */
def reconfigure( def reconfigure(
executionContext: ExecutionContext = executionContext, executionContext: ExecutionContext = executionContext,
flowMaterializer: FlowMaterializer = flowMaterializer,
log: LoggingAdapter = log, log: LoggingAdapter = log,
settings: RoutingSettings = settings): RequestContext settings: RoutingSettings = settings): RequestContext
@ -71,6 +79,11 @@ trait RequestContext {
*/ */
def withExecutionContext(ec: ExecutionContext): RequestContext def withExecutionContext(ec: ExecutionContext): RequestContext
/**
* Returns a copy of this context with the new HttpRequest.
*/
def withFlowMaterializer(materializer: FlowMaterializer): RequestContext
/** /**
* Returns a copy of this context with the new LoggingAdapter. * Returns a copy of this context with the new LoggingAdapter.
*/ */

View file

@ -4,6 +4,8 @@
package akka.http.server package akka.http.server
import akka.stream.FlowMaterializer
import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.{ Future, ExecutionContext }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.http.marshalling.ToResponseMarshallable import akka.http.marshalling.ToResponseMarshallable
@ -18,14 +20,15 @@ private[http] class RequestContextImpl(
val request: HttpRequest, val request: HttpRequest,
val unmatchedPath: Uri.Path, val unmatchedPath: Uri.Path,
val executionContext: ExecutionContext, val executionContext: ExecutionContext,
val flowMaterializer: FlowMaterializer,
val log: LoggingAdapter, val log: LoggingAdapter,
val settings: RoutingSettings) extends RequestContext { val settings: RoutingSettings) extends RequestContext {
def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext) = def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: FlowMaterializer) =
this(request, request.uri.path, ec, log, settings) this(request, request.uri.path, ec, materializer, log, settings)
def reconfigure(executionContext: ExecutionContext, log: LoggingAdapter, settings: RoutingSettings): RequestContext = def reconfigure(executionContext: ExecutionContext, flowMaterializer: FlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext =
copy(executionContext = executionContext, log = log, settings = settings) copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log, settings = settings)
override def complete(trm: ToResponseMarshallable): Future[RouteResult] = override def complete(trm: ToResponseMarshallable): Future[RouteResult] =
trm(request)(executionContext) trm(request)(executionContext)
@ -44,6 +47,9 @@ private[http] class RequestContextImpl(
override def withExecutionContext(ec: ExecutionContext): RequestContext = override def withExecutionContext(ec: ExecutionContext): RequestContext =
copy(executionContext = ec) copy(executionContext = ec)
override def withFlowMaterializer(materializer: FlowMaterializer): RequestContext =
copy(flowMaterializer = materializer)
override def withLog(log: LoggingAdapter): RequestContext = override def withLog(log: LoggingAdapter): RequestContext =
copy(log = log) copy(log = log)
@ -65,7 +71,8 @@ private[http] class RequestContextImpl(
private def copy(request: HttpRequest = request, private def copy(request: HttpRequest = request,
unmatchedPath: Uri.Path = unmatchedPath, unmatchedPath: Uri.Path = unmatchedPath,
executionContext: ExecutionContext = executionContext, executionContext: ExecutionContext = executionContext,
flowMaterializer: FlowMaterializer = flowMaterializer,
log: LoggingAdapter = log, log: LoggingAdapter = log,
settings: RoutingSettings = settings) = settings: RoutingSettings = settings) =
new RequestContextImpl(request, unmatchedPath, executionContext, log, settings) new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, settings)
} }

View file

@ -10,12 +10,16 @@ import akka.http.util._
case class RoutingSettings( case class RoutingSettings(
verboseErrorMessages: Boolean, verboseErrorMessages: Boolean,
fileGetConditional: Boolean,
renderVanityFooter: Boolean,
rangeCountLimit: Int, rangeCountLimit: Int,
rangeCoalescingThreshold: Long) rangeCoalescingThreshold: Long)
object RoutingSettings extends SettingsCompanion[RoutingSettings]("akka.http.routing") { object RoutingSettings extends SettingsCompanion[RoutingSettings]("akka.http.routing") {
def fromSubConfig(c: Config) = apply( def fromSubConfig(c: Config) = apply(
c getBoolean "verbose-error-messages", c getBoolean "verbose-error-messages",
c getBoolean "file-get-conditional",
c getBoolean "render-vanity-footer",
c getInt "range-count-limit", c getInt "range-count-limit",
c getBytes "range-coalescing-threshold") c getBytes "range-coalescing-threshold")

View file

@ -4,6 +4,8 @@
package akka.http.server package akka.http.server
import akka.stream.FlowMaterializer
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import akka.actor.{ ActorSystem, ActorContext } import akka.actor.{ ActorSystem, ActorContext }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
@ -33,10 +35,12 @@ class RoutingSetup(
val exceptionHandler: ExceptionHandler, val exceptionHandler: ExceptionHandler,
val rejectionHandler: RejectionHandler, val rejectionHandler: RejectionHandler,
val executionContext: ExecutionContext, val executionContext: ExecutionContext,
val flowMaterializer: FlowMaterializer,
val routingLog: RoutingLog) { val routingLog: RoutingLog) {
// enable `import setup._` to properly bring implicits in scope // enable `import setup._` to properly bring implicits in scope
implicit def executor: ExecutionContext = executionContext implicit def executor: ExecutionContext = executionContext
implicit def materializer: FlowMaterializer = flowMaterializer
} }
object RoutingSetup { object RoutingSetup {
@ -44,12 +48,14 @@ object RoutingSetup {
exceptionHandler: ExceptionHandler = null, exceptionHandler: ExceptionHandler = null,
rejectionHandler: RejectionHandler = null, rejectionHandler: RejectionHandler = null,
executionContext: ExecutionContext, executionContext: ExecutionContext,
flowMaterializer: FlowMaterializer,
routingLog: RoutingLog): RoutingSetup = routingLog: RoutingLog): RoutingSetup =
new RoutingSetup( new RoutingSetup(
routingSettings, routingSettings,
if (exceptionHandler ne null) exceptionHandler else ExceptionHandler.default(routingSettings), if (exceptionHandler ne null) exceptionHandler else ExceptionHandler.default(routingSettings),
if (rejectionHandler ne null) rejectionHandler else RejectionHandler.default(executionContext), if (rejectionHandler ne null) rejectionHandler else RejectionHandler.default(executionContext),
executionContext, executionContext,
flowMaterializer,
routingLog) routingLog)
} }

View file

@ -6,6 +6,7 @@ package akka.http.server
package directives package directives
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream.FlowMaterializer
import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.{ Future, ExecutionContext }
import scala.collection.immutable import scala.collection.immutable
@ -140,6 +141,17 @@ trait BasicDirectives {
*/ */
def extractExecutionContext: Directive1[ExecutionContext] = BasicDirectives._extractExecutionContext def extractExecutionContext: Directive1[ExecutionContext] = BasicDirectives._extractExecutionContext
/**
* Runs its inner route with the given alternative [[FlowMaterializer]].
*/
def withFlowMaterializer(materializer: FlowMaterializer): Directive0 =
mapRequestContext(_ withFlowMaterializer materializer)
/**
* Extracts the [[ExecutionContext]] from the [[RequestContext]].
*/
def extractFlowMaterializer: Directive1[FlowMaterializer] = BasicDirectives._extractFlowMaterializer
/** /**
* Runs its inner route with the given alternative [[LoggingAdapter]]. * Runs its inner route with the given alternative [[LoggingAdapter]].
*/ */
@ -181,6 +193,7 @@ object BasicDirectives extends BasicDirectives {
private val _extractRequest: Directive1[HttpRequest] = extract(_.request) private val _extractRequest: Directive1[HttpRequest] = extract(_.request)
private val _extractUri: Directive1[Uri] = extract(_.request.uri) private val _extractUri: Directive1[Uri] = extract(_.request.uri)
private val _extractExecutionContext: Directive1[ExecutionContext] = extract(_.executionContext) private val _extractExecutionContext: Directive1[ExecutionContext] = extract(_.executionContext)
private val _extractFlowMaterializer: Directive1[FlowMaterializer] = extract(_.flowMaterializer)
private val _extractLog: Directive1[LoggingAdapter] = extract(_.log) private val _extractLog: Directive1[LoggingAdapter] = extract(_.log)
private val _extractSettings: Directive1[RoutingSettings] = extract(_.settings) private val _extractSettings: Directive1[RoutingSettings] = extract(_.settings)
private val _extractRequestContext: Directive1[RequestContext] = extract(akka.http.util.identityFunc) private val _extractRequestContext: Directive1[RequestContext] = extract(akka.http.util.identityFunc)

View file

@ -0,0 +1,311 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.server
package directives
import java.io.{ File, FileInputStream }
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import akka.http.marshalling.{ Marshaller, ToEntityMarshaller }
import akka.http.model._
import akka.http.model.headers._
import akka.http.util._
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
trait FileAndResourceDirectives {
import CacheConditionDirectives._
import MethodDirectives._
import FileAndResourceDirectives._
import RouteDirectives._
import BasicDirectives._
import RouteConcatenation._
import RangeDirectives._
/**
* Completes GET requests with the content of the given file. The actual I/O operation is
* running detached in a `Future`, so it doesn't block the current thread (but potentially
* some other thread !). If the file cannot be found or read the request is rejected.
*/
def getFromFile(fileName: String)(implicit resolver: ContentTypeResolver): Route =
getFromFile(new File(fileName))
/**
* Completes GET requests with the content of the given file. The actual I/O operation is
* running detached in a `Future`, so it doesn't block the current thread (but potentially
* some other thread !). If the file cannot be found or read the request is rejected.
*/
def getFromFile(file: File)(implicit resolver: ContentTypeResolver): Route =
getFromFile(file, resolver(file.getName))
/**
* Completes GET requests with the content of the given file. The actual I/O operation is
* running detached in a `Future`, so it doesn't block the current thread (but potentially
* some other thread !). If the file cannot be found or read the request is rejected.
*/
def getFromFile(file: File, contentType: ContentType): Route =
get {
if (file.isFile && file.canRead)
conditionalFor(file.length, file.lastModified).apply {
withRangeSupport {
extractExecutionContext { implicit ec
complete(HttpEntity.Default(contentType, file.length, StreamUtils.fromInputStreamSource(new FileInputStream(file))))
}
}
}
else reject
}
private def conditionalFor(length: Long, lastModified: Long): Directive0 =
extractSettings.flatMap(settings
if (settings.fileGetConditional) {
val tag = java.lang.Long.toHexString(lastModified ^ java.lang.Long.reverse(length))
val lastModifiedDateTime = DateTime(math.min(lastModified, System.currentTimeMillis))
conditional(EntityTag(tag), lastModifiedDateTime)
} else pass)
/**
* Completes GET requests with the content of the given resource. The actual I/O operation is
* running detached in a `Future`, so it doesn't block the current thread (but potentially
* some other thread !).
* If the resource cannot be found or read the Route rejects the request.
*/
def getFromResource(resourceName: String)(implicit resolver: ContentTypeResolver): Route =
getFromResource(resourceName, resolver(resourceName))
/**
* Completes GET requests with the content of the given resource. The actual I/O operation is
* running detached in a `Future`, so it doesn't block the current thread (but potentially
* some other thread !).
* If the resource cannot be found or read the Route rejects the request.
*/
def getFromResource(resourceName: String, contentType: ContentType, theClassLoader: ClassLoader = classOf[ActorSystem].getClassLoader): Route =
if (!resourceName.endsWith("/"))
get {
theClassLoader.getResource(resourceName) match {
case null reject
case url
val (length, lastModified) = {
val conn = url.openConnection()
try {
conn.setUseCaches(false) // otherwise the JDK will keep the JAR file open when we close!
val len = conn.getContentLength
val lm = conn.getLastModified
len -> lm
} finally conn.getInputStream.close()
}
conditionalFor(length, lastModified).apply {
withRangeSupport {
extractExecutionContext { implicit ec
complete {
HttpEntity.Default(contentType, length, StreamUtils.fromInputStreamSource(url.openStream()))
}
}
}
}
}
}
else reject // don't serve the content of resource "directories"
/**
* Completes GET requests with the content of a file underneath the given directory.
* If the file cannot be read the Route rejects the request.
*/
def getFromDirectory(directoryName: String)(implicit resolver: ContentTypeResolver): Route = {
val base = withTrailingSlash(directoryName)
extractUnmatchedPath { path
extractLog { log
fileSystemPath(base, path, log) match {
case "" reject
case fileName getFromFile(fileName)
}
}
}
}
/**
* Completes GET requests with a unified listing of the contents of all given directories.
* The actual rendering of the directory contents is performed by the in-scope `Marshaller[DirectoryListing]`.
*/
def listDirectoryContents(directories: String*)(implicit renderer: DirectoryRenderer): Route =
get {
extractRequestContext { ctx
val path = ctx.unmatchedPath
val fullPath = ctx.request.uri.path.toString
val matchedLength = fullPath.lastIndexOf(path.toString)
require(matchedLength >= 0)
val pathPrefix = fullPath.substring(0, matchedLength)
val pathString = withTrailingSlash(fileSystemPath("/", path, ctx.log, '/'))
val dirs = directories flatMap { dir
fileSystemPath(withTrailingSlash(dir), path, ctx.log) match {
case "" None
case fileName
val file = new File(fileName)
if (file.isDirectory && file.canRead) Some(file) else None
}
}
import ctx.executionContext
implicit val marshaller: ToEntityMarshaller[DirectoryListing] = renderer.marshaller(ctx.settings.renderVanityFooter)
if (dirs.isEmpty) reject
else complete(DirectoryListing(pathPrefix + pathString, isRoot = pathString == "/", dirs.flatMap(_.listFiles)))
}
}
/**
* Same as `getFromBrowseableDirectories` with only one directory.
*/
def getFromBrowseableDirectory(directory: String)(implicit renderer: DirectoryRenderer, resolver: ContentTypeResolver): Route =
getFromBrowseableDirectories(directory)
/**
* Serves the content of the given directories as a file system browser, i.e. files are sent and directories
* served as browseable listings.
*/
def getFromBrowseableDirectories(directories: String*)(implicit renderer: DirectoryRenderer, resolver: ContentTypeResolver): Route = {
directories.map(getFromDirectory).reduceLeft(_ ~ _) ~ listDirectoryContents(directories: _*)
}
/**
* Same as "getFromDirectory" except that the file is not fetched from the file system but rather from a
* "resource directory".
*/
def getFromResourceDirectory(directoryName: String)(implicit resolver: ContentTypeResolver): Route = {
val base = if (directoryName.isEmpty) "" else withTrailingSlash(directoryName)
extractUnmatchedPath { path
extractLog { log
fileSystemPath(base, path, log, separator = '/') match {
case "" reject
case resourceName getFromResource(resourceName)
}
}
}
}
}
object FileAndResourceDirectives extends FileAndResourceDirectives {
private def withTrailingSlash(path: String): String = if (path endsWith "/") path else path + '/'
private def fileSystemPath(base: String, path: Uri.Path, log: LoggingAdapter, separator: Char = File.separatorChar): String = {
import java.lang.StringBuilder
@tailrec def rec(p: Uri.Path, result: StringBuilder = new StringBuilder(base)): String =
p match {
case Uri.Path.Empty result.toString
case Uri.Path.Slash(tail) rec(tail, result.append(separator))
case Uri.Path.Segment(head, tail)
if (head.indexOf('/') >= 0 || head == "..") {
log.warning("File-system path for base [{}] and Uri.Path [{}] contains suspicious path segment [{}], " +
"GET access was disallowed", base, path, head)
""
} else rec(tail, result.append(head))
}
rec(if (path.startsWithSlash) path.tail else path)
}
trait DirectoryRenderer {
def marshaller(renderVanityFooter: Boolean): ToEntityMarshaller[DirectoryListing]
}
trait LowLevelDirectoryRenderer {
implicit def defaultDirectoryRenderer(implicit ec: ExecutionContext): DirectoryRenderer =
new DirectoryRenderer {
def marshaller(renderVanityFooter: Boolean): ToEntityMarshaller[DirectoryListing] =
DirectoryListing.directoryMarshaller(renderVanityFooter)
}
}
object DirectoryRenderer extends LowLevelDirectoryRenderer {
implicit def liftMarshaller(implicit _marshaller: ToEntityMarshaller[DirectoryListing]): DirectoryRenderer =
new DirectoryRenderer {
def marshaller(renderVanityFooter: Boolean): ToEntityMarshaller[DirectoryListing] = _marshaller
}
}
}
trait ContentTypeResolver {
def apply(fileName: String): ContentType
}
object ContentTypeResolver {
/**
* The default way of resolving a filename to a ContentType is by looking up the file extension in the
* registry of all defined media-types. By default all non-binary file content is assumed to be UTF-8 encoded.
*/
implicit val Default = withDefaultCharset(HttpCharsets.`UTF-8`)
def withDefaultCharset(charset: HttpCharset): ContentTypeResolver =
new ContentTypeResolver {
def apply(fileName: String) = {
val ext = fileName.lastIndexOf('.') match {
case -1 ""
case x fileName.substring(x + 1)
}
val mediaType = MediaTypes.forExtension(ext) getOrElse MediaTypes.`application/octet-stream`
mediaType match {
case x if !x.binary ContentType(x, charset)
case x ContentType(x)
}
}
}
}
case class DirectoryListing(path: String, isRoot: Boolean, files: Seq[File])
object DirectoryListing {
private val html =
"""<html>
|<head><title>Index of $</title></head>
|<body>
|<h1>Index of $</h1>
|<hr>
|<pre>
|$</pre>
|<hr>$
|<div style="width:100%;text-align:right;color:gray">
|<small>rendered by <a href="http://akka.io">Akka Http</a> on $</small>
|</div>$
|</body>
|</html>
|""".stripMarginWithNewline("\n") split '$'
def directoryMarshaller(renderVanityFooter: Boolean)(implicit ec: ExecutionContext): ToEntityMarshaller[DirectoryListing] =
Marshaller.StringMarshaller.wrap(MediaTypes.`text/html`) { listing
val DirectoryListing(path, isRoot, files) = listing
val filesAndNames = files.map(file file -> file.getName).sortBy(_._2)
val deduped = filesAndNames.zipWithIndex.flatMap {
case (fan @ (file, name), ix)
if (ix == 0 || filesAndNames(ix - 1)._2 != name) Some(fan) else None
}
val (directoryFilesAndNames, fileFilesAndNames) = deduped.partition(_._1.isDirectory)
def maxNameLength(seq: Seq[(File, String)]) = if (seq.isEmpty) 0 else seq.map(_._2.length).max
val maxNameLen = math.max(maxNameLength(directoryFilesAndNames) + 1, maxNameLength(fileFilesAndNames))
val sb = new java.lang.StringBuilder
sb.append(html(0)).append(path).append(html(1)).append(path).append(html(2))
if (!isRoot) {
val secondToLastSlash = path.lastIndexOf('/', path.lastIndexOf('/', path.length - 1) - 1)
sb.append("<a href=\"%s/\">../</a>\n" format path.substring(0, secondToLastSlash))
}
def lastModified(file: File) = DateTime(file.lastModified).toIsoLikeDateTimeString
def start(name: String) =
sb.append("<a href=\"").append(path + name).append("\">").append(name).append("</a>")
.append(" " * (maxNameLen - name.length))
def renderDirectory(file: File, name: String) =
start(name + '/').append(" ").append(lastModified(file)).append('\n')
def renderFile(file: File, name: String) = {
val size = akka.http.util.humanReadableByteCount(file.length, si = true)
start(name).append(" ").append(lastModified(file))
sb.append(" ".substring(size.length)).append(size).append('\n')
}
for ((file, name) directoryFilesAndNames) renderDirectory(file, name)
for ((file, name) fileFilesAndNames) renderFile(file, name)
if (isRoot && files.isEmpty) sb.append("(no files)\n")
sb.append(html(3))
if (renderVanityFooter) sb.append(html(4)).append(DateTime.now.toIsoLikeDateTimeString).append(html(5))
sb.append(html(6)).toString
}
}

View file

@ -34,6 +34,7 @@ trait RangeDirectives {
*/ */
def withRangeSupport: Directive0 = def withRangeSupport: Directive0 =
extractRequestContext.flatMap { ctx extractRequestContext.flatMap { ctx
import ctx.flowMaterializer
val settings = ctx.settings val settings = ctx.settings
implicit val log = ctx.log implicit val log = ctx.log
import settings.{ rangeCountLimit, rangeCoalescingThreshold } import settings.{ rangeCountLimit, rangeCoalescingThreshold }
@ -66,7 +67,17 @@ trait RangeDirectives {
def multipartRanges(ranges: Seq[ByteRange], entity: UniversalEntity): Multipart.ByteRanges = { def multipartRanges(ranges: Seq[ByteRange], entity: UniversalEntity): Multipart.ByteRanges = {
val length = entity.contentLength val length = entity.contentLength
val iRanges: Seq[IndexRange] = ranges.map(indexRange(length)) val iRanges: Seq[IndexRange] = ranges.map(indexRange(length))
val bodyParts = coalesceRanges(iRanges).map(ir Multipart.ByteRanges.BodyPart(ir.contentRange(length), ir(entity)))
// It's only possible to run once over the input entity data stream because it's not known if the
// source is reusable.
// Therefore, ranges need to be sorted to prevent that some selected ranges already start to accumulate data
// but cannot be sent out because another range is blocking the queue.
val coalescedRanges = coalesceRanges(iRanges).sortBy(_.start)
val bodyPartTransformers = coalescedRanges.map(ir () StreamUtils.sliceBytesTransformer(ir.start, ir.length)).toVector
val bodyPartByteStreams = StreamUtils.transformMultiple(entity.dataBytes, bodyPartTransformers)
val bodyParts = (coalescedRanges, bodyPartByteStreams).zipped.map { (range, bytes)
Multipart.ByteRanges.BodyPart(range.contentRange(length), HttpEntity(entity.contentType, range.length, bytes))
}
Multipart.ByteRanges(Source(bodyParts.toVector)) Multipart.ByteRanges(Source(bodyParts.toVector))
} }

View file

@ -14,18 +14,16 @@
* limitations under the License. * limitations under the License.
*/ */
package akka package akka.shapeless
package object shapeless { /** Dependent unary function type. */
/** Dependent unary function type. */ trait DepFn1[T] {
trait DepFn1[T] { type Out
type Out def apply(t: T): Out
def apply(t: T): Out }
}
/** Dependent binary function type. */
/** Dependent binary function type. */ trait DepFn2[T, U] {
trait DepFn2[T, U] { type Out
type Out def apply(t: T, u: U): Out
def apply(t: T, u: U): Out
}
} }

View file

@ -0,0 +1,19 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream;
import akka.actor.ActorSystem;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
public abstract class StreamTest {
final protected ActorSystem system;
final protected FlowMaterializer materializer;
protected StreamTest(AkkaJUnitActorSystemResource actorSystemResource) {
system = actorSystemResource.getSystem();
MaterializerSettings settings = MaterializerSettings.create(system);
materializer = FlowMaterializer.create(settings, system);
}
}

View file

@ -1,10 +1,8 @@
package akka.stream.actor; package akka.stream.actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.stream.FlowMaterializer; import akka.stream.StreamTest;
import akka.stream.MaterializerSettings;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
@ -15,12 +13,15 @@ import org.reactivestreams.Publisher;
import static akka.stream.actor.ActorPublisherMessage.Request; import static akka.stream.actor.ActorPublisherMessage.Request;
public class ActorPublisherTest { public class ActorPublisherTest extends StreamTest {
public ActorPublisherTest() {
super(actorSystemResource);
}
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorPublisherTest", AkkaSpec.testConf()); public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorPublisherTest", AkkaSpec.testConf());
public static class TestPublisher extends UntypedActorPublisher<Integer> { public static class TestPublisher extends UntypedActorPublisher<Integer> {
@Override @Override
public void onReceive(Object msg) { public void onReceive(Object msg) {
@ -35,11 +36,6 @@ public class ActorPublisherTest {
} }
} }
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test
public void mustHaveJavaAPI() { public void mustHaveJavaAPI() {
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);

View file

@ -1,10 +1,8 @@
package akka.stream.actor; package akka.stream.actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.stream.FlowMaterializer; import akka.stream.StreamTest;
import akka.stream.MaterializerSettings;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
@ -19,7 +17,10 @@ import java.util.Arrays;
import static akka.stream.actor.ActorSubscriberMessage.OnError; import static akka.stream.actor.ActorSubscriberMessage.OnError;
import static akka.stream.actor.ActorSubscriberMessage.OnNext; import static akka.stream.actor.ActorSubscriberMessage.OnNext;
public class ActorSubscriberTest { public class ActorSubscriberTest extends StreamTest {
public ActorSubscriberTest() {
super(actorSystemResource);
}
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf());
@ -55,11 +56,6 @@ public class ActorSubscriberTest {
} }
} }
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test
public void mustHaveJavaAPI() { public void mustHaveJavaAPI() {
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);

View file

@ -1,15 +1,13 @@
package akka.stream.javadsl; package akka.stream.javadsl;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Foreach; import akka.dispatch.Foreach;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import akka.dispatch.OnSuccess; import akka.dispatch.OnSuccess;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.Util; import akka.japi.Util;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.OverflowStrategy; import akka.stream.OverflowStrategy;
import akka.stream.StreamTest;
import akka.stream.Transformer; import akka.stream.Transformer;
import akka.stream.javadsl.japi.*; import akka.stream.javadsl.japi.*;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
@ -32,17 +30,15 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class FlowTest { public class FlowTest extends StreamTest {
public FlowTest() {
super(actorSystemResource);
}
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf()); AkkaSpec.testConf());
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test
public void mustBeAbleToUseSimpleOperators() { public void mustBeAbleToUseSimpleOperators() {
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);

View file

@ -3,9 +3,10 @@
*/ */
package akka.stream.javadsl; package akka.stream.javadsl;
import akka.actor.ActorSystem; import java.util.ArrayList;
import akka.stream.FlowMaterializer; import java.util.List;
import akka.stream.MaterializerSettings;
import akka.stream.StreamTest;
import akka.stream.javadsl.japi.Function2; import akka.stream.javadsl.japi.Function2;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -18,17 +19,15 @@ import scala.concurrent.duration.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public class SinkTest { public class SinkTest extends StreamTest {
public SinkTest() {
super(actorSystemResource);
}
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf()); AkkaSpec.testConf());
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test
public void mustBeAbleToUseFanoutPublisher() throws Exception { public void mustBeAbleToUseFanoutPublisher() throws Exception {
final KeyedSink<Object, Publisher<Object>> pubSink = Sink.fanoutPublisher(2, 2); final KeyedSink<Object, Publisher<Object>> pubSink = Sink.fanoutPublisher(2, 2);

View file

@ -35,6 +35,9 @@ akka {
timeout = 5s timeout = 5s
} }
# Fully qualified config path which holds the dispatcher configuration
# to be used by FlowMaterialiser when creating Actors for IO operations.
file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher}
} }
} }

View file

@ -184,7 +184,8 @@ object MaterializerSettings {
config.getInt("initial-fan-out-buffer-size"), config.getInt("initial-fan-out-buffer-size"),
config.getInt("max-fan-out-buffer-size"), config.getInt("max-fan-out-buffer-size"),
config.getString("dispatcher"), config.getString("dispatcher"),
StreamSubscriptionTimeoutSettings(config)) StreamSubscriptionTimeoutSettings(config),
config.getString("file-io-dispatcher"))
/** /**
* Java API * Java API
@ -223,7 +224,8 @@ final case class MaterializerSettings(
initialFanOutBufferSize: Int, initialFanOutBufferSize: Int,
maxFanOutBufferSize: Int, maxFanOutBufferSize: Int,
dispatcher: String, dispatcher: String,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings) { subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
fileIODispatcher: String) {
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")