diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index a71b8f2f86..1d7ce9086c 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -4,15 +4,16 @@ package akka.http.model +import akka.http.util.FastFuture + import language.implicitConversions import java.io.File import java.lang.{ Iterable ⇒ JIterable } import org.reactivestreams.Publisher -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.duration.FiniteDuration import scala.collection.immutable import akka.util.ByteString -import akka.http.util.Deferrable import akka.stream.{ TimerTransformer, FlowMaterializer } import akka.stream.scaladsl.Flow import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable } @@ -41,7 +42,7 @@ sealed trait HttpEntity extends japi.HttpEntity { * Collects all possible parts and returns a potentially future Strict entity for easier processing. * The Deferrable is failed with an TimeoutException if the stream isn't completed after the given timeout. */ - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[HttpEntity.Strict] = { + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[HttpEntity.Strict] = { def transformer() = new TimerTransformer[ByteString, HttpEntity.Strict] { var bytes = ByteString.newBuilder @@ -59,7 +60,7 @@ sealed trait HttpEntity extends japi.HttpEntity { throw new java.util.concurrent.TimeoutException( s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data") } - Deferrable(Flow(dataBytes).timerTransform("toStrict", transformer).toFuture()) + Flow(dataBytes).timerTransform("toStrict", transformer).toFuture() } /** @@ -133,7 +134,7 @@ object HttpEntity { def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = SynchronousPublisherFromIterable(data :: Nil) override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = - Deferrable(this) + FastFuture.successful(this) def withContentType(contentType: ContentType): Strict = if (contentType == this.contentType) this else copy(contentType = contentType) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index e61c1558d0..454e9d7b9e 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -6,7 +6,7 @@ package akka.http.model import java.lang.{ Iterable ⇒ JIterable } import scala.concurrent.duration.FiniteDuration -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import scala.reflect.{ classTag, ClassTag } import akka.stream.FlowMaterializer @@ -14,6 +14,7 @@ import akka.util.ByteString import akka.http.util._ import headers._ import HttpCharsets._ +import FastFuture._ /** * Common base class of HttpRequest and HttpResponse. @@ -47,8 +48,8 @@ sealed trait HttpMessage extends japi.HttpMessage { def withEntity(entity: MessageEntity): Self /** Returns a sharable and serializable copy of this message with a strict entity. */ - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[Self] = - entity.toStrict(timeout).map(this.withEntity) + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Self] = + entity.toStrict(timeout).fast.map(this.withEntity) /** Returns a copy of this message with the entity and headers set to the given ones. */ def withHeadersAndEntity(headers: immutable.Seq[HttpHeader], entity: MessageEntity): Self diff --git a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala index b6ec3ef271..b1b1e18f3d 100644 --- a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala +++ b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala @@ -6,12 +6,13 @@ package akka.http.model import java.io.File import org.reactivestreams.Publisher -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow import akka.stream.impl.SynchronousPublisherFromIterable -import akka.http.util.Deferrable +import akka.http.util.FastFuture +import FastFuture._ import headers._ trait MultipartParts { @@ -56,8 +57,8 @@ case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts * Turns this instance into its strict specialization using the given `maxFieldCount` as the field number cut-off * hint. */ - def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[StrictMultipartFormData] = - Deferrable(Flow(parts).grouped(maxFieldCount).toFuture()).map(new StrictMultipartFormData(_)) + def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[StrictMultipartFormData] = + Flow(parts).grouped(maxFieldCount).toFuture().fast.map(new StrictMultipartFormData(_)) } /** @@ -70,7 +71,7 @@ class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends Multi def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName)) override def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer) = - Deferrable(this) + FastFuture.successful(this) } object MultipartFormData { diff --git a/akka-http-core/src/main/scala/akka/http/util/Deferrable.scala b/akka-http-core/src/main/scala/akka/http/util/Deferrable.scala deleted file mode 100644 index 69a49c23ed..0000000000 --- a/akka-http-core/src/main/scala/akka/http/util/Deferrable.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.util - -import scala.concurrent.duration.Duration -import scala.concurrent.{ Await, ExecutionContext, Future } -import scala.util.control.NonFatal -import scala.util.{ Failure, Success, Try } - -// TODO: remove and switch to plain old futures if performance is not significantly better - -/** - * An ADT modelling values that are either strictly available immediately or available at some point in the future. - * The benefit over a direct `Future` is that mapping and flatMapping doesn't have to be scheduled if the value - * is strictly available. - */ -sealed trait Deferrable[+A] { - def map[B](f: A ⇒ B)(implicit ec: ExecutionContext): Deferrable[B] - def flatMap[B](f: A ⇒ Deferrable[B])(implicit ec: ExecutionContext): Deferrable[B] - def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext): Unit - def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext): Deferrable[B] - def toFuture: Future[A] - def await(timeout: Duration): A -} - -object Deferrable { - def apply[T](value: T): Strict[T] = Strict(value) - def apply[T](value: Try[T]): Deferrable[T] = - value match { - case Success(x) ⇒ Strict(x) - case Failure(e) ⇒ StrictError(e) - } - def apply[T](future: Future[T]): Deferrable[T] = - future.value match { - case None ⇒ NonStrict[T](future) - case Some(x) ⇒ apply(x) - } - def failed[T](error: Throwable): StrictError = StrictError(error) - - case class Strict[A](value: A) extends Deferrable[A] { - def map[B](f: A ⇒ B)(implicit ec: ExecutionContext) = - try Strict(f(value)) - catch { case NonFatal(error) ⇒ failed(error) } - def flatMap[B](f: A ⇒ Deferrable[B])(implicit ec: ExecutionContext) = - try f(value) - catch { case NonFatal(error) ⇒ failed(error) } - def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext) = f(value) - def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) = this - def toFuture = Future.successful(value) - def await(timeout: Duration) = value - } - - case class StrictError(error: Throwable) extends Deferrable[Nothing] { - def map[B](f: Nothing ⇒ B)(implicit ec: ExecutionContext) = this - def flatMap[B](f: Nothing ⇒ Deferrable[B])(implicit ec: ExecutionContext) = this - def foreach(f: Nothing ⇒ Unit)(implicit ec: ExecutionContext) = () - def recover[B](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) = - if (pf isDefinedAt error) { - try Strict(pf(error)) - catch { case NonFatal(e) ⇒ failed(e) } - } else this - def toFuture = Future.failed(error) - def await(timeout: Duration) = throw error - } - - case class NonStrict[A](future: Future[A]) extends Deferrable[A] { - def map[B](f: A ⇒ B)(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ NonStrict(future map f) - case Some(x) ⇒ Deferrable(x) map f - } - def flatMap[B](f: A ⇒ Deferrable[B])(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ NonStrict(future flatMap (x ⇒ f(x).toFuture)) - case Some(x) ⇒ Deferrable(x) flatMap f - } - def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ future foreach f - case Some(x) ⇒ Deferrable(x) foreach f - } - def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ NonStrict(future recover pf) - case Some(x) ⇒ Deferrable(x) recover pf - } - def toFuture = future - def await(timeout: Duration) = Await.result(future, timeout) - } -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala b/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala new file mode 100644 index 0000000000..cffc047802 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2009-2014 Typestrict Inc. + */ + +package akka.http.util + +import scala.language.{ implicitConversions, higherKinds } +import scala.util.control.NonFatal +import scala.util.{ Failure, Success, Try } +import scala.collection.generic.CanBuildFrom +import scala.concurrent.duration.Duration +import scala.concurrent._ + +/** + * Provides alternative implementations of the basic transformation operations defined on [[Future]], + * which try to avoid scheduling to an [[ExecutionContext]] if possible, i.e. if the given future + * value is already present. + */ +class FastFuture[A](val future: Future[A]) extends AnyVal { + import FastFuture._ + + def map[B](f: A ⇒ B)(implicit ec: ExecutionContext): Future[B] = { + def strictMap(a: A) = + try FulfilledFuture(f(a)) + catch { case NonFatal(e) ⇒ ErrorFuture(e) } + + future match { + case FulfilledFuture(a) ⇒ strictMap(a) + case x: ErrorFuture ⇒ x + case _ ⇒ future.value match { + case None ⇒ future map f + case Some(Success(a)) ⇒ strictMap(a) + case Some(Failure(e)) ⇒ ErrorFuture(e) + } + } + } + + def flatMap[B](f: A ⇒ Future[B])(implicit ec: ExecutionContext): Future[B] = { + def strictFlatMap(a: A) = + try f(a) + catch { case NonFatal(e) ⇒ ErrorFuture(e) } + + future match { + case FulfilledFuture(a) ⇒ strictFlatMap(a) + case x: ErrorFuture ⇒ x + case _ ⇒ future.value match { + case None ⇒ future flatMap f + case Some(Success(a)) ⇒ strictFlatMap(a) + case Some(Failure(e)) ⇒ ErrorFuture(e) + } + } + } + + def filter(pred: A ⇒ Boolean)(implicit executor: ExecutionContext): Future[A] = + flatMap { + r ⇒ if (pred(r)) future else throw new NoSuchElementException("Future.filter predicate is not satisfied") + } + + def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext): Unit = { + def strictForeach(a: A) = + try f(a) + catch { case NonFatal(e) ⇒ ec.reportFailure(e) } // behave as if the `foreach` had been scheduled + + future match { + case FulfilledFuture(a) ⇒ strictForeach(a) + case x: ErrorFuture ⇒ // nothing to do + case _ ⇒ future.value match { + case None ⇒ future.foreach(f) + case Some(Success(a)) ⇒ strictForeach(a) + case Some(Failure(e)) ⇒ // nothing to do + } + } + } + + def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext): Future[B] = { + def strictRecover(t: Throwable) = + try if (pf isDefinedAt t) FulfilledFuture(pf(t)) else future + catch { case NonFatal(e) ⇒ ErrorFuture(e) } + + future match { + case FulfilledFuture(_) ⇒ future + case ErrorFuture(e) ⇒ strictRecover(e) + case _ ⇒ future.value match { + case None ⇒ future recover pf + case Some(Success(a)) ⇒ FulfilledFuture(a) + case Some(Failure(e)) ⇒ strictRecover(e) + } + } + } +} + +object FastFuture { + def successful[T](value: T): Future[T] = FulfilledFuture(value) + def failed(error: Throwable): Future[Nothing] = ErrorFuture(error) + + private case class FulfilledFuture[+A](a: A) extends Future[A] { + def value = Some(Success(a)) + def onComplete[U](f: Try[A] ⇒ U)(implicit executor: ExecutionContext) = Future.successful(a).onComplete(f) + def isCompleted = true + def result(atMost: Duration)(implicit permit: CanAwait) = a + def ready(atMost: Duration)(implicit permit: CanAwait) = this + } + private case class ErrorFuture(error: Throwable) extends Future[Nothing] { + def value = Some(Failure(error)) + def onComplete[U](f: Try[Nothing] ⇒ U)(implicit executor: ExecutionContext) = Future.failed(error).onComplete(f) + def isCompleted = true + def result(atMost: Duration)(implicit permit: CanAwait) = throw error + def ready(atMost: Duration)(implicit permit: CanAwait) = this + } + + implicit class EnhancedFuture[T](val future: Future[T]) extends AnyVal { + def fast: FastFuture[T] = new FastFuture[T](future) + } + + def sequence[T, M[_] <: TraversableOnce[_]](in: M[Future[T]])(implicit cbf: CanBuildFrom[M[Future[T]], T, M[T]], executor: ExecutionContext): Future[M[T]] = + in.foldLeft(successful(cbf(in))) { + (fr, fa) ⇒ for (r ← fr.fast; a ← fa.asInstanceOf[Future[T]].fast) yield r += a + }.fast.map(_.result()) + + def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(f: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = + if (futures.isEmpty) successful(zero) + else sequence(futures).fast.map(_.foldLeft(zero)(f)) + + def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = + if (futures.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection")) + else sequence(futures).fast.map(_ reduceLeft op) + + def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = + in.foldLeft(successful(cbf(in))) { (fr, a) ⇒ + val fb = fn(a.asInstanceOf[A]) + for (r ← fr.fast; b ← fb.fast) yield r += b + }.fast.map(_.result()) +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 11d2d98a74..897fe8ac9a 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -167,5 +167,5 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } } - def toStrict(entity: HttpEntity): HttpEntity.Strict = entity.toStrict(500.millis).await(1.second) + def toStrict(entity: HttpEntity): HttpEntity.Strict = Await.result(entity.toStrict(500.millis), 1.second) } \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index dd914e883c..707389913a 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -24,6 +24,7 @@ import HttpProtocols._ import StatusCodes._ import HttpEntity._ import ParserOutput.ParseError +import FastFuture._ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -397,7 +398,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { .map { x ⇒ Flow { x match { - case Right(request) ⇒ compactEntity(request.entity).map(x ⇒ Right(request.withEntity(x))).toFuture + case Right(request) ⇒ compactEntity(request.entity).fast.map(x ⇒ Right(request.withEntity(x))) case Left(error) ⇒ Future.successful(Left(error)) } }.toPublisher() @@ -410,16 +411,16 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { protected def parserSettings: ParserSettings = ParserSettings(system) protected def newParser = new HttpRequestParser(parserSettings, false)() - private def compactEntity(entity: RequestEntity): Deferrable[RequestEntity] = + private def compactEntity(entity: RequestEntity): Future[RequestEntity] = entity match { - case x: Chunked ⇒ compactEntityChunks(x.chunks).map(compacted ⇒ x.copy(chunks = compacted)) + case x: Chunked ⇒ compactEntityChunks(x.chunks).fast.map(compacted ⇒ x.copy(chunks = compacted)) case _ ⇒ entity.toStrict(250.millis) } - private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Deferrable[Publisher[ChunkStreamPart]] = - Deferrable(Flow(data).grouped(1000).toFuture()) - .map(publisher(_: _*)) - .recover { case _: NoSuchElementException ⇒ publisher() } + private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] = + Flow(data).grouped(1000).toFuture() + .fast.map(publisher(_: _*)) + .fast.recover { case _: NoSuchElementException ⇒ publisher() } def prep(response: String) = response.stripMarginWithNewline("\r\n") } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index d16cfb5f0a..897086d576 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -24,6 +24,7 @@ import HttpProtocols._ import StatusCodes._ import HttpEntity._ import ParserOutput.ParseError +import FastFuture._ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -249,8 +250,8 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { }.map { x ⇒ Flow { x match { - case Right(response) ⇒ compactEntity(response.entity).map(x ⇒ Right(response.withEntity(x))).toFuture - case Left(error) ⇒ Future.successful(Left(error.info.formatPretty)) + case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x))) + case Left(error) ⇒ FastFuture.successful(Left(error.info.formatPretty)) } }.toPublisher() } @@ -266,16 +267,16 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { parser } - private def compactEntity(entity: ResponseEntity): Deferrable[ResponseEntity] = + private def compactEntity(entity: ResponseEntity): Future[ResponseEntity] = entity match { - case x: HttpEntity.Chunked ⇒ compactEntityChunks(x.chunks).map(compacted ⇒ x.copy(chunks = compacted)) + case x: HttpEntity.Chunked ⇒ compactEntityChunks(x.chunks).fast.map(compacted ⇒ x.copy(chunks = compacted)) case _ ⇒ entity.toStrict(250.millis) } - private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Deferrable[Publisher[ChunkStreamPart]] = - Deferrable(Flow(data).grouped(1000).toFuture()) - .map(publisher(_: _*)) - .recover { case _: NoSuchElementException ⇒ publisher() } + private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] = + Flow(data).grouped(1000).toFuture() + .fast.map(publisher(_: _*)) + .fast.recover { case _: NoSuchElementException ⇒ publisher() } def prep(response: String) = response.stripMarginWithNewline("\r\n") diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index e5ff64df74..c1a711a0dd 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -4,19 +4,20 @@ package akka.http.model -import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec } -import akka.util.ByteString -import org.scalatest.matchers.Matcher -import akka.stream.scaladsl.Flow -import akka.http.model.HttpEntity._ -import org.reactivestreams.Publisher -import akka.actor.ActorSystem -import akka.stream.FlowMaterializer -import akka.stream.impl.SynchronousPublisherFromIterable +import java.util.concurrent.TimeoutException import com.typesafe.config.{ ConfigFactory, Config } +import org.reactivestreams.Publisher import scala.concurrent.{ Promise, Await } import scala.concurrent.duration._ -import java.util.concurrent.TimeoutException +import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec } +import org.scalatest.matchers.Matcher +import akka.util.ByteString +import akka.actor.ActorSystem +import akka.stream.scaladsl.Flow +import akka.stream.FlowMaterializer +import akka.stream.impl.SynchronousPublisherFromIterable +import akka.http.model.HttpEntity._ +import akka.http.util.FastFuture._ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { val tpe: ContentType = ContentTypes.`application/octet-stream` @@ -76,7 +77,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { val neverCompleted = Promise[ByteString]() val stream: Publisher[ByteString] = Flow(neverCompleted.future).toPublisher() intercept[TimeoutException] { - Default(tpe, 42, stream).toStrict(100.millis).await(150.millis) + Await.result(Default(tpe, 42, stream).toStrict(100.millis), 150.millis) }.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data") } } @@ -91,5 +92,5 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { } def strictifyTo(strict: Strict): Matcher[HttpEntity] = - equal(strict).matcher[Strict].compose(_.toStrict(250.millis).await(250.millis)) + equal(strict).matcher[Strict].compose(x ⇒ Await.result(x.toStrict(250.millis), 250.millis)) } diff --git a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala index 5b8bfb1188..abeef6a1bc 100644 --- a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala @@ -4,16 +4,16 @@ package io.akka.integrationtest.http -import akka.actor.ActorSystem -import akka.http.model._ -import akka.http.model.ContentTypes -import akka.http.model.headers._ -import akka.http.model.parser.HeaderParser -import akka.stream.scaladsl2._ -import akka.util.ByteString import com.typesafe.config.{ ConfigFactory, Config } -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import scala.concurrent.Await import scala.concurrent.duration._ +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import akka.util.ByteString +import akka.actor.ActorSystem +import akka.http.model.parser.HeaderParser +import akka.http.model._ +import akka.stream.scaladsl2._ +import headers._ /** * Integration test for external HTTP libraries that are built on top of @@ -88,7 +88,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte // Finally convert the body into an Array[Byte]. - val entityBytes: Array[Byte] = request.entity.toStrict(1.second).await(2.seconds).data.toArray + val entityBytes: Array[Byte] = Await.result(request.entity.toStrict(1.second), 2.seconds).data.toArray entityBytes.to[Seq] shouldEqual ByteString("hello").to[Seq] } diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala index e938171433..69867285df 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala @@ -6,6 +6,7 @@ package akka.http.testkit import com.typesafe.config.{ ConfigFactory, Config } import scala.collection.immutable +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.DynamicVariable import scala.reflect.ClassTag @@ -13,11 +14,12 @@ import org.scalatest.Suite import akka.actor.ActorSystem import akka.stream.FlowMaterializer import akka.http.client.RequestBuilding -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.server._ import akka.http.unmarshalling._ import akka.http.model._ import headers.Host +import FastFuture._ trait RouteTest extends RequestBuilding with RouteTestResultComponent { this: TestFrameworkInterface ⇒ @@ -57,11 +59,11 @@ trait RouteTest extends RequestBuilding with RouteTestResultComponent { def chunks: immutable.Seq[HttpEntity.ChunkStreamPart] = result.chunks def entityAs[T: FromEntityUnmarshaller: ClassTag](implicit timeout: Duration = 1.second): T = { def msg(e: Throwable) = s"Could not unmarshal entity to type '${implicitly[ClassTag[T]]}' for `entityAs` assertion: $e\n\nResponse was: $response" - Unmarshal(entity).to[T].recover[T] { case error ⇒ failTest(msg(error)) }.await(timeout) + Await.result(Unmarshal(entity).to[T].fast.recover[T] { case error ⇒ failTest(msg(error)) }, timeout) } def responseAs[T: FromResponseUnmarshaller: ClassTag](implicit timeout: Duration = 1.second): T = { def msg(e: Throwable) = s"Could not unmarshal response to type '${implicitly[ClassTag[T]]}' for `responseAs` assertion: $e\n\nResponse was: $response" - Unmarshal(response).to[T].recover[T] { case error ⇒ failTest(msg(error)) }.await(timeout) + Await.result(Unmarshal(response).to[T].fast.recover[T] { case error ⇒ failTest(msg(error)) }, timeout) } def contentType: ContentType = entity.contentType def mediaType: MediaType = contentType.mediaType @@ -116,7 +118,7 @@ trait RouteTest extends RequestBuilding with RouteTestResultComponent { } implicit def injectIntoRoute(implicit timeout: RouteTestTimeout, setup: RoutingSetup, defaultHostInfo: DefaultHostInfo) = - new TildeArrow[RequestContext, Deferrable[RouteResult]] { + new TildeArrow[RequestContext, Future[RouteResult]] { type Out = RouteTestResult def apply(request: HttpRequest, route: Route): Out = { val routeTestResult = new RouteTestResult(timeout.duration)(setup.materializer) @@ -133,7 +135,7 @@ trait RouteTest extends RequestBuilding with RouteTestResultComponent { val semiSealedRoute = // sealed for exceptions but not for rejections Directives.handleExceptions(sealedExceptionHandler) { route } val deferrableRouteResult = semiSealedRoute(ctx) - deferrableRouteResult.foreach(routeTestResult.handleResult)(setup.executor) + deferrableRouteResult.fast.foreach(routeTestResult.handleResult)(setup.executor) routeTestResult } } diff --git a/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala b/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala index 1498640183..cff935e8d2 100644 --- a/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala @@ -4,8 +4,10 @@ package akka.http.marshalling +import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ Matchers, FreeSpec } +import akka.http.util.FastFuture._ import akka.http.model.parser.HeaderParser import akka.http.model._ import headers._ @@ -127,10 +129,9 @@ class ContentNegotiationSpec extends FreeSpec with Matchers { case ct @ ContentType(mt, Some(cs)) ⇒ Marshaller.withFixedCharset(mt, cs)((s: String) ⇒ HttpEntity(ct, s)) case ContentType(mt, None) ⇒ Marshaller.withOpenCharset(mt)((s: String, cs) ⇒ HttpEntity(ContentType(mt, cs), s)) } - Marshal("foo").toResponseFor(request) - .map(response ⇒ Some(response.entity.contentType)) - .recover { case _: Marshal.UnacceptableResponseContentTypeException ⇒ None } - .await(1.second) + Await.result(Marshal("foo").toResponseFor(request) + .fast.map(response ⇒ Some(response.entity.contentType)) + .fast.recover { case _: Marshal.UnacceptableResponseContentTypeException ⇒ None }, 1.second) } } diff --git a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala index 8f69888bf5..1d2199bc54 100644 --- a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala @@ -5,6 +5,7 @@ package akka.http.marshalling import scala.collection.immutable.ListMap +import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import akka.actor.ActorSystem @@ -153,7 +154,7 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with override def afterAll() = system.shutdown() def marshal[T: ToEntityMarshallers](value: T): HttpEntity.Strict = - Marshal(value).to[HttpEntity].await(1.second).toStrict(1.second).await(1.second) + Await.result(Await.result(Marshal(value).to[HttpEntity], 1.second).toStrict(1.second), 1.second) protected class FixedRandom extends java.util.Random { override def nextBytes(array: Array[Byte]): Unit = "my-stable-boundary".getBytes("UTF-8").copyToArray(array) diff --git a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala index 152452abf2..df6afd5cf2 100644 --- a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala @@ -6,7 +6,7 @@ package akka.http.unmarshalling import scala.xml.NodeSeq import scala.concurrent.duration._ -import scala.concurrent.Await +import scala.concurrent.{ Future, Await } import org.scalatest.matchers.Matcher import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import akka.actor.ActorSystem @@ -16,6 +16,7 @@ import akka.http.model._ import akka.http.util._ import headers._ import MediaTypes._ +import FastFuture._ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { implicit val system = ActorSystem(getClass.getSimpleName) @@ -34,7 +35,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { Unmarshal(HttpEntity("árvíztűrő ütvefúrógép")).to[Array[Char]] should evaluateTo("árvíztűrő ütvefúrógép".toCharArray) } "nodeSeqUnmarshaller should unmarshal `text/xml` content in UTF-8 to NodeSeqs" in { - Unmarshal(HttpEntity(`text/xml`, "Hällö")).to[NodeSeq].map(_.text) should evaluateTo("Hällö") + Unmarshal(HttpEntity(`text/xml`, "Hällö")).to[NodeSeq].fast.map(_.text) should evaluateTo("Hällö") } } @@ -78,7 +79,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } "multipartContentUnmarshaller should reject illegal multipart content" in { - val mpc = Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-", + val mpc = Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-", """--- |Content-type: text/plain; charset=UTF8 |Content-type: application/json @@ -86,8 +87,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { | |test@there.com |-----""".stripMarginWithNewline("\r\n"))) - .to[MultipartContent] - .await(1.second) + .to[MultipartContent], 1.second) Await.result(Flow(mpc.parts).toFuture().failed, 1.second).getMessage shouldEqual "multipart part must not contain more than one Content-Type header" } @@ -160,26 +160,27 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { override def afterAll() = system.shutdown() - def evaluateTo[T](value: T): Matcher[Deferrable[T]] = - equal(value).matcher[T] compose (_.await(1.second)) + def evaluateTo[T](value: T): Matcher[Future[T]] = + equal(value).matcher[T] compose (x ⇒ Await.result(x, 1.second)) - def haveParts[T <: MultipartParts](parts: BodyPart*): Matcher[Deferrable[T]] = - equal(parts).matcher[Seq[BodyPart]] compose { - _.flatMap(x ⇒ Deferrable(Flow(x.parts).grouped(100).toFuture())) - .recover { case _: NoSuchElementException ⇒ Nil } - .await(1.second) + def haveParts[T <: MultipartParts](parts: BodyPart*): Matcher[Future[T]] = + equal(parts).matcher[Seq[BodyPart]] compose { x ⇒ + Await.result(x + .fast.flatMap(x ⇒ Flow(x.parts).grouped(100).toFuture()) + .fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second) } - def haveFormData(fields: (String, BodyPart)*): Matcher[Deferrable[MultipartFormData]] = - equal(fields).matcher[Seq[(String, BodyPart)]] compose { - _.flatMap(x ⇒ Deferrable(Flow(x.parts).grouped(100).toFuture())) - .recover { case _: NoSuchElementException ⇒ Nil } - .map { + def haveFormData(fields: (String, BodyPart)*): Matcher[Future[MultipartFormData]] = + equal(fields).matcher[Seq[(String, BodyPart)]] compose { x ⇒ + Await.result(x + .fast.flatMap(x ⇒ Flow(x.parts).grouped(100).toFuture()) + .fast.recover { case _: NoSuchElementException ⇒ Nil } + .fast.map { _ map { part ⇒ part.headers.collectFirst { case `Content-Disposition`(ContentDispositionTypes.`form-data`, params) ⇒ params("name") }.get -> part } - }.await(1.second) + }, 1.second) } } diff --git a/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala b/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala index a31bb93170..2a1a8ba4b9 100644 --- a/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala +++ b/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala @@ -5,7 +5,7 @@ package akka.http.client import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Await, ExecutionContext } import scala.concurrent.duration._ import scala.reflect.ClassTag import akka.util.Timeout @@ -45,7 +45,7 @@ trait RequestBuilding extends TransformerPipelineSupport { content match { case None ⇒ apply(uri, HttpEntity.Empty) case Some(value) ⇒ - val entity = Marshal(value).to[RequestEntity].await(timeout.duration) + val entity = Await.result(Marshal(value).to[RequestEntity], timeout.duration) apply(uri, entity) } diff --git a/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala index d36788102c..c390a9ce7e 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala @@ -6,16 +6,17 @@ package akka.http.marshalling import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Try, Failure, Success } -import akka.http.util.Deferrable +import akka.http.util.FastFuture +import FastFuture._ trait GenericMarshallers extends LowPriorityToResponseMarshallerImplicits { - implicit def throwableMarshaller[T]: Marshaller[Throwable, T] = Marshaller(Deferrable.failed) + implicit def throwableMarshaller[T]: Marshaller[Throwable, T] = Marshaller(FastFuture.failed) implicit def optionMarshaller[A, B](implicit m: Marshaller[A, B], empty: EmptyValue[B]): Marshaller[Option[A], B] = Marshaller { case Some(value) ⇒ m(value) - case None ⇒ Deferrable(Marshalling.Opaque(() ⇒ empty.emptyValue)) + case None ⇒ FastFuture.successful(Marshalling.Opaque(() ⇒ empty.emptyValue)) } implicit def eitherMarshaller[A1, A2, B](implicit m1: Marshaller[A1, B], m2: Marshaller[A2, B]): Marshaller[Either[A1, A2], B] = @@ -24,16 +25,13 @@ trait GenericMarshallers extends LowPriorityToResponseMarshallerImplicits { case Right(a2) ⇒ m2(a2) } - implicit def deferrableMarshaller[A, B](implicit m: Marshaller[A, B], ec: ExecutionContext): Marshaller[Deferrable[A], B] = - Marshaller(_ flatMap m.apply) - implicit def futureMarshaller[A, B](implicit m: Marshaller[A, B], ec: ExecutionContext): Marshaller[Future[A], B] = - Marshaller(Deferrable(_) flatMap m.apply) + Marshaller(_.fast.flatMap(m(_))) implicit def tryMarshaller[A, B](implicit m: Marshaller[A, B]): Marshaller[Try[A], B] = Marshaller { case Success(value) ⇒ m(value) - case Failure(error) ⇒ Deferrable.failed(error) + case Failure(error) ⇒ FastFuture.failed(error) } } diff --git a/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala b/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala index 3af8d1b30d..1a031cbeb4 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala @@ -6,9 +6,10 @@ package akka.http.marshalling import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model.HttpCharsets._ import akka.http.model._ +import FastFuture._ object Marshal { def apply[T](value: T): Marshal[T] = new Marshal(value) @@ -23,8 +24,8 @@ class Marshal[A](val value: A) { * Marshals `value` to the first of the available `Marshallers` for `A` and `B`. * If the marshaller is flexible with regard to the used charset `UTF-8` is chosen. */ - def to[B](implicit m: Marshallers[A, B], ec: ExecutionContext): Deferrable[B] = - m.marshallers.head(value) map { + def to[B](implicit m: Marshallers[A, B], ec: ExecutionContext): Future[B] = + m.marshallers.head(value).fast.map { case Marshalling.WithFixedCharset(_, _, marshal) ⇒ marshal() case Marshalling.WithOpenCharset(_, marshal) ⇒ marshal(HttpCharsets.`UTF-8`) case Marshalling.Opaque(marshal) ⇒ marshal() @@ -33,53 +34,50 @@ class Marshal[A](val value: A) { /** * Marshals `value` to an `HttpResponse` for the given `HttpRequest` with full content-negotiation. */ - def toResponseFor(request: HttpRequest)(implicit m: ToResponseMarshallers[A], ec: ExecutionContext): Deferrable[HttpResponse] = { + def toResponseFor(request: HttpRequest)(implicit m: ToResponseMarshallers[A], ec: ExecutionContext): Future[HttpResponse] = { import akka.http.marshalling.Marshal._ val mediaRanges = request.acceptedMediaRanges // cache for performance val charsetRanges = request.acceptedCharsetRanges // cache for performance def qValueMT(mediaType: MediaType) = request.qValueForMediaType(mediaType, mediaRanges) def qValueCS(charset: HttpCharset) = request.qValueForCharset(charset, charsetRanges) - // TODO: avoid scheduling if possible - val marshallingFutures = m.marshallers.map(_(value).toFuture) - val marshallingsFuture = Future.sequence(marshallingFutures) - val marshallingFuture = - marshallingsFuture map { marshallings ⇒ - def weight(mt: MediaType, cs: HttpCharset): Float = math.min(qValueMT(mt), qValueCS(cs)) - val defaultMarshallingWeight: MarshallingWeight = - new MarshallingWeight(0f, { () ⇒ - val supportedContentTypes = marshallings collect { - case Marshalling.WithFixedCharset(mt, cs, _) ⇒ ContentType(mt, cs) - case Marshalling.WithOpenCharset(mt, _) ⇒ ContentType(mt) - } - throw new UnacceptableResponseContentTypeException(supportedContentTypes) - }) - val best = marshallings.foldLeft(defaultMarshallingWeight) { - case (acc, Marshalling.WithFixedCharset(mt, cs, marshal)) ⇒ + val marshallingFutures = m.marshallers.map(_(value)) + val marshallingsFuture = FastFuture.sequence(marshallingFutures) + marshallingsFuture.fast.map { marshallings ⇒ + def weight(mt: MediaType, cs: HttpCharset): Float = math.min(qValueMT(mt), qValueCS(cs)) + val defaultMarshallingWeight: MarshallingWeight = + new MarshallingWeight(0f, { () ⇒ + val supportedContentTypes = marshallings collect { + case Marshalling.WithFixedCharset(mt, cs, _) ⇒ ContentType(mt, cs) + case Marshalling.WithOpenCharset(mt, _) ⇒ ContentType(mt) + } + throw new UnacceptableResponseContentTypeException(supportedContentTypes) + }) + val best = marshallings.foldLeft(defaultMarshallingWeight) { + case (acc, Marshalling.WithFixedCharset(mt, cs, marshal)) ⇒ + val w = weight(mt, cs) + if (w > acc.weight) new MarshallingWeight(w, marshal) else acc + + case (acc, Marshalling.WithOpenCharset(mt, marshal)) ⇒ + def withCharset(cs: HttpCharset) = { val w = weight(mt, cs) - if (w > acc.weight) new MarshallingWeight(w, marshal) else acc + if (w > acc.weight) new MarshallingWeight(w, () ⇒ marshal(cs)) else acc + } + // logic for choosing the charset adapted from http://tools.ietf.org/html/rfc7231#section-5.3.3 + if (qValueCS(`UTF-8`) == 1f) withCharset(`UTF-8`) // prefer UTF-8 if fully accepted + else charsetRanges match { // ranges are sorted by descending q-value, + case (HttpCharsetRange.One(cs, qValue)) :: _ ⇒ // so we only need to look at the first one + if (qValue == 1f) withCharset(cs) // if the client has high preference for this charset, pick it + else if (qValueCS(`ISO-8859-1`) == 1f) withCharset(`ISO-8859-1`) // give some more preference to `ISO-8859-1` + else if (qValue > 0f) withCharset(cs) // ok, simply choose the first one if the client doesn't reject it + else acc + case _ ⇒ acc + } - case (acc, Marshalling.WithOpenCharset(mt, marshal)) ⇒ - def withCharset(cs: HttpCharset) = { - val w = weight(mt, cs) - if (w > acc.weight) new MarshallingWeight(w, () ⇒ marshal(cs)) else acc - } - // logic for choosing the charset adapted from http://tools.ietf.org/html/rfc7231#section-5.3.3 - if (qValueCS(`UTF-8`) == 1f) withCharset(`UTF-8`) // prefer UTF-8 if fully accepted - else charsetRanges match { // ranges are sorted by descending q-value, - case (HttpCharsetRange.One(cs, qValue)) :: _ ⇒ // so we only need to look at the first one - if (qValue == 1f) withCharset(cs) // if the client has high preference for this charset, pick it - else if (qValueCS(`ISO-8859-1`) == 1f) withCharset(`ISO-8859-1`) // give some more preference to `ISO-8859-1` - else if (qValue > 0f) withCharset(cs) // ok, simply choose the first one if the client doesn't reject it - else acc - case _ ⇒ acc - } - - case (acc, Marshalling.Opaque(marshal)) ⇒ - if (acc.weight == 0f) new MarshallingWeight(Float.MinPositiveValue, marshal) else acc - } - best.marshal() + case (acc, Marshalling.Opaque(marshal)) ⇒ + if (acc.weight == 0f) new MarshallingWeight(Float.MinPositiveValue, marshal) else acc } - Deferrable(marshallingFuture) + best.marshal() + } } } diff --git a/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala index a51ebef5c5..5ab98ca5f3 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala @@ -5,10 +5,11 @@ package akka.http.marshalling import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.xml.NodeSeq -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model._ +import FastFuture._ import MediaTypes._ case class Marshallers[-A, +B](marshallers: immutable.Seq[Marshaller[A, B]]) { @@ -39,10 +40,10 @@ sealed abstract class SingleMarshallerMarshallers { } sealed trait Marshaller[-A, +B] { outer ⇒ - def apply(value: A): Deferrable[Marshalling[B]] + def apply(value: A): Future[Marshalling[B]] def map[C](f: B ⇒ C)(implicit ec: ExecutionContext): Marshaller[A, C] = - Marshaller[A, C](value ⇒ outer(value) map (_ map f)) + Marshaller[A, C](value ⇒ outer(value).fast.map(_ map f)) /** * Reuses this Marshaller's logic to produce a new Marshaller from another type `C` which overrides @@ -51,7 +52,7 @@ sealed trait Marshaller[-A, +B] { outer ⇒ def wrap[C, D >: B](mediaType: MediaType)(f: C ⇒ A)(implicit ec: ExecutionContext, mto: MediaTypeOverrider[D]): Marshaller[C, D] = Marshaller { value ⇒ import Marshalling._ - outer(f(value)) map { + outer(f(value)).fast.map { case WithFixedCharset(_, cs, marshal) ⇒ WithFixedCharset(mediaType, cs, () ⇒ mto(marshal(), mediaType)) case WithOpenCharset(_, marshal) ⇒ WithOpenCharset(mediaType, cs ⇒ mto(marshal(cs), mediaType)) case Opaque(marshal) ⇒ Opaque(() ⇒ mto(marshal(), mediaType)) @@ -67,19 +68,19 @@ object Marshaller with PredefinedToResponseMarshallers with PredefinedToRequestMarshallers { - def apply[A, B](f: A ⇒ Deferrable[Marshalling[B]]): Marshaller[A, B] = + def apply[A, B](f: A ⇒ Future[Marshalling[B]]): Marshaller[A, B] = new Marshaller[A, B] { def apply(value: A) = f(value) } def withFixedCharset[A, B](mediaType: MediaType, charset: HttpCharset)(marshal: A ⇒ B): Marshaller[A, B] = - Marshaller { value ⇒ Deferrable(Marshalling.WithFixedCharset(mediaType, charset, () ⇒ marshal(value))) } + Marshaller { value ⇒ FastFuture.successful(Marshalling.WithFixedCharset(mediaType, charset, () ⇒ marshal(value))) } def withOpenCharset[A, B](mediaType: MediaType)(marshal: (A, HttpCharset) ⇒ B): Marshaller[A, B] = - Marshaller { value ⇒ Deferrable(Marshalling.WithOpenCharset(mediaType, charset ⇒ marshal(value, charset))) } + Marshaller { value ⇒ FastFuture.successful(Marshalling.WithOpenCharset(mediaType, charset ⇒ marshal(value, charset))) } def opaque[A, B](marshal: A ⇒ B): Marshaller[A, B] = - Marshaller { value ⇒ Deferrable(Marshalling.Opaque(() ⇒ marshal(value))) } + Marshaller { value ⇒ FastFuture.successful(Marshalling.Opaque(() ⇒ marshal(value))) } } /** diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 97a97362af..6b2b014766 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -12,6 +12,7 @@ import akka.stream.{ FlattenStrategy, FlowMaterializer } import akka.stream.scaladsl.Flow import akka.http.engine.rendering.BodyPartRenderer import akka.http.util.actorSystem +import akka.http.util.FastFuture._ import akka.http.model._ import MediaTypes._ @@ -47,7 +48,7 @@ trait MultipartMarshallers { implicit def multipartFormDataMarshaller(implicit mcm: ToEntityMarshaller[MultipartContent], ec: ExecutionContext): ToEntityMarshaller[MultipartFormData] = Marshaller { value ⇒ - mcm(MultipartContent(value.parts)) map { + mcm(MultipartContent(value.parts)).fast.map { case Marshalling.WithOpenCharset(mt, marshal) ⇒ val mediaType = `multipart/form-data` withBoundary mt.params("boundary") Marshalling.WithOpenCharset(mediaType, cs ⇒ MediaTypeOverrider.forEntity(marshal(cs), mediaType)) diff --git a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala index 657f034dd7..0472ab7ee3 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala @@ -9,7 +9,7 @@ import scala.concurrent.ExecutionContext import scala.xml.NodeSeq import akka.http.model.MediaTypes._ import akka.http.model._ -import akka.http.util.{ Deferrable, StringRendering } +import akka.http.util.{ FastFuture, StringRendering } import akka.util.ByteString trait PredefinedToEntityMarshallers extends MultipartMarshallers { @@ -63,7 +63,7 @@ trait PredefinedToEntityMarshallers extends MultipartMarshallers { implicit val HttpEntityMarshaller: ToEntityMarshaller[MessageEntity] = Marshaller { value ⇒ // since we don't want to recode we simply ignore the charset determined by content negotiation here - Deferrable(Marshalling.WithOpenCharset(value.contentType.mediaType, _ ⇒ value)) + FastFuture.successful(Marshalling.WithOpenCharset(value.contentType.mediaType, _ ⇒ value)) } } diff --git a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala index 4de715d2f2..39a2801d25 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala @@ -6,8 +6,9 @@ package akka.http.marshalling import scala.collection.immutable import scala.concurrent.ExecutionContext -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model._ +import FastFuture._ trait PredefinedToRequestMarshallers { private type TRM[T] = ToRequestMarshaller[T] // brevity alias @@ -15,7 +16,7 @@ trait PredefinedToRequestMarshallers { implicit val fromRequest: TRM[HttpRequest] = Marshaller.opaque(identity) implicit def fromUri(implicit ec: ExecutionContext): TRM[Uri] = - Marshaller { uri ⇒ Deferrable(Marshalling.Opaque(() ⇒ HttpRequest(uri = uri))) } + Marshaller { uri ⇒ FastFuture.successful(Marshalling.Opaque(() ⇒ HttpRequest(uri = uri))) } implicit def fromMethodAndUriAndValue[S, T](implicit mt: ToEntityMarshaller[T], ec: ExecutionContext): TRM[(HttpMethod, Uri, T)] = @@ -23,7 +24,7 @@ trait PredefinedToRequestMarshallers { implicit def fromMethodAndUriAndHeadersAndValue[T](implicit mt: ToEntityMarshaller[T], ec: ExecutionContext): TRM[(HttpMethod, Uri, immutable.Seq[HttpHeader], T)] = - Marshaller { case (m, u, h, v) ⇒ mt(v) map (_ map (HttpRequest(m, u, h, _))) } + Marshaller { case (m, u, h, v) ⇒ mt(v).fast.map(_ map (HttpRequest(m, u, h, _))) } } object PredefinedToRequestMarshallers extends PredefinedToRequestMarshallers diff --git a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala index c6b5cd3c84..19cf405aa8 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala @@ -6,6 +6,7 @@ package akka.http.marshalling import scala.collection.immutable import scala.concurrent.ExecutionContext +import akka.http.util.FastFuture._ import akka.http.model.MediaTypes._ import akka.http.model._ @@ -33,7 +34,7 @@ trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImp implicit def fromStatusCodeAndHeadersAndValue[T](implicit mt: ToEntityMarshaller[T], ec: ExecutionContext): TRM[(StatusCode, immutable.Seq[HttpHeader], T)] = - Marshaller { case (status, headers, value) ⇒ mt(value).map(_ map (HttpResponse(status, headers, _))) } + Marshaller { case (status, headers, value) ⇒ mt(value).fast.map(_ map (HttpResponse(status, headers, _))) } } trait LowPriorityToResponseMarshallerImplicits { diff --git a/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala b/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala index c1c0ebd674..c58e66dd59 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala @@ -4,8 +4,7 @@ package akka.http.marshalling -import scala.concurrent.ExecutionContext -import akka.http.util.Deferrable +import scala.concurrent.{ Future, ExecutionContext } import akka.http.model._ /** Something that can later be marshalled into a response */ @@ -14,7 +13,7 @@ trait ToResponseMarshallable { def value: T implicit def marshaller: ToResponseMarshaller[T] - def apply(request: HttpRequest)(implicit ec: ExecutionContext): Deferrable[HttpResponse] = + def apply(request: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = Marshal(value).toResponseFor(request) } diff --git a/akka-http/src/main/scala/akka/http/server/RequestContext.scala b/akka-http/src/main/scala/akka/http/server/RequestContext.scala index e7ccd74ae4..d30aa36363 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContext.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContext.scala @@ -5,10 +5,9 @@ package akka.http.server import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter import akka.stream.FlowMaterializer -import akka.http.util.Deferrable import akka.http.marshalling.ToResponseMarshallable import akka.http.model._ @@ -49,19 +48,19 @@ trait RequestContext { /** * Completes the request with the given ToResponseMarshallable. */ - def complete(obj: ToResponseMarshallable): Deferrable[RouteResult] + def complete(obj: ToResponseMarshallable): Future[RouteResult] /** * Rejects the request with the given rejections. */ - def reject(rejections: Rejection*): Deferrable[RouteResult] + def reject(rejections: Rejection*): Future[RouteResult] /** * Bubbles the given error up the response chain where it is dealt with by the closest `handleExceptions` * directive and its ``ExceptionHandler``, unless the error is a ``RejectionError``. In this case the * wrapped rejection is unpacked and "executed". */ - def fail(error: Throwable): Deferrable[RouteResult] + def fail(error: Throwable): Future[RouteResult] /** * Returns a copy of this context with the new HttpRequest. @@ -96,7 +95,7 @@ trait RequestContext { /** * Returns a copy of this context with the given response transformation function chained into the response chain. */ - def withRouteResponseFlatMapped(f: RouteResult ⇒ Deferrable[RouteResult]): RequestContext + def withRouteResponseFlatMapped(f: RouteResult ⇒ Future[RouteResult]): RequestContext /** * Returns a copy of this context with the given response transformation function chained into the response chain. @@ -121,17 +120,17 @@ trait RequestContext { /** * Returns a copy of this context with the given rejection handling function chained into the response chain. */ - def withRejectionHandling(f: List[Rejection] ⇒ Deferrable[RouteResult]): RequestContext + def withRejectionHandling(f: List[Rejection] ⇒ Future[RouteResult]): RequestContext /** * Returns a copy of this context with the given exception handling function chained into the response chain. */ - def withExceptionHandling(pf: PartialFunction[Throwable, Deferrable[RouteResult]]): RequestContext + def withExceptionHandling(pf: PartialFunction[Throwable, Future[RouteResult]]): RequestContext /** * Returns a copy of this context with the given function handling a part of the response space. */ - def withRouteResponseHandling(pf: PartialFunction[RouteResult, Deferrable[RouteResult]]): RequestContext + def withRouteResponseHandling(pf: PartialFunction[RouteResult, Future[RouteResult]]): RequestContext /** * Removes a potentially existing Accept header from the request headers. diff --git a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala index d2691d0cab..bd2795472b 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala @@ -5,12 +5,13 @@ package akka.http.server import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter import akka.stream.FlowMaterializer import akka.http.marshalling.ToResponseMarshallable -import akka.http.util.{ Deferrable, identityFunc } +import akka.http.util.{ FastFuture, identityFunc } import akka.http.model._ +import FastFuture._ /** * INTERNAL API @@ -21,7 +22,7 @@ private[http] class RequestContextImpl( val executionContext: ExecutionContext, val flowMaterializer: FlowMaterializer, val log: LoggingAdapter, - finish: RouteResult ⇒ Deferrable[RouteResult] = Deferrable(_)) extends RequestContext { + finish: RouteResult ⇒ Future[RouteResult] = FastFuture.successful) extends RequestContext { def this(request: HttpRequest, log: LoggingAdapter)(implicit ec: ExecutionContext, fm: FlowMaterializer) = this(request, request.uri.path, ec, fm, log) @@ -31,19 +32,19 @@ private[http] class RequestContextImpl( log: LoggingAdapter): RequestContext = copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log) - override def complete(trm: ToResponseMarshallable): Deferrable[RouteResult] = + override def complete(trm: ToResponseMarshallable): Future[RouteResult] = trm(request)(executionContext) - .map(res ⇒ RouteResult.complete(res))(executionContext) - .recover { + .fast.map(res ⇒ RouteResult.complete(res))(executionContext) + .fast.recover { case RejectionError(rej) ⇒ RouteResult.rejected(rej :: Nil) case error ⇒ RouteResult.failure(error) }(executionContext) - .flatMap(finish)(executionContext) + .fast.flatMap(finish)(executionContext) - override def reject(rejections: Rejection*): Deferrable[RouteResult] = + override def reject(rejections: Rejection*): Future[RouteResult] = finish(RouteResult.rejected(rejections.toList)) - override def fail(error: Throwable): Deferrable[RouteResult] = + override def fail(error: Throwable): Future[RouteResult] = finish(RouteResult.failure(error)) override def withRequest(req: HttpRequest): RequestContext = @@ -64,8 +65,8 @@ private[http] class RequestContextImpl( override def withRouteResponseMappedPF(pf: PartialFunction[RouteResult, RouteResult]): RequestContext = withRouteResponseMapped(pf.applyOrElse(_, identityFunc[RouteResult])) - override def withRouteResponseFlatMapped(f: RouteResult ⇒ Deferrable[RouteResult]): RequestContext = - copy(finish = rr ⇒ f(rr).flatMap(finish)(executionContext)) + override def withRouteResponseFlatMapped(f: RouteResult ⇒ Future[RouteResult]): RequestContext = + copy(finish = rr ⇒ f(rr).fast.flatMap(finish)(executionContext)) override def withHttpResponseMapped(f: HttpResponse ⇒ HttpResponse): RequestContext = withRouteResponseMappedPF { @@ -83,21 +84,21 @@ private[http] class RequestContextImpl( case RouteResult.Rejected(rejs) ⇒ RouteResult.rejected(f(rejs)) } - override def withRejectionHandling(f: List[Rejection] ⇒ Deferrable[RouteResult]): RequestContext = + override def withRejectionHandling(f: List[Rejection] ⇒ Future[RouteResult]): RequestContext = withRouteResponseHandling { case RouteResult.Rejected(rejs) ⇒ // `finish` is *not* chained in here, because the user already applied it when creating the result of f f(rejs) } - override def withExceptionHandling(pf: PartialFunction[Throwable, Deferrable[RouteResult]]): RequestContext = + override def withExceptionHandling(pf: PartialFunction[Throwable, Future[RouteResult]]): RequestContext = withRouteResponseHandling { case RouteResult.Failure(error) if pf isDefinedAt error ⇒ // `finish` is *not* chained in here, because the user already applied it when creating the result of pf pf(error) } - def withRouteResponseHandling(pf: PartialFunction[RouteResult, Deferrable[RouteResult]]): RequestContext = + def withRouteResponseHandling(pf: PartialFunction[RouteResult, Future[RouteResult]]): RequestContext = copy(finish = pf.applyOrElse(_, finish)) override def withContentNegotiationDisabled: RequestContext = @@ -108,6 +109,6 @@ private[http] class RequestContextImpl( executionContext: ExecutionContext = executionContext, flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, - finish: RouteResult ⇒ Deferrable[RouteResult] = finish) = + finish: RouteResult ⇒ Future[RouteResult] = finish) = new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, finish) } diff --git a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala index 9256f41657..4d98c81fd5 100644 --- a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala +++ b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala @@ -4,14 +4,13 @@ package akka.http.server -import akka.stream.FlowMaterializer - import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Failure, Success } +import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model.{ HttpRequest, HttpResponse } import akka.http.Http +import FastFuture._ /** * The main entry point into the Scala routing DSL. @@ -24,7 +23,6 @@ trait ScalaRoutingDSL extends Directives { def withRoute(route: Route): R def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): R def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): R - def withHandler(handler: HttpRequest ⇒ Deferrable[HttpResponse]): R } def handleConnections(bindingFuture: Future[Http.ServerBinding])(implicit ec: ExecutionContext, fm: FlowMaterializer, @@ -33,7 +31,6 @@ trait ScalaRoutingDSL extends Directives { def withRoute(route: Route) = afterBinding(_ withRoute route) def withSyncHandler(handler: HttpRequest ⇒ HttpResponse) = afterBinding(_ withSyncHandler handler) def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]) = afterBinding(_ withAsyncHandler handler) - def withHandler(handler: HttpRequest ⇒ Deferrable[HttpResponse]) = afterBinding(_ withHandler handler) def afterBinding(f: Applicator[Unit] ⇒ Unit): Future[Unit] = bindingFuture.map(binding ⇒ f(handleConnections(binding))) @@ -46,32 +43,29 @@ trait ScalaRoutingDSL extends Directives { run(routeRunner(route, _)) def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): Unit = - withHandler(request ⇒ Deferrable(handler(request))) + withAsyncHandler(request ⇒ FastFuture.successful(handler(request))) def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): Unit = - withHandler(request ⇒ Deferrable(handler(request))) - - def withHandler(handler: HttpRequest ⇒ Deferrable[HttpResponse]): Unit = run(_ ⇒ handler) - private def run(f: RoutingSetup ⇒ HttpRequest ⇒ Deferrable[HttpResponse]): Unit = + private def run(f: RoutingSetup ⇒ HttpRequest ⇒ Future[HttpResponse]): Unit = Flow(binding.connectionStream).foreach { case connection @ Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) ⇒ val setup = setupProvider(connection) setup.routingLog.log.debug("Accepted new connection from " + remoteAddress) val runner = f(setup) Flow(requestProducer) - .mapFuture(request ⇒ runner(request).toFuture) + .mapFuture(request ⇒ runner(request)) .produceTo(responseConsumer)(setup.flowMaterializer) } } } - def routeRunner(route: Route, setup: RoutingSetup): HttpRequest ⇒ Deferrable[HttpResponse] = { + def routeRunner(route: Route, setup: RoutingSetup): HttpRequest ⇒ Future[HttpResponse] = { import setup._ val sealedRoute = sealRoute(route)(setup) request ⇒ - sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request))) map { + sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request))).fast.map { case RouteResult.Complete(response) ⇒ response case RouteResult.Rejected(rejected) ⇒ throw new IllegalStateException(s"Unhandled rejections '$rejected', unsealed RejectionHandler?!") case RouteResult.Failure(error) ⇒ throw new IllegalStateException(s"Unhandled error '$error', unsealed ExceptionHandler?!") diff --git a/akka-http/src/main/scala/akka/http/server/package.scala b/akka-http/src/main/scala/akka/http/server/package.scala index 7282692dd3..ab19fd1b0c 100644 --- a/akka-http/src/main/scala/akka/http/server/package.scala +++ b/akka-http/src/main/scala/akka/http/server/package.scala @@ -4,11 +4,11 @@ package akka.http -import akka.http.util.Deferrable +import scala.concurrent.Future package object server { - type Route = RequestContext ⇒ Deferrable[RouteResult] + type Route = RequestContext ⇒ Future[RouteResult] type RouteGenerator[T] = T ⇒ Route type Directive0 = Directive[Unit] type Directive1[T] = Directive[Tuple1[T]] diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index 55e1b39178..9ee514d3db 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -46,7 +46,7 @@ trait MultipartUnmarshallers { BodyPart(createEntity(entityParts), headers) case (BodyPartParser.ParseError(errorInfo), _) ⇒ throw new ParsingException(errorInfo) }.toPublisher()(fm) - Deferrable(create(bodyParts)) + FastFuture.successful(create(bodyParts)) } } else UnmarshallingError.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala index a11b513b71..6f20e43e63 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -10,7 +10,7 @@ import scala.xml.{ XML, NodeSeq } import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow import akka.util.ByteString -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model._ import MediaTypes._ @@ -18,8 +18,8 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] = Unmarshaller { entity ⇒ - if (entity.isKnownEmpty) Deferrable(ByteString.empty) - else Deferrable(Flow(entity.dataBytes(fm)).fold(ByteString.empty)(_ ++ _).toFuture()) + if (entity.isKnownEmpty) FastFuture.successful(ByteString.empty) + else Flow(entity.dataBytes(fm)).fold(ByteString.empty)(_ ++ _).toFuture() } implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer, @@ -53,7 +53,7 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { case e: org.xml.sax.SAXNotRecognizedException ⇒ // property is not needed } val reader = new InputStreamReader(new ByteArrayInputStream(bytes), entity.contentType.charset.nioCharset) - Deferrable(XML.withSAXParser(parser).load(reader)) // blocking call! Ideally we'd have a `loadToFuture` + FastFuture.successful(XML.withSAXParser(parser).load(reader)) // blocking call! Ideally we'd have a `loadToFuture` } else UnmarshallingError.UnsupportedContentType(nodeSeqMediaTypes map (ContentTypeRange(_))) } } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala index 5df1da5921..715f6e1e77 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala @@ -4,7 +4,7 @@ package akka.http.unmarshalling -import akka.http.util.Deferrable +import scala.concurrent.Future object Unmarshal { def apply[T](value: T): Unmarshal[T] = new Unmarshal(value) @@ -14,5 +14,5 @@ class Unmarshal[A](val value: A) { /** * Unmarshals the value to the given Type using the in-scope Unmarshaller. */ - def to[B](implicit um: Unmarshaller[A, B]): Deferrable[B] = um(value) + def to[B](implicit um: Unmarshaller[A, B]): Future[B] = um(value) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala index 5bd4faf889..5c13e088da 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala @@ -6,26 +6,27 @@ package akka.http.unmarshalling import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.ExecutionContext -import akka.http.util.Deferrable +import scala.concurrent.{ Future, ExecutionContext } +import akka.http.util.FastFuture import akka.http.model.ContentTypeRange +import FastFuture._ -trait Unmarshaller[-A, B] extends (A ⇒ Deferrable[B]) { +trait Unmarshaller[-A, B] extends (A ⇒ Future[B]) { def map[C](f: B ⇒ C)(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(this andThen (_ map f)) + Unmarshaller(this andThen (_.fast.map(f))) - def flatMap[C](f: B ⇒ Deferrable[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(this andThen (_ flatMap f)) + def flatMap[C](f: B ⇒ Future[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = + Unmarshaller(this andThen (_.fast.flatMap(f))) def mapWithInput[C](f: (A @uncheckedVariance, B) ⇒ C)(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(a ⇒ this(a) map (f(a, _))) + Unmarshaller(a ⇒ this(a).fast.map(f(a, _))) - def flatMapWithInput[C](f: (A @uncheckedVariance, B) ⇒ Deferrable[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(a ⇒ this(a) flatMap (f(a, _))) + def flatMapWithInput[C](f: (A @uncheckedVariance, B) ⇒ Future[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = + Unmarshaller(a ⇒ this(a).fast.flatMap(f(a, _))) def recover[C >: B](pf: PartialFunction[Throwable, C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(this andThen (_ recover pf)) + Unmarshaller(this andThen (_.fast.recover(pf))) def withDefaultValue[BB >: B](defaultValue: BB)(implicit ec: ExecutionContext): Unmarshaller[A, BB] = recover { case UnmarshallingError.ContentExpected ⇒ defaultValue } @@ -35,14 +36,14 @@ object Unmarshaller extends GenericUnmarshallers with PredefinedFromEntityUnmarshallers { - def apply[A, B](f: A ⇒ Deferrable[B]): Unmarshaller[A, B] = + def apply[A, B](f: A ⇒ Future[B]): Unmarshaller[A, B] = new Unmarshaller[A, B] { def apply(a: A) = f(a) } } sealed abstract class UnmarshallingError extends RuntimeException object UnmarshallingError { - implicit def toDeferrable[T](error: UnmarshallingError): Deferrable[T] = Deferrable.failed(error) + implicit def toDeferrable[T](error: UnmarshallingError): Future[T] = FastFuture.failed(error) case object ContentExpected extends UnmarshallingError