Merge pull request #15975 from spray/wip-15962-mathias

Remove `Deferrable[T]` and replace with POF (plain old Future)
This commit is contained in:
Björn Antonsson 2014-09-26 11:27:34 +02:00
commit ad1ad44fcc
31 changed files with 346 additions and 300 deletions

View file

@ -4,15 +4,16 @@
package akka.http.model
import akka.http.util.FastFuture
import language.implicitConversions
import java.io.File
import java.lang.{ Iterable JIterable }
import org.reactivestreams.Publisher
import scala.concurrent.ExecutionContext
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable
import akka.util.ByteString
import akka.http.util.Deferrable
import akka.stream.{ TimerTransformer, FlowMaterializer }
import akka.stream.scaladsl.Flow
import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable }
@ -41,7 +42,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
* Collects all possible parts and returns a potentially future Strict entity for easier processing.
* The Deferrable is failed with an TimeoutException if the stream isn't completed after the given timeout.
*/
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[HttpEntity.Strict] = {
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[HttpEntity.Strict] = {
def transformer() =
new TimerTransformer[ByteString, HttpEntity.Strict] {
var bytes = ByteString.newBuilder
@ -59,7 +60,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
throw new java.util.concurrent.TimeoutException(
s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data")
}
Deferrable(Flow(dataBytes).timerTransform("toStrict", transformer).toFuture())
Flow(dataBytes).timerTransform("toStrict", transformer).toFuture()
}
/**
@ -133,7 +134,7 @@ object HttpEntity {
def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = SynchronousPublisherFromIterable(data :: Nil)
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) =
Deferrable(this)
FastFuture.successful(this)
def withContentType(contentType: ContentType): Strict =
if (contentType == this.contentType) this else copy(contentType = contentType)

View file

@ -6,7 +6,7 @@ package akka.http.model
import java.lang.{ Iterable JIterable }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext
import scala.concurrent.{ Future, ExecutionContext }
import scala.collection.immutable
import scala.reflect.{ classTag, ClassTag }
import akka.stream.FlowMaterializer
@ -14,6 +14,7 @@ import akka.util.ByteString
import akka.http.util._
import headers._
import HttpCharsets._
import FastFuture._
/**
* Common base class of HttpRequest and HttpResponse.
@ -47,8 +48,8 @@ sealed trait HttpMessage extends japi.HttpMessage {
def withEntity(entity: MessageEntity): Self
/** Returns a sharable and serializable copy of this message with a strict entity. */
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[Self] =
entity.toStrict(timeout).map(this.withEntity)
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Self] =
entity.toStrict(timeout).fast.map(this.withEntity)
/** Returns a copy of this message with the entity and headers set to the given ones. */
def withHeadersAndEntity(headers: immutable.Seq[HttpHeader], entity: MessageEntity): Self

View file

@ -6,12 +6,13 @@ package akka.http.model
import java.io.File
import org.reactivestreams.Publisher
import scala.concurrent.ExecutionContext
import scala.concurrent.{ Future, ExecutionContext }
import scala.collection.immutable
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.impl.SynchronousPublisherFromIterable
import akka.http.util.Deferrable
import akka.http.util.FastFuture
import FastFuture._
import headers._
trait MultipartParts {
@ -56,8 +57,8 @@ case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts
* Turns this instance into its strict specialization using the given `maxFieldCount` as the field number cut-off
* hint.
*/
def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[StrictMultipartFormData] =
Deferrable(Flow(parts).grouped(maxFieldCount).toFuture()).map(new StrictMultipartFormData(_))
def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[StrictMultipartFormData] =
Flow(parts).grouped(maxFieldCount).toFuture().fast.map(new StrictMultipartFormData(_))
}
/**
@ -70,7 +71,7 @@ class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends Multi
def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName))
override def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer) =
Deferrable(this)
FastFuture.successful(this)
}
object MultipartFormData {

View file

@ -1,92 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.util
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
// TODO: remove and switch to plain old futures if performance is not significantly better
/**
* An ADT modelling values that are either strictly available immediately or available at some point in the future.
* The benefit over a direct `Future` is that mapping and flatMapping doesn't have to be scheduled if the value
* is strictly available.
*/
sealed trait Deferrable[+A] {
def map[B](f: A B)(implicit ec: ExecutionContext): Deferrable[B]
def flatMap[B](f: A Deferrable[B])(implicit ec: ExecutionContext): Deferrable[B]
def foreach(f: A Unit)(implicit ec: ExecutionContext): Unit
def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext): Deferrable[B]
def toFuture: Future[A]
def await(timeout: Duration): A
}
object Deferrable {
def apply[T](value: T): Strict[T] = Strict(value)
def apply[T](value: Try[T]): Deferrable[T] =
value match {
case Success(x) Strict(x)
case Failure(e) StrictError(e)
}
def apply[T](future: Future[T]): Deferrable[T] =
future.value match {
case None NonStrict[T](future)
case Some(x) apply(x)
}
def failed[T](error: Throwable): StrictError = StrictError(error)
case class Strict[A](value: A) extends Deferrable[A] {
def map[B](f: A B)(implicit ec: ExecutionContext) =
try Strict(f(value))
catch { case NonFatal(error) failed(error) }
def flatMap[B](f: A Deferrable[B])(implicit ec: ExecutionContext) =
try f(value)
catch { case NonFatal(error) failed(error) }
def foreach(f: A Unit)(implicit ec: ExecutionContext) = f(value)
def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) = this
def toFuture = Future.successful(value)
def await(timeout: Duration) = value
}
case class StrictError(error: Throwable) extends Deferrable[Nothing] {
def map[B](f: Nothing B)(implicit ec: ExecutionContext) = this
def flatMap[B](f: Nothing Deferrable[B])(implicit ec: ExecutionContext) = this
def foreach(f: Nothing Unit)(implicit ec: ExecutionContext) = ()
def recover[B](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) =
if (pf isDefinedAt error) {
try Strict(pf(error))
catch { case NonFatal(e) failed(e) }
} else this
def toFuture = Future.failed(error)
def await(timeout: Duration) = throw error
}
case class NonStrict[A](future: Future[A]) extends Deferrable[A] {
def map[B](f: A B)(implicit ec: ExecutionContext) =
future.value match {
case None NonStrict(future map f)
case Some(x) Deferrable(x) map f
}
def flatMap[B](f: A Deferrable[B])(implicit ec: ExecutionContext) =
future.value match {
case None NonStrict(future flatMap (x f(x).toFuture))
case Some(x) Deferrable(x) flatMap f
}
def foreach(f: A Unit)(implicit ec: ExecutionContext) =
future.value match {
case None future foreach f
case Some(x) Deferrable(x) foreach f
}
def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) =
future.value match {
case None NonStrict(future recover pf)
case Some(x) Deferrable(x) recover pf
}
def toFuture = future
def await(timeout: Duration) = Await.result(future, timeout)
}
}

