diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala index e938171433..69867285df 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala @@ -6,6 +6,7 @@ package akka.http.testkit import com.typesafe.config.{ ConfigFactory, Config } import scala.collection.immutable +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.DynamicVariable import scala.reflect.ClassTag @@ -13,11 +14,12 @@ import org.scalatest.Suite import akka.actor.ActorSystem import akka.stream.FlowMaterializer import akka.http.client.RequestBuilding -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.server._ import akka.http.unmarshalling._ import akka.http.model._ import headers.Host +import FastFuture._ trait RouteTest extends RequestBuilding with RouteTestResultComponent { this: TestFrameworkInterface ⇒ @@ -57,11 +59,11 @@ trait RouteTest extends RequestBuilding with RouteTestResultComponent { def chunks: immutable.Seq[HttpEntity.ChunkStreamPart] = result.chunks def entityAs[T: FromEntityUnmarshaller: ClassTag](implicit timeout: Duration = 1.second): T = { def msg(e: Throwable) = s"Could not unmarshal entity to type '${implicitly[ClassTag[T]]}' for `entityAs` assertion: $e\n\nResponse was: $response" - Unmarshal(entity).to[T].recover[T] { case error ⇒ failTest(msg(error)) }.await(timeout) + Await.result(Unmarshal(entity).to[T].fast.recover[T] { case error ⇒ failTest(msg(error)) }, timeout) } def responseAs[T: FromResponseUnmarshaller: ClassTag](implicit timeout: Duration = 1.second): T = { def msg(e: Throwable) = s"Could not unmarshal response to type '${implicitly[ClassTag[T]]}' for `responseAs` assertion: $e\n\nResponse was: $response" - Unmarshal(response).to[T].recover[T] { case error ⇒ failTest(msg(error)) }.await(timeout) + Await.result(Unmarshal(response).to[T].fast.recover[T] { case error ⇒ failTest(msg(error)) }, timeout) } def contentType: ContentType = entity.contentType def mediaType: MediaType = contentType.mediaType @@ -116,7 +118,7 @@ trait RouteTest extends RequestBuilding with RouteTestResultComponent { } implicit def injectIntoRoute(implicit timeout: RouteTestTimeout, setup: RoutingSetup, defaultHostInfo: DefaultHostInfo) = - new TildeArrow[RequestContext, Deferrable[RouteResult]] { + new TildeArrow[RequestContext, Future[RouteResult]] { type Out = RouteTestResult def apply(request: HttpRequest, route: Route): Out = { val routeTestResult = new RouteTestResult(timeout.duration)(setup.materializer) @@ -133,7 +135,7 @@ trait RouteTest extends RequestBuilding with RouteTestResultComponent { val semiSealedRoute = // sealed for exceptions but not for rejections Directives.handleExceptions(sealedExceptionHandler) { route } val deferrableRouteResult = semiSealedRoute(ctx) - deferrableRouteResult.foreach(routeTestResult.handleResult)(setup.executor) + deferrableRouteResult.fast.foreach(routeTestResult.handleResult)(setup.executor) routeTestResult } } diff --git a/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala b/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala index 1498640183..cff935e8d2 100644 --- a/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/marshalling/ContentNegotiationSpec.scala @@ -4,8 +4,10 @@ package akka.http.marshalling +import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ Matchers, FreeSpec } +import akka.http.util.FastFuture._ import akka.http.model.parser.HeaderParser import akka.http.model._ import headers._ @@ -127,10 +129,9 @@ class ContentNegotiationSpec extends FreeSpec with Matchers { case ct @ ContentType(mt, Some(cs)) ⇒ Marshaller.withFixedCharset(mt, cs)((s: String) ⇒ HttpEntity(ct, s)) case ContentType(mt, None) ⇒ Marshaller.withOpenCharset(mt)((s: String, cs) ⇒ HttpEntity(ContentType(mt, cs), s)) } - Marshal("foo").toResponseFor(request) - .map(response ⇒ Some(response.entity.contentType)) - .recover { case _: Marshal.UnacceptableResponseContentTypeException ⇒ None } - .await(1.second) + Await.result(Marshal("foo").toResponseFor(request) + .fast.map(response ⇒ Some(response.entity.contentType)) + .fast.recover { case _: Marshal.UnacceptableResponseContentTypeException ⇒ None }, 1.second) } } diff --git a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala index 8f69888bf5..1d2199bc54 100644 --- a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala @@ -5,6 +5,7 @@ package akka.http.marshalling import scala.collection.immutable.ListMap +import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import akka.actor.ActorSystem @@ -153,7 +154,7 @@ class MarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll with override def afterAll() = system.shutdown() def marshal[T: ToEntityMarshallers](value: T): HttpEntity.Strict = - Marshal(value).to[HttpEntity].await(1.second).toStrict(1.second).await(1.second) + Await.result(Await.result(Marshal(value).to[HttpEntity], 1.second).toStrict(1.second), 1.second) protected class FixedRandom extends java.util.Random { override def nextBytes(array: Array[Byte]): Unit = "my-stable-boundary".getBytes("UTF-8").copyToArray(array) diff --git a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala index 152452abf2..df6afd5cf2 100644 --- a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala @@ -6,7 +6,7 @@ package akka.http.unmarshalling import scala.xml.NodeSeq import scala.concurrent.duration._ -import scala.concurrent.Await +import scala.concurrent.{ Future, Await } import org.scalatest.matchers.Matcher import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import akka.actor.ActorSystem @@ -16,6 +16,7 @@ import akka.http.model._ import akka.http.util._ import headers._ import MediaTypes._ +import FastFuture._ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { implicit val system = ActorSystem(getClass.getSimpleName) @@ -34,7 +35,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { Unmarshal(HttpEntity("árvíztűrő ütvefúrógép")).to[Array[Char]] should evaluateTo("árvíztűrő ütvefúrógép".toCharArray) } "nodeSeqUnmarshaller should unmarshal `text/xml` content in UTF-8 to NodeSeqs" in { - Unmarshal(HttpEntity(`text/xml`, "Hällö")).to[NodeSeq].map(_.text) should evaluateTo("Hällö") + Unmarshal(HttpEntity(`text/xml`, "Hällö")).to[NodeSeq].fast.map(_.text) should evaluateTo("Hällö") } } @@ -78,7 +79,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } "multipartContentUnmarshaller should reject illegal multipart content" in { - val mpc = Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-", + val mpc = Await.result(Unmarshal(HttpEntity(`multipart/form-data` withBoundary "-", """--- |Content-type: text/plain; charset=UTF8 |Content-type: application/json @@ -86,8 +87,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { | |test@there.com |-----""".stripMarginWithNewline("\r\n"))) - .to[MultipartContent] - .await(1.second) + .to[MultipartContent], 1.second) Await.result(Flow(mpc.parts).toFuture().failed, 1.second).getMessage shouldEqual "multipart part must not contain more than one Content-Type header" } @@ -160,26 +160,27 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { override def afterAll() = system.shutdown() - def evaluateTo[T](value: T): Matcher[Deferrable[T]] = - equal(value).matcher[T] compose (_.await(1.second)) + def evaluateTo[T](value: T): Matcher[Future[T]] = + equal(value).matcher[T] compose (x ⇒ Await.result(x, 1.second)) - def haveParts[T <: MultipartParts](parts: BodyPart*): Matcher[Deferrable[T]] = - equal(parts).matcher[Seq[BodyPart]] compose { - _.flatMap(x ⇒ Deferrable(Flow(x.parts).grouped(100).toFuture())) - .recover { case _: NoSuchElementException ⇒ Nil } - .await(1.second) + def haveParts[T <: MultipartParts](parts: BodyPart*): Matcher[Future[T]] = + equal(parts).matcher[Seq[BodyPart]] compose { x ⇒ + Await.result(x + .fast.flatMap(x ⇒ Flow(x.parts).grouped(100).toFuture()) + .fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second) } - def haveFormData(fields: (String, BodyPart)*): Matcher[Deferrable[MultipartFormData]] = - equal(fields).matcher[Seq[(String, BodyPart)]] compose { - _.flatMap(x ⇒ Deferrable(Flow(x.parts).grouped(100).toFuture())) - .recover { case _: NoSuchElementException ⇒ Nil } - .map { + def haveFormData(fields: (String, BodyPart)*): Matcher[Future[MultipartFormData]] = + equal(fields).matcher[Seq[(String, BodyPart)]] compose { x ⇒ + Await.result(x + .fast.flatMap(x ⇒ Flow(x.parts).grouped(100).toFuture()) + .fast.recover { case _: NoSuchElementException ⇒ Nil } + .fast.map { _ map { part ⇒ part.headers.collectFirst { case `Content-Disposition`(ContentDispositionTypes.`form-data`, params) ⇒ params("name") }.get -> part } - }.await(1.second) + }, 1.second) } } diff --git a/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala b/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala index a31bb93170..2a1a8ba4b9 100644 --- a/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala +++ b/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala @@ -5,7 +5,7 @@ package akka.http.client import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Await, ExecutionContext } import scala.concurrent.duration._ import scala.reflect.ClassTag import akka.util.Timeout @@ -45,7 +45,7 @@ trait RequestBuilding extends TransformerPipelineSupport { content match { case None ⇒ apply(uri, HttpEntity.Empty) case Some(value) ⇒ - val entity = Marshal(value).to[RequestEntity].await(timeout.duration) + val entity = Await.result(Marshal(value).to[RequestEntity], timeout.duration) apply(uri, entity) } diff --git a/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala index d36788102c..c390a9ce7e 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/GenericMarshallers.scala @@ -6,16 +6,17 @@ package akka.http.marshalling import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Try, Failure, Success } -import akka.http.util.Deferrable +import akka.http.util.FastFuture +import FastFuture._ trait GenericMarshallers extends LowPriorityToResponseMarshallerImplicits { - implicit def throwableMarshaller[T]: Marshaller[Throwable, T] = Marshaller(Deferrable.failed) + implicit def throwableMarshaller[T]: Marshaller[Throwable, T] = Marshaller(FastFuture.failed) implicit def optionMarshaller[A, B](implicit m: Marshaller[A, B], empty: EmptyValue[B]): Marshaller[Option[A], B] = Marshaller { case Some(value) ⇒ m(value) - case None ⇒ Deferrable(Marshalling.Opaque(() ⇒ empty.emptyValue)) + case None ⇒ FastFuture.successful(Marshalling.Opaque(() ⇒ empty.emptyValue)) } implicit def eitherMarshaller[A1, A2, B](implicit m1: Marshaller[A1, B], m2: Marshaller[A2, B]): Marshaller[Either[A1, A2], B] = @@ -24,16 +25,13 @@ trait GenericMarshallers extends LowPriorityToResponseMarshallerImplicits { case Right(a2) ⇒ m2(a2) } - implicit def deferrableMarshaller[A, B](implicit m: Marshaller[A, B], ec: ExecutionContext): Marshaller[Deferrable[A], B] = - Marshaller(_ flatMap m.apply) - implicit def futureMarshaller[A, B](implicit m: Marshaller[A, B], ec: ExecutionContext): Marshaller[Future[A], B] = - Marshaller(Deferrable(_) flatMap m.apply) + Marshaller(_.fast.flatMap(m(_))) implicit def tryMarshaller[A, B](implicit m: Marshaller[A, B]): Marshaller[Try[A], B] = Marshaller { case Success(value) ⇒ m(value) - case Failure(error) ⇒ Deferrable.failed(error) + case Failure(error) ⇒ FastFuture.failed(error) } } diff --git a/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala b/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala index 3af8d1b30d..1a031cbeb4 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/Marshal.scala @@ -6,9 +6,10 @@ package akka.http.marshalling import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model.HttpCharsets._ import akka.http.model._ +import FastFuture._ object Marshal { def apply[T](value: T): Marshal[T] = new Marshal(value) @@ -23,8 +24,8 @@ class Marshal[A](val value: A) { * Marshals `value` to the first of the available `Marshallers` for `A` and `B`. * If the marshaller is flexible with regard to the used charset `UTF-8` is chosen. */ - def to[B](implicit m: Marshallers[A, B], ec: ExecutionContext): Deferrable[B] = - m.marshallers.head(value) map { + def to[B](implicit m: Marshallers[A, B], ec: ExecutionContext): Future[B] = + m.marshallers.head(value).fast.map { case Marshalling.WithFixedCharset(_, _, marshal) ⇒ marshal() case Marshalling.WithOpenCharset(_, marshal) ⇒ marshal(HttpCharsets.`UTF-8`) case Marshalling.Opaque(marshal) ⇒ marshal() @@ -33,53 +34,50 @@ class Marshal[A](val value: A) { /** * Marshals `value` to an `HttpResponse` for the given `HttpRequest` with full content-negotiation. */ - def toResponseFor(request: HttpRequest)(implicit m: ToResponseMarshallers[A], ec: ExecutionContext): Deferrable[HttpResponse] = { + def toResponseFor(request: HttpRequest)(implicit m: ToResponseMarshallers[A], ec: ExecutionContext): Future[HttpResponse] = { import akka.http.marshalling.Marshal._ val mediaRanges = request.acceptedMediaRanges // cache for performance val charsetRanges = request.acceptedCharsetRanges // cache for performance def qValueMT(mediaType: MediaType) = request.qValueForMediaType(mediaType, mediaRanges) def qValueCS(charset: HttpCharset) = request.qValueForCharset(charset, charsetRanges) - // TODO: avoid scheduling if possible - val marshallingFutures = m.marshallers.map(_(value).toFuture) - val marshallingsFuture = Future.sequence(marshallingFutures) - val marshallingFuture = - marshallingsFuture map { marshallings ⇒ - def weight(mt: MediaType, cs: HttpCharset): Float = math.min(qValueMT(mt), qValueCS(cs)) - val defaultMarshallingWeight: MarshallingWeight = - new MarshallingWeight(0f, { () ⇒ - val supportedContentTypes = marshallings collect { - case Marshalling.WithFixedCharset(mt, cs, _) ⇒ ContentType(mt, cs) - case Marshalling.WithOpenCharset(mt, _) ⇒ ContentType(mt) - } - throw new UnacceptableResponseContentTypeException(supportedContentTypes) - }) - val best = marshallings.foldLeft(defaultMarshallingWeight) { - case (acc, Marshalling.WithFixedCharset(mt, cs, marshal)) ⇒ + val marshallingFutures = m.marshallers.map(_(value)) + val marshallingsFuture = FastFuture.sequence(marshallingFutures) + marshallingsFuture.fast.map { marshallings ⇒ + def weight(mt: MediaType, cs: HttpCharset): Float = math.min(qValueMT(mt), qValueCS(cs)) + val defaultMarshallingWeight: MarshallingWeight = + new MarshallingWeight(0f, { () ⇒ + val supportedContentTypes = marshallings collect { + case Marshalling.WithFixedCharset(mt, cs, _) ⇒ ContentType(mt, cs) + case Marshalling.WithOpenCharset(mt, _) ⇒ ContentType(mt) + } + throw new UnacceptableResponseContentTypeException(supportedContentTypes) + }) + val best = marshallings.foldLeft(defaultMarshallingWeight) { + case (acc, Marshalling.WithFixedCharset(mt, cs, marshal)) ⇒ + val w = weight(mt, cs) + if (w > acc.weight) new MarshallingWeight(w, marshal) else acc + + case (acc, Marshalling.WithOpenCharset(mt, marshal)) ⇒ + def withCharset(cs: HttpCharset) = { val w = weight(mt, cs) - if (w > acc.weight) new MarshallingWeight(w, marshal) else acc + if (w > acc.weight) new MarshallingWeight(w, () ⇒ marshal(cs)) else acc + } + // logic for choosing the charset adapted from http://tools.ietf.org/html/rfc7231#section-5.3.3 + if (qValueCS(`UTF-8`) == 1f) withCharset(`UTF-8`) // prefer UTF-8 if fully accepted + else charsetRanges match { // ranges are sorted by descending q-value, + case (HttpCharsetRange.One(cs, qValue)) :: _ ⇒ // so we only need to look at the first one + if (qValue == 1f) withCharset(cs) // if the client has high preference for this charset, pick it + else if (qValueCS(`ISO-8859-1`) == 1f) withCharset(`ISO-8859-1`) // give some more preference to `ISO-8859-1` + else if (qValue > 0f) withCharset(cs) // ok, simply choose the first one if the client doesn't reject it + else acc + case _ ⇒ acc + } - case (acc, Marshalling.WithOpenCharset(mt, marshal)) ⇒ - def withCharset(cs: HttpCharset) = { - val w = weight(mt, cs) - if (w > acc.weight) new MarshallingWeight(w, () ⇒ marshal(cs)) else acc - } - // logic for choosing the charset adapted from http://tools.ietf.org/html/rfc7231#section-5.3.3 - if (qValueCS(`UTF-8`) == 1f) withCharset(`UTF-8`) // prefer UTF-8 if fully accepted - else charsetRanges match { // ranges are sorted by descending q-value, - case (HttpCharsetRange.One(cs, qValue)) :: _ ⇒ // so we only need to look at the first one - if (qValue == 1f) withCharset(cs) // if the client has high preference for this charset, pick it - else if (qValueCS(`ISO-8859-1`) == 1f) withCharset(`ISO-8859-1`) // give some more preference to `ISO-8859-1` - else if (qValue > 0f) withCharset(cs) // ok, simply choose the first one if the client doesn't reject it - else acc - case _ ⇒ acc - } - - case (acc, Marshalling.Opaque(marshal)) ⇒ - if (acc.weight == 0f) new MarshallingWeight(Float.MinPositiveValue, marshal) else acc - } - best.marshal() + case (acc, Marshalling.Opaque(marshal)) ⇒ + if (acc.weight == 0f) new MarshallingWeight(Float.MinPositiveValue, marshal) else acc } - Deferrable(marshallingFuture) + best.marshal() + } } } diff --git a/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala index a51ebef5c5..5ab98ca5f3 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/Marshallers.scala @@ -5,10 +5,11 @@ package akka.http.marshalling import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.xml.NodeSeq -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model._ +import FastFuture._ import MediaTypes._ case class Marshallers[-A, +B](marshallers: immutable.Seq[Marshaller[A, B]]) { @@ -39,10 +40,10 @@ sealed abstract class SingleMarshallerMarshallers { } sealed trait Marshaller[-A, +B] { outer ⇒ - def apply(value: A): Deferrable[Marshalling[B]] + def apply(value: A): Future[Marshalling[B]] def map[C](f: B ⇒ C)(implicit ec: ExecutionContext): Marshaller[A, C] = - Marshaller[A, C](value ⇒ outer(value) map (_ map f)) + Marshaller[A, C](value ⇒ outer(value).fast.map(_ map f)) /** * Reuses this Marshaller's logic to produce a new Marshaller from another type `C` which overrides @@ -51,7 +52,7 @@ sealed trait Marshaller[-A, +B] { outer ⇒ def wrap[C, D >: B](mediaType: MediaType)(f: C ⇒ A)(implicit ec: ExecutionContext, mto: MediaTypeOverrider[D]): Marshaller[C, D] = Marshaller { value ⇒ import Marshalling._ - outer(f(value)) map { + outer(f(value)).fast.map { case WithFixedCharset(_, cs, marshal) ⇒ WithFixedCharset(mediaType, cs, () ⇒ mto(marshal(), mediaType)) case WithOpenCharset(_, marshal) ⇒ WithOpenCharset(mediaType, cs ⇒ mto(marshal(cs), mediaType)) case Opaque(marshal) ⇒ Opaque(() ⇒ mto(marshal(), mediaType)) @@ -67,19 +68,19 @@ object Marshaller with PredefinedToResponseMarshallers with PredefinedToRequestMarshallers { - def apply[A, B](f: A ⇒ Deferrable[Marshalling[B]]): Marshaller[A, B] = + def apply[A, B](f: A ⇒ Future[Marshalling[B]]): Marshaller[A, B] = new Marshaller[A, B] { def apply(value: A) = f(value) } def withFixedCharset[A, B](mediaType: MediaType, charset: HttpCharset)(marshal: A ⇒ B): Marshaller[A, B] = - Marshaller { value ⇒ Deferrable(Marshalling.WithFixedCharset(mediaType, charset, () ⇒ marshal(value))) } + Marshaller { value ⇒ FastFuture.successful(Marshalling.WithFixedCharset(mediaType, charset, () ⇒ marshal(value))) } def withOpenCharset[A, B](mediaType: MediaType)(marshal: (A, HttpCharset) ⇒ B): Marshaller[A, B] = - Marshaller { value ⇒ Deferrable(Marshalling.WithOpenCharset(mediaType, charset ⇒ marshal(value, charset))) } + Marshaller { value ⇒ FastFuture.successful(Marshalling.WithOpenCharset(mediaType, charset ⇒ marshal(value, charset))) } def opaque[A, B](marshal: A ⇒ B): Marshaller[A, B] = - Marshaller { value ⇒ Deferrable(Marshalling.Opaque(() ⇒ marshal(value))) } + Marshaller { value ⇒ FastFuture.successful(Marshalling.Opaque(() ⇒ marshal(value))) } } /** diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 97a97362af..6b2b014766 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -12,6 +12,7 @@ import akka.stream.{ FlattenStrategy, FlowMaterializer } import akka.stream.scaladsl.Flow import akka.http.engine.rendering.BodyPartRenderer import akka.http.util.actorSystem +import akka.http.util.FastFuture._ import akka.http.model._ import MediaTypes._ @@ -47,7 +48,7 @@ trait MultipartMarshallers { implicit def multipartFormDataMarshaller(implicit mcm: ToEntityMarshaller[MultipartContent], ec: ExecutionContext): ToEntityMarshaller[MultipartFormData] = Marshaller { value ⇒ - mcm(MultipartContent(value.parts)) map { + mcm(MultipartContent(value.parts)).fast.map { case Marshalling.WithOpenCharset(mt, marshal) ⇒ val mediaType = `multipart/form-data` withBoundary mt.params("boundary") Marshalling.WithOpenCharset(mediaType, cs ⇒ MediaTypeOverrider.forEntity(marshal(cs), mediaType)) diff --git a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala index 657f034dd7..0472ab7ee3 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToEntityMarshallers.scala @@ -9,7 +9,7 @@ import scala.concurrent.ExecutionContext import scala.xml.NodeSeq import akka.http.model.MediaTypes._ import akka.http.model._ -import akka.http.util.{ Deferrable, StringRendering } +import akka.http.util.{ FastFuture, StringRendering } import akka.util.ByteString trait PredefinedToEntityMarshallers extends MultipartMarshallers { @@ -63,7 +63,7 @@ trait PredefinedToEntityMarshallers extends MultipartMarshallers { implicit val HttpEntityMarshaller: ToEntityMarshaller[MessageEntity] = Marshaller { value ⇒ // since we don't want to recode we simply ignore the charset determined by content negotiation here - Deferrable(Marshalling.WithOpenCharset(value.contentType.mediaType, _ ⇒ value)) + FastFuture.successful(Marshalling.WithOpenCharset(value.contentType.mediaType, _ ⇒ value)) } } diff --git a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala index 4de715d2f2..39a2801d25 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToRequestMarshallers.scala @@ -6,8 +6,9 @@ package akka.http.marshalling import scala.collection.immutable import scala.concurrent.ExecutionContext -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model._ +import FastFuture._ trait PredefinedToRequestMarshallers { private type TRM[T] = ToRequestMarshaller[T] // brevity alias @@ -15,7 +16,7 @@ trait PredefinedToRequestMarshallers { implicit val fromRequest: TRM[HttpRequest] = Marshaller.opaque(identity) implicit def fromUri(implicit ec: ExecutionContext): TRM[Uri] = - Marshaller { uri ⇒ Deferrable(Marshalling.Opaque(() ⇒ HttpRequest(uri = uri))) } + Marshaller { uri ⇒ FastFuture.successful(Marshalling.Opaque(() ⇒ HttpRequest(uri = uri))) } implicit def fromMethodAndUriAndValue[S, T](implicit mt: ToEntityMarshaller[T], ec: ExecutionContext): TRM[(HttpMethod, Uri, T)] = @@ -23,7 +24,7 @@ trait PredefinedToRequestMarshallers { implicit def fromMethodAndUriAndHeadersAndValue[T](implicit mt: ToEntityMarshaller[T], ec: ExecutionContext): TRM[(HttpMethod, Uri, immutable.Seq[HttpHeader], T)] = - Marshaller { case (m, u, h, v) ⇒ mt(v) map (_ map (HttpRequest(m, u, h, _))) } + Marshaller { case (m, u, h, v) ⇒ mt(v).fast.map(_ map (HttpRequest(m, u, h, _))) } } object PredefinedToRequestMarshallers extends PredefinedToRequestMarshallers diff --git a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala index c6b5cd3c84..19cf405aa8 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/PredefinedToResponseMarshallers.scala @@ -6,6 +6,7 @@ package akka.http.marshalling import scala.collection.immutable import scala.concurrent.ExecutionContext +import akka.http.util.FastFuture._ import akka.http.model.MediaTypes._ import akka.http.model._ @@ -33,7 +34,7 @@ trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImp implicit def fromStatusCodeAndHeadersAndValue[T](implicit mt: ToEntityMarshaller[T], ec: ExecutionContext): TRM[(StatusCode, immutable.Seq[HttpHeader], T)] = - Marshaller { case (status, headers, value) ⇒ mt(value).map(_ map (HttpResponse(status, headers, _))) } + Marshaller { case (status, headers, value) ⇒ mt(value).fast.map(_ map (HttpResponse(status, headers, _))) } } trait LowPriorityToResponseMarshallerImplicits { diff --git a/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala b/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala index c1c0ebd674..c58e66dd59 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/ToResponseMarshallable.scala @@ -4,8 +4,7 @@ package akka.http.marshalling -import scala.concurrent.ExecutionContext -import akka.http.util.Deferrable +import scala.concurrent.{ Future, ExecutionContext } import akka.http.model._ /** Something that can later be marshalled into a response */ @@ -14,7 +13,7 @@ trait ToResponseMarshallable { def value: T implicit def marshaller: ToResponseMarshaller[T] - def apply(request: HttpRequest)(implicit ec: ExecutionContext): Deferrable[HttpResponse] = + def apply(request: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = Marshal(value).toResponseFor(request) } diff --git a/akka-http/src/main/scala/akka/http/server/RequestContext.scala b/akka-http/src/main/scala/akka/http/server/RequestContext.scala index e7ccd74ae4..d30aa36363 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContext.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContext.scala @@ -5,10 +5,9 @@ package akka.http.server import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter import akka.stream.FlowMaterializer -import akka.http.util.Deferrable import akka.http.marshalling.ToResponseMarshallable import akka.http.model._ @@ -49,19 +48,19 @@ trait RequestContext { /** * Completes the request with the given ToResponseMarshallable. */ - def complete(obj: ToResponseMarshallable): Deferrable[RouteResult] + def complete(obj: ToResponseMarshallable): Future[RouteResult] /** * Rejects the request with the given rejections. */ - def reject(rejections: Rejection*): Deferrable[RouteResult] + def reject(rejections: Rejection*): Future[RouteResult] /** * Bubbles the given error up the response chain where it is dealt with by the closest `handleExceptions` * directive and its ``ExceptionHandler``, unless the error is a ``RejectionError``. In this case the * wrapped rejection is unpacked and "executed". */ - def fail(error: Throwable): Deferrable[RouteResult] + def fail(error: Throwable): Future[RouteResult] /** * Returns a copy of this context with the new HttpRequest. @@ -96,7 +95,7 @@ trait RequestContext { /** * Returns a copy of this context with the given response transformation function chained into the response chain. */ - def withRouteResponseFlatMapped(f: RouteResult ⇒ Deferrable[RouteResult]): RequestContext + def withRouteResponseFlatMapped(f: RouteResult ⇒ Future[RouteResult]): RequestContext /** * Returns a copy of this context with the given response transformation function chained into the response chain. @@ -121,17 +120,17 @@ trait RequestContext { /** * Returns a copy of this context with the given rejection handling function chained into the response chain. */ - def withRejectionHandling(f: List[Rejection] ⇒ Deferrable[RouteResult]): RequestContext + def withRejectionHandling(f: List[Rejection] ⇒ Future[RouteResult]): RequestContext /** * Returns a copy of this context with the given exception handling function chained into the response chain. */ - def withExceptionHandling(pf: PartialFunction[Throwable, Deferrable[RouteResult]]): RequestContext + def withExceptionHandling(pf: PartialFunction[Throwable, Future[RouteResult]]): RequestContext /** * Returns a copy of this context with the given function handling a part of the response space. */ - def withRouteResponseHandling(pf: PartialFunction[RouteResult, Deferrable[RouteResult]]): RequestContext + def withRouteResponseHandling(pf: PartialFunction[RouteResult, Future[RouteResult]]): RequestContext /** * Removes a potentially existing Accept header from the request headers. diff --git a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala index d2691d0cab..bd2795472b 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala @@ -5,12 +5,13 @@ package akka.http.server import scala.collection.immutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter import akka.stream.FlowMaterializer import akka.http.marshalling.ToResponseMarshallable -import akka.http.util.{ Deferrable, identityFunc } +import akka.http.util.{ FastFuture, identityFunc } import akka.http.model._ +import FastFuture._ /** * INTERNAL API @@ -21,7 +22,7 @@ private[http] class RequestContextImpl( val executionContext: ExecutionContext, val flowMaterializer: FlowMaterializer, val log: LoggingAdapter, - finish: RouteResult ⇒ Deferrable[RouteResult] = Deferrable(_)) extends RequestContext { + finish: RouteResult ⇒ Future[RouteResult] = FastFuture.successful) extends RequestContext { def this(request: HttpRequest, log: LoggingAdapter)(implicit ec: ExecutionContext, fm: FlowMaterializer) = this(request, request.uri.path, ec, fm, log) @@ -31,19 +32,19 @@ private[http] class RequestContextImpl( log: LoggingAdapter): RequestContext = copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log) - override def complete(trm: ToResponseMarshallable): Deferrable[RouteResult] = + override def complete(trm: ToResponseMarshallable): Future[RouteResult] = trm(request)(executionContext) - .map(res ⇒ RouteResult.complete(res))(executionContext) - .recover { + .fast.map(res ⇒ RouteResult.complete(res))(executionContext) + .fast.recover { case RejectionError(rej) ⇒ RouteResult.rejected(rej :: Nil) case error ⇒ RouteResult.failure(error) }(executionContext) - .flatMap(finish)(executionContext) + .fast.flatMap(finish)(executionContext) - override def reject(rejections: Rejection*): Deferrable[RouteResult] = + override def reject(rejections: Rejection*): Future[RouteResult] = finish(RouteResult.rejected(rejections.toList)) - override def fail(error: Throwable): Deferrable[RouteResult] = + override def fail(error: Throwable): Future[RouteResult] = finish(RouteResult.failure(error)) override def withRequest(req: HttpRequest): RequestContext = @@ -64,8 +65,8 @@ private[http] class RequestContextImpl( override def withRouteResponseMappedPF(pf: PartialFunction[RouteResult, RouteResult]): RequestContext = withRouteResponseMapped(pf.applyOrElse(_, identityFunc[RouteResult])) - override def withRouteResponseFlatMapped(f: RouteResult ⇒ Deferrable[RouteResult]): RequestContext = - copy(finish = rr ⇒ f(rr).flatMap(finish)(executionContext)) + override def withRouteResponseFlatMapped(f: RouteResult ⇒ Future[RouteResult]): RequestContext = + copy(finish = rr ⇒ f(rr).fast.flatMap(finish)(executionContext)) override def withHttpResponseMapped(f: HttpResponse ⇒ HttpResponse): RequestContext = withRouteResponseMappedPF { @@ -83,21 +84,21 @@ private[http] class RequestContextImpl( case RouteResult.Rejected(rejs) ⇒ RouteResult.rejected(f(rejs)) } - override def withRejectionHandling(f: List[Rejection] ⇒ Deferrable[RouteResult]): RequestContext = + override def withRejectionHandling(f: List[Rejection] ⇒ Future[RouteResult]): RequestContext = withRouteResponseHandling { case RouteResult.Rejected(rejs) ⇒ // `finish` is *not* chained in here, because the user already applied it when creating the result of f f(rejs) } - override def withExceptionHandling(pf: PartialFunction[Throwable, Deferrable[RouteResult]]): RequestContext = + override def withExceptionHandling(pf: PartialFunction[Throwable, Future[RouteResult]]): RequestContext = withRouteResponseHandling { case RouteResult.Failure(error) if pf isDefinedAt error ⇒ // `finish` is *not* chained in here, because the user already applied it when creating the result of pf pf(error) } - def withRouteResponseHandling(pf: PartialFunction[RouteResult, Deferrable[RouteResult]]): RequestContext = + def withRouteResponseHandling(pf: PartialFunction[RouteResult, Future[RouteResult]]): RequestContext = copy(finish = pf.applyOrElse(_, finish)) override def withContentNegotiationDisabled: RequestContext = @@ -108,6 +109,6 @@ private[http] class RequestContextImpl( executionContext: ExecutionContext = executionContext, flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, - finish: RouteResult ⇒ Deferrable[RouteResult] = finish) = + finish: RouteResult ⇒ Future[RouteResult] = finish) = new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, finish) } diff --git a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala index 9256f41657..4d98c81fd5 100644 --- a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala +++ b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala @@ -4,14 +4,13 @@ package akka.http.server -import akka.stream.FlowMaterializer - import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Failure, Success } +import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model.{ HttpRequest, HttpResponse } import akka.http.Http +import FastFuture._ /** * The main entry point into the Scala routing DSL. @@ -24,7 +23,6 @@ trait ScalaRoutingDSL extends Directives { def withRoute(route: Route): R def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): R def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): R - def withHandler(handler: HttpRequest ⇒ Deferrable[HttpResponse]): R } def handleConnections(bindingFuture: Future[Http.ServerBinding])(implicit ec: ExecutionContext, fm: FlowMaterializer, @@ -33,7 +31,6 @@ trait ScalaRoutingDSL extends Directives { def withRoute(route: Route) = afterBinding(_ withRoute route) def withSyncHandler(handler: HttpRequest ⇒ HttpResponse) = afterBinding(_ withSyncHandler handler) def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]) = afterBinding(_ withAsyncHandler handler) - def withHandler(handler: HttpRequest ⇒ Deferrable[HttpResponse]) = afterBinding(_ withHandler handler) def afterBinding(f: Applicator[Unit] ⇒ Unit): Future[Unit] = bindingFuture.map(binding ⇒ f(handleConnections(binding))) @@ -46,32 +43,29 @@ trait ScalaRoutingDSL extends Directives { run(routeRunner(route, _)) def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): Unit = - withHandler(request ⇒ Deferrable(handler(request))) + withAsyncHandler(request ⇒ FastFuture.successful(handler(request))) def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): Unit = - withHandler(request ⇒ Deferrable(handler(request))) - - def withHandler(handler: HttpRequest ⇒ Deferrable[HttpResponse]): Unit = run(_ ⇒ handler) - private def run(f: RoutingSetup ⇒ HttpRequest ⇒ Deferrable[HttpResponse]): Unit = + private def run(f: RoutingSetup ⇒ HttpRequest ⇒ Future[HttpResponse]): Unit = Flow(binding.connectionStream).foreach { case connection @ Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) ⇒ val setup = setupProvider(connection) setup.routingLog.log.debug("Accepted new connection from " + remoteAddress) val runner = f(setup) Flow(requestProducer) - .mapFuture(request ⇒ runner(request).toFuture) + .mapFuture(request ⇒ runner(request)) .produceTo(responseConsumer)(setup.flowMaterializer) } } } - def routeRunner(route: Route, setup: RoutingSetup): HttpRequest ⇒ Deferrable[HttpResponse] = { + def routeRunner(route: Route, setup: RoutingSetup): HttpRequest ⇒ Future[HttpResponse] = { import setup._ val sealedRoute = sealRoute(route)(setup) request ⇒ - sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request))) map { + sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request))).fast.map { case RouteResult.Complete(response) ⇒ response case RouteResult.Rejected(rejected) ⇒ throw new IllegalStateException(s"Unhandled rejections '$rejected', unsealed RejectionHandler?!") case RouteResult.Failure(error) ⇒ throw new IllegalStateException(s"Unhandled error '$error', unsealed ExceptionHandler?!") diff --git a/akka-http/src/main/scala/akka/http/server/package.scala b/akka-http/src/main/scala/akka/http/server/package.scala index 7282692dd3..ab19fd1b0c 100644 --- a/akka-http/src/main/scala/akka/http/server/package.scala +++ b/akka-http/src/main/scala/akka/http/server/package.scala @@ -4,11 +4,11 @@ package akka.http -import akka.http.util.Deferrable +import scala.concurrent.Future package object server { - type Route = RequestContext ⇒ Deferrable[RouteResult] + type Route = RequestContext ⇒ Future[RouteResult] type RouteGenerator[T] = T ⇒ Route type Directive0 = Directive[Unit] type Directive1[T] = Directive[Tuple1[T]] diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index 55e1b39178..9ee514d3db 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -46,7 +46,7 @@ trait MultipartUnmarshallers { BodyPart(createEntity(entityParts), headers) case (BodyPartParser.ParseError(errorInfo), _) ⇒ throw new ParsingException(errorInfo) }.toPublisher()(fm) - Deferrable(create(bodyParts)) + FastFuture.successful(create(bodyParts)) } } else UnmarshallingError.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala index a11b513b71..6f20e43e63 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -10,7 +10,7 @@ import scala.xml.{ XML, NodeSeq } import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow import akka.util.ByteString -import akka.http.util.Deferrable +import akka.http.util.FastFuture import akka.http.model._ import MediaTypes._ @@ -18,8 +18,8 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] = Unmarshaller { entity ⇒ - if (entity.isKnownEmpty) Deferrable(ByteString.empty) - else Deferrable(Flow(entity.dataBytes(fm)).fold(ByteString.empty)(_ ++ _).toFuture()) + if (entity.isKnownEmpty) FastFuture.successful(ByteString.empty) + else Flow(entity.dataBytes(fm)).fold(ByteString.empty)(_ ++ _).toFuture() } implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer, @@ -53,7 +53,7 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { case e: org.xml.sax.SAXNotRecognizedException ⇒ // property is not needed } val reader = new InputStreamReader(new ByteArrayInputStream(bytes), entity.contentType.charset.nioCharset) - Deferrable(XML.withSAXParser(parser).load(reader)) // blocking call! Ideally we'd have a `loadToFuture` + FastFuture.successful(XML.withSAXParser(parser).load(reader)) // blocking call! Ideally we'd have a `loadToFuture` } else UnmarshallingError.UnsupportedContentType(nodeSeqMediaTypes map (ContentTypeRange(_))) } } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala index 5df1da5921..715f6e1e77 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshal.scala @@ -4,7 +4,7 @@ package akka.http.unmarshalling -import akka.http.util.Deferrable +import scala.concurrent.Future object Unmarshal { def apply[T](value: T): Unmarshal[T] = new Unmarshal(value) @@ -14,5 +14,5 @@ class Unmarshal[A](val value: A) { /** * Unmarshals the value to the given Type using the in-scope Unmarshaller. */ - def to[B](implicit um: Unmarshaller[A, B]): Deferrable[B] = um(value) + def to[B](implicit um: Unmarshaller[A, B]): Future[B] = um(value) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala index 5bd4faf889..5c13e088da 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/Unmarshaller.scala @@ -6,26 +6,27 @@ package akka.http.unmarshalling import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.ExecutionContext -import akka.http.util.Deferrable +import scala.concurrent.{ Future, ExecutionContext } +import akka.http.util.FastFuture import akka.http.model.ContentTypeRange +import FastFuture._ -trait Unmarshaller[-A, B] extends (A ⇒ Deferrable[B]) { +trait Unmarshaller[-A, B] extends (A ⇒ Future[B]) { def map[C](f: B ⇒ C)(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(this andThen (_ map f)) + Unmarshaller(this andThen (_.fast.map(f))) - def flatMap[C](f: B ⇒ Deferrable[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(this andThen (_ flatMap f)) + def flatMap[C](f: B ⇒ Future[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = + Unmarshaller(this andThen (_.fast.flatMap(f))) def mapWithInput[C](f: (A @uncheckedVariance, B) ⇒ C)(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(a ⇒ this(a) map (f(a, _))) + Unmarshaller(a ⇒ this(a).fast.map(f(a, _))) - def flatMapWithInput[C](f: (A @uncheckedVariance, B) ⇒ Deferrable[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(a ⇒ this(a) flatMap (f(a, _))) + def flatMapWithInput[C](f: (A @uncheckedVariance, B) ⇒ Future[C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = + Unmarshaller(a ⇒ this(a).fast.flatMap(f(a, _))) def recover[C >: B](pf: PartialFunction[Throwable, C])(implicit ec: ExecutionContext): Unmarshaller[A, C] = - Unmarshaller(this andThen (_ recover pf)) + Unmarshaller(this andThen (_.fast.recover(pf))) def withDefaultValue[BB >: B](defaultValue: BB)(implicit ec: ExecutionContext): Unmarshaller[A, BB] = recover { case UnmarshallingError.ContentExpected ⇒ defaultValue } @@ -35,14 +36,14 @@ object Unmarshaller extends GenericUnmarshallers with PredefinedFromEntityUnmarshallers { - def apply[A, B](f: A ⇒ Deferrable[B]): Unmarshaller[A, B] = + def apply[A, B](f: A ⇒ Future[B]): Unmarshaller[A, B] = new Unmarshaller[A, B] { def apply(a: A) = f(a) } } sealed abstract class UnmarshallingError extends RuntimeException object UnmarshallingError { - implicit def toDeferrable[T](error: UnmarshallingError): Deferrable[T] = Deferrable.failed(error) + implicit def toDeferrable[T](error: UnmarshallingError): Future[T] = FastFuture.failed(error) case object ContentExpected extends UnmarshallingError