View file

@ -0,0 +1,133 @@
/**
* Copyright (C) 2009-2014 Typestrict Inc. <http://www.typestrict.com>
*/
package akka.http.util
import scala.language.{ implicitConversions, higherKinds }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import scala.concurrent._
/**
* Provides alternative implementations of the basic transformation operations defined on [[Future]],
* which try to avoid scheduling to an [[ExecutionContext]] if possible, i.e. if the given future
* value is already present.
*/
class FastFuture[A](val future: Future[A]) extends AnyVal {
import FastFuture._
def map[B](f: A B)(implicit ec: ExecutionContext): Future[B] = {
def strictMap(a: A) =
try FulfilledFuture(f(a))
catch { case NonFatal(e) ErrorFuture(e) }
future match {
case FulfilledFuture(a) strictMap(a)
case x: ErrorFuture x
case _ future.value match {
case None future map f
case Some(Success(a)) strictMap(a)
case Some(Failure(e)) ErrorFuture(e)
}
}
}
def flatMap[B](f: A Future[B])(implicit ec: ExecutionContext): Future[B] = {
def strictFlatMap(a: A) =
try f(a)
catch { case NonFatal(e) ErrorFuture(e) }
future match {
case FulfilledFuture(a) strictFlatMap(a)
case x: ErrorFuture x
case _ future.value match {
case None future flatMap f
case Some(Success(a)) strictFlatMap(a)
case Some(Failure(e)) ErrorFuture(e)
}
}
}
def filter(pred: A Boolean)(implicit executor: ExecutionContext): Future[A] =
flatMap {
r if (pred(r)) future else throw new NoSuchElementException("Future.filter predicate is not satisfied")
}
def foreach(f: A Unit)(implicit ec: ExecutionContext): Unit = {
def strictForeach(a: A) =
try f(a)
catch { case NonFatal(e) ec.reportFailure(e) } // behave as if the `foreach` had been scheduled
future match {
case FulfilledFuture(a) strictForeach(a)
case x: ErrorFuture // nothing to do
case _ future.value match {
case None future.foreach(f)
case Some(Success(a)) strictForeach(a)
case Some(Failure(e)) // nothing to do
}
}
}
def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext): Future[B] = {
def strictRecover(t: Throwable) =
try if (pf isDefinedAt t) FulfilledFuture(pf(t)) else future
catch { case NonFatal(e) ErrorFuture(e) }
future match {
case FulfilledFuture(_) future
case ErrorFuture(e) strictRecover(e)
case _ future.value match {
case None future recover pf
case Some(Success(a)) FulfilledFuture(a)
case Some(Failure(e)) strictRecover(e)
}
}
}
}
object FastFuture {
def successful[T](value: T): Future[T] = FulfilledFuture(value)
def failed(error: Throwable): Future[Nothing] = ErrorFuture(error)
private case class FulfilledFuture[+A](a: A) extends Future[A] {
def value = Some(Success(a))
def onComplete[U](f: Try[A] U)(implicit executor: ExecutionContext) = Future.successful(a).onComplete(f)
def isCompleted = true
def result(atMost: Duration)(implicit permit: CanAwait) = a
def ready(atMost: Duration)(implicit permit: CanAwait) = this
}
private case class ErrorFuture(error: Throwable) extends Future[Nothing] {
def value = Some(Failure(error))
def onComplete[U](f: Try[Nothing] U)(implicit executor: ExecutionContext) = Future.failed(error).onComplete(f)
def isCompleted = true
def result(atMost: Duration)(implicit permit: CanAwait) = throw error
def ready(atMost: Duration)(implicit permit: CanAwait) = this
}
implicit class EnhancedFuture[T](val future: Future[T]) extends AnyVal {
def fast: FastFuture[T] = new FastFuture[T](future)
}
def sequence[T, M[_] <: TraversableOnce[_]](in: M[Future[T]])(implicit cbf: CanBuildFrom[M[Future[T]], T, M[T]], executor: ExecutionContext): Future[M[T]] =
in.foldLeft(successful(cbf(in))) {
(fr, fa) for (r fr.fast; a fa.asInstanceOf[Future[T]].fast) yield r += a
}.fast.map(_.result())
def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(f: (R, T) R)(implicit executor: ExecutionContext): Future[R] =
if (futures.isEmpty) successful(zero)
else sequence(futures).fast.map(_.foldLeft(zero)(f))
def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) R)(implicit executor: ExecutionContext): Future[R] =
if (futures.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection"))
else sequence(futures).fast.map(_ reduceLeft op)
def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(successful(cbf(in))) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr.fast; b fb.fast) yield r += b
}.fast.map(_.result())
}

View file

@ -167,5 +167,5 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
}
}
def toStrict(entity: HttpEntity): HttpEntity.Strict = entity.toStrict(500.millis).await(1.second)
def toStrict(entity: HttpEntity): HttpEntity.Strict = Await.result(entity.toStrict(500.millis), 1.second)
}

View file

@ -24,6 +24,7 @@ import HttpProtocols._
import StatusCodes._
import HttpEntity._
import ParserOutput.ParseError
import FastFuture._
class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
@ -397,7 +398,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
.map { x
Flow {
x match {
case Right(request) compactEntity(request.entity).map(x Right(request.withEntity(x))).toFuture
case Right(request) compactEntity(request.entity).fast.map(x Right(request.withEntity(x)))
case Left(error) Future.successful(Left(error))
}
}.toPublisher()
@ -410,16 +411,16 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
protected def parserSettings: ParserSettings = ParserSettings(system)
protected def newParser = new HttpRequestParser(parserSettings, false)()
private def compactEntity(entity: RequestEntity): Deferrable[RequestEntity] =
private def compactEntity(entity: RequestEntity): Future[RequestEntity] =
entity match {
case x: Chunked compactEntityChunks(x.chunks).map(compacted x.copy(chunks = compacted))
case x: Chunked compactEntityChunks(x.chunks).fast.map(compacted x.copy(chunks = compacted))
case _ entity.toStrict(250.millis)
}
private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Deferrable[Publisher[ChunkStreamPart]] =
Deferrable(Flow(data).grouped(1000).toFuture())
.map(publisher(_: _*))
.recover { case _: NoSuchElementException publisher() }
private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] =
Flow(data).grouped(1000).toFuture()
.fast.map(publisher(_: _*))
.fast.recover { case _: NoSuchElementException publisher() }
def prep(response: String) = response.stripMarginWithNewline("\r\n")
}

View file

@ -24,6 +24,7 @@ import HttpProtocols._
import StatusCodes._
import HttpEntity._
import ParserOutput.ParseError
import FastFuture._
class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
@ -249,8 +250,8 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}.map { x
Flow {
x match {
case Right(response) compactEntity(response.entity).map(x Right(response.withEntity(x))).toFuture
case Left(error) Future.successful(Left(error.info.formatPretty))
case Right(response) compactEntity(response.entity).fast.map(x Right(response.withEntity(x)))
case Left(error) FastFuture.successful(Left(error.info.formatPretty))
}
}.toPublisher()
}
@ -266,16 +267,16 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
parser
}
private def compactEntity(entity: ResponseEntity): Deferrable[ResponseEntity] =
private def compactEntity(entity: ResponseEntity): Future[ResponseEntity] =
entity match {
case x: HttpEntity.Chunked compactEntityChunks(x.chunks).map(compacted x.copy(chunks = compacted))
case x: HttpEntity.Chunked compactEntityChunks(x.chunks).fast.map(compacted x.copy(chunks = compacted))
case _ entity.toStrict(250.millis)
}
private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Deferrable[Publisher[ChunkStreamPart]] =
Deferrable(Flow(data).grouped(1000).toFuture())
.map(publisher(_: _*))
.recover { case _: NoSuchElementException publisher() }
private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] =
Flow(data).grouped(1000).toFuture()
.fast.map(publisher(_: _*))
.fast.recover { case _: NoSuchElementException publisher() }
def prep(response: String) = response.stripMarginWithNewline("\r\n")

View file

@ -4,19 +4,20 @@
package akka.http.model
import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec }
import akka.util.ByteString
import org.scalatest.matchers.Matcher
import akka.stream.scaladsl.Flow
import akka.http.model.HttpEntity._
import org.reactivestreams.Publisher
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.impl.SynchronousPublisherFromIterable
import java.util.concurrent.TimeoutException
import com.typesafe.config.{ ConfigFactory, Config }
import org.reactivestreams.Publisher
import scala.concurrent.{ Promise, Await }
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec }
import org.scalatest.matchers.Matcher
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.stream.FlowMaterializer
import akka.stream.impl.SynchronousPublisherFromIterable
import akka.http.model.HttpEntity._
import akka.http.util.FastFuture._
class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
val tpe: ContentType = ContentTypes.`application/octet-stream`
@ -76,7 +77,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
val neverCompleted = Promise[ByteString]()
val stream: Publisher[ByteString] = Flow(neverCompleted.future).toPublisher()
intercept[TimeoutException] {
Default(tpe, 42, stream).toStrict(100.millis).await(150.millis)
Await.result(Default(tpe, 42, stream).toStrict(100.millis), 150.millis)
}.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data")
}
}
@ -91,5 +92,5 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
}
def strictifyTo(strict: Strict): Matcher[HttpEntity] =
equal(strict).matcher[Strict].compose(_.toStrict(250.millis).await(250.millis))
equal(strict).matcher[Strict].compose(x Await.result(x.toStrict(250.millis), 250.millis))
}

View file

@ -4,16 +4,16 @@
package io.akka.integrationtest.http
import akka.actor.ActorSystem
import akka.http.model._
import akka.http.model.ContentTypes
import akka.http.model.headers._
import akka.http.model.parser.HeaderParser
import akka.stream.scaladsl2._
import akka.util.ByteString
import com.typesafe.config.{ ConfigFactory, Config }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.http.model.parser.HeaderParser
import akka.http.model._
import akka.stream.scaladsl2._
import headers._
/**
* Integration test for external HTTP libraries that are built on top of
@ -88,7 +88,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte
// Finally convert the body into an Array[Byte].
val entityBytes: Array[Byte] = request.entity.toStrict(1.second).await(2.seconds).data.toArray
val entityBytes: Array[Byte] = Await.result(request.entity.toStrict(1.second), 2.seconds).data.toArray
entityBytes.to[Seq] shouldEqual ByteString("hello").to[Seq]
}