From 92299158f1785ad13f4be80f9205bbae47ca8f9b Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 24 Sep 2014 16:37:48 +0200 Subject: [PATCH] !htc Replace `Deferrable` with `FastFuture` --- .../scala/akka/http/model/HttpEntity.scala | 11 +- .../scala/akka/http/model/HttpMessage.scala | 7 +- .../akka/http/model/MultipartContent.scala | 11 +- .../scala/akka/http/util/Deferrable.scala | 92 ------------ .../scala/akka/http/util/FastFuture.scala | 133 ++++++++++++++++++ .../scala/akka/http/ClientServerSpec.scala | 2 +- .../engine/parsing/RequestParserSpec.scala | 15 +- .../engine/parsing/ResponseParserSpec.scala | 17 +-- .../akka/http/model/HttpEntitySpec.scala | 25 ++-- .../http/HttpModelIntegrationSpec.scala | 18 +-- 10 files changed, 189 insertions(+), 142 deletions(-) delete mode 100644 akka-http-core/src/main/scala/akka/http/util/Deferrable.scala create mode 100644 akka-http-core/src/main/scala/akka/http/util/FastFuture.scala diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index a71b8f2f86..1d7ce9086c 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -4,15 +4,16 @@ package akka.http.model +import akka.http.util.FastFuture + import language.implicitConversions import java.io.File import java.lang.{ Iterable ⇒ JIterable } import org.reactivestreams.Publisher -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.duration.FiniteDuration import scala.collection.immutable import akka.util.ByteString -import akka.http.util.Deferrable import akka.stream.{ TimerTransformer, FlowMaterializer } import akka.stream.scaladsl.Flow import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable } @@ -41,7 +42,7 @@ sealed trait HttpEntity extends japi.HttpEntity { * Collects all possible parts and returns a potentially future Strict entity for easier processing. * The Deferrable is failed with an TimeoutException if the stream isn't completed after the given timeout. */ - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[HttpEntity.Strict] = { + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[HttpEntity.Strict] = { def transformer() = new TimerTransformer[ByteString, HttpEntity.Strict] { var bytes = ByteString.newBuilder @@ -59,7 +60,7 @@ sealed trait HttpEntity extends japi.HttpEntity { throw new java.util.concurrent.TimeoutException( s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data") } - Deferrable(Flow(dataBytes).timerTransform("toStrict", transformer).toFuture()) + Flow(dataBytes).timerTransform("toStrict", transformer).toFuture() } /** @@ -133,7 +134,7 @@ object HttpEntity { def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = SynchronousPublisherFromIterable(data :: Nil) override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = - Deferrable(this) + FastFuture.successful(this) def withContentType(contentType: ContentType): Strict = if (contentType == this.contentType) this else copy(contentType = contentType) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index e61c1558d0..454e9d7b9e 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -6,7 +6,7 @@ package akka.http.model import java.lang.{ Iterable ⇒ JIterable } import scala.concurrent.duration.FiniteDuration -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import scala.reflect.{ classTag, ClassTag } import akka.stream.FlowMaterializer @@ -14,6 +14,7 @@ import akka.util.ByteString import akka.http.util._ import headers._ import HttpCharsets._ +import FastFuture._ /** * Common base class of HttpRequest and HttpResponse. @@ -47,8 +48,8 @@ sealed trait HttpMessage extends japi.HttpMessage { def withEntity(entity: MessageEntity): Self /** Returns a sharable and serializable copy of this message with a strict entity. */ - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[Self] = - entity.toStrict(timeout).map(this.withEntity) + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Self] = + entity.toStrict(timeout).fast.map(this.withEntity) /** Returns a copy of this message with the entity and headers set to the given ones. */ def withHeadersAndEntity(headers: immutable.Seq[HttpHeader], entity: MessageEntity): Self diff --git a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala index b6ec3ef271..b1b1e18f3d 100644 --- a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala +++ b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala @@ -6,12 +6,13 @@ package akka.http.model import java.io.File import org.reactivestreams.Publisher -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow import akka.stream.impl.SynchronousPublisherFromIterable -import akka.http.util.Deferrable +import akka.http.util.FastFuture +import FastFuture._ import headers._ trait MultipartParts { @@ -56,8 +57,8 @@ case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts * Turns this instance into its strict specialization using the given `maxFieldCount` as the field number cut-off * hint. */ - def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Deferrable[StrictMultipartFormData] = - Deferrable(Flow(parts).grouped(maxFieldCount).toFuture()).map(new StrictMultipartFormData(_)) + def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[StrictMultipartFormData] = + Flow(parts).grouped(maxFieldCount).toFuture().fast.map(new StrictMultipartFormData(_)) } /** @@ -70,7 +71,7 @@ class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends Multi def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName)) override def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer) = - Deferrable(this) + FastFuture.successful(this) } object MultipartFormData { diff --git a/akka-http-core/src/main/scala/akka/http/util/Deferrable.scala b/akka-http-core/src/main/scala/akka/http/util/Deferrable.scala deleted file mode 100644 index 69a49c23ed..0000000000 --- a/akka-http-core/src/main/scala/akka/http/util/Deferrable.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.util - -import scala.concurrent.duration.Duration -import scala.concurrent.{ Await, ExecutionContext, Future } -import scala.util.control.NonFatal -import scala.util.{ Failure, Success, Try } - -// TODO: remove and switch to plain old futures if performance is not significantly better - -/** - * An ADT modelling values that are either strictly available immediately or available at some point in the future. - * The benefit over a direct `Future` is that mapping and flatMapping doesn't have to be scheduled if the value - * is strictly available. - */ -sealed trait Deferrable[+A] { - def map[B](f: A ⇒ B)(implicit ec: ExecutionContext): Deferrable[B] - def flatMap[B](f: A ⇒ Deferrable[B])(implicit ec: ExecutionContext): Deferrable[B] - def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext): Unit - def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext): Deferrable[B] - def toFuture: Future[A] - def await(timeout: Duration): A -} - -object Deferrable { - def apply[T](value: T): Strict[T] = Strict(value) - def apply[T](value: Try[T]): Deferrable[T] = - value match { - case Success(x) ⇒ Strict(x) - case Failure(e) ⇒ StrictError(e) - } - def apply[T](future: Future[T]): Deferrable[T] = - future.value match { - case None ⇒ NonStrict[T](future) - case Some(x) ⇒ apply(x) - } - def failed[T](error: Throwable): StrictError = StrictError(error) - - case class Strict[A](value: A) extends Deferrable[A] { - def map[B](f: A ⇒ B)(implicit ec: ExecutionContext) = - try Strict(f(value)) - catch { case NonFatal(error) ⇒ failed(error) } - def flatMap[B](f: A ⇒ Deferrable[B])(implicit ec: ExecutionContext) = - try f(value) - catch { case NonFatal(error) ⇒ failed(error) } - def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext) = f(value) - def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) = this - def toFuture = Future.successful(value) - def await(timeout: Duration) = value - } - - case class StrictError(error: Throwable) extends Deferrable[Nothing] { - def map[B](f: Nothing ⇒ B)(implicit ec: ExecutionContext) = this - def flatMap[B](f: Nothing ⇒ Deferrable[B])(implicit ec: ExecutionContext) = this - def foreach(f: Nothing ⇒ Unit)(implicit ec: ExecutionContext) = () - def recover[B](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) = - if (pf isDefinedAt error) { - try Strict(pf(error)) - catch { case NonFatal(e) ⇒ failed(e) } - } else this - def toFuture = Future.failed(error) - def await(timeout: Duration) = throw error - } - - case class NonStrict[A](future: Future[A]) extends Deferrable[A] { - def map[B](f: A ⇒ B)(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ NonStrict(future map f) - case Some(x) ⇒ Deferrable(x) map f - } - def flatMap[B](f: A ⇒ Deferrable[B])(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ NonStrict(future flatMap (x ⇒ f(x).toFuture)) - case Some(x) ⇒ Deferrable(x) flatMap f - } - def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ future foreach f - case Some(x) ⇒ Deferrable(x) foreach f - } - def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext) = - future.value match { - case None ⇒ NonStrict(future recover pf) - case Some(x) ⇒ Deferrable(x) recover pf - } - def toFuture = future - def await(timeout: Duration) = Await.result(future, timeout) - } -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala b/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala new file mode 100644 index 0000000000..cffc047802 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2009-2014 Typestrict Inc. + */ + +package akka.http.util + +import scala.language.{ implicitConversions, higherKinds } +import scala.util.control.NonFatal +import scala.util.{ Failure, Success, Try } +import scala.collection.generic.CanBuildFrom +import scala.concurrent.duration.Duration +import scala.concurrent._ + +/** + * Provides alternative implementations of the basic transformation operations defined on [[Future]], + * which try to avoid scheduling to an [[ExecutionContext]] if possible, i.e. if the given future + * value is already present. + */ +class FastFuture[A](val future: Future[A]) extends AnyVal { + import FastFuture._ + + def map[B](f: A ⇒ B)(implicit ec: ExecutionContext): Future[B] = { + def strictMap(a: A) = + try FulfilledFuture(f(a)) + catch { case NonFatal(e) ⇒ ErrorFuture(e) } + + future match { + case FulfilledFuture(a) ⇒ strictMap(a) + case x: ErrorFuture ⇒ x + case _ ⇒ future.value match { + case None ⇒ future map f + case Some(Success(a)) ⇒ strictMap(a) + case Some(Failure(e)) ⇒ ErrorFuture(e) + } + } + } + + def flatMap[B](f: A ⇒ Future[B])(implicit ec: ExecutionContext): Future[B] = { + def strictFlatMap(a: A) = + try f(a) + catch { case NonFatal(e) ⇒ ErrorFuture(e) } + + future match { + case FulfilledFuture(a) ⇒ strictFlatMap(a) + case x: ErrorFuture ⇒ x + case _ ⇒ future.value match { + case None ⇒ future flatMap f + case Some(Success(a)) ⇒ strictFlatMap(a) + case Some(Failure(e)) ⇒ ErrorFuture(e) + } + } + } + + def filter(pred: A ⇒ Boolean)(implicit executor: ExecutionContext): Future[A] = + flatMap { + r ⇒ if (pred(r)) future else throw new NoSuchElementException("Future.filter predicate is not satisfied") + } + + def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext): Unit = { + def strictForeach(a: A) = + try f(a) + catch { case NonFatal(e) ⇒ ec.reportFailure(e) } // behave as if the `foreach` had been scheduled + + future match { + case FulfilledFuture(a) ⇒ strictForeach(a) + case x: ErrorFuture ⇒ // nothing to do + case _ ⇒ future.value match { + case None ⇒ future.foreach(f) + case Some(Success(a)) ⇒ strictForeach(a) + case Some(Failure(e)) ⇒ // nothing to do + } + } + } + + def recover[B >: A](pf: PartialFunction[Throwable, B])(implicit ec: ExecutionContext): Future[B] = { + def strictRecover(t: Throwable) = + try if (pf isDefinedAt t) FulfilledFuture(pf(t)) else future + catch { case NonFatal(e) ⇒ ErrorFuture(e) } + + future match { + case FulfilledFuture(_) ⇒ future + case ErrorFuture(e) ⇒ strictRecover(e) + case _ ⇒ future.value match { + case None ⇒ future recover pf + case Some(Success(a)) ⇒ FulfilledFuture(a) + case Some(Failure(e)) ⇒ strictRecover(e) + } + } + } +} + +object FastFuture { + def successful[T](value: T): Future[T] = FulfilledFuture(value) + def failed(error: Throwable): Future[Nothing] = ErrorFuture(error) + + private case class FulfilledFuture[+A](a: A) extends Future[A] { + def value = Some(Success(a)) + def onComplete[U](f: Try[A] ⇒ U)(implicit executor: ExecutionContext) = Future.successful(a).onComplete(f) + def isCompleted = true + def result(atMost: Duration)(implicit permit: CanAwait) = a + def ready(atMost: Duration)(implicit permit: CanAwait) = this + } + private case class ErrorFuture(error: Throwable) extends Future[Nothing] { + def value = Some(Failure(error)) + def onComplete[U](f: Try[Nothing] ⇒ U)(implicit executor: ExecutionContext) = Future.failed(error).onComplete(f) + def isCompleted = true + def result(atMost: Duration)(implicit permit: CanAwait) = throw error + def ready(atMost: Duration)(implicit permit: CanAwait) = this + } + + implicit class EnhancedFuture[T](val future: Future[T]) extends AnyVal { + def fast: FastFuture[T] = new FastFuture[T](future) + } + + def sequence[T, M[_] <: TraversableOnce[_]](in: M[Future[T]])(implicit cbf: CanBuildFrom[M[Future[T]], T, M[T]], executor: ExecutionContext): Future[M[T]] = + in.foldLeft(successful(cbf(in))) { + (fr, fa) ⇒ for (r ← fr.fast; a ← fa.asInstanceOf[Future[T]].fast) yield r += a + }.fast.map(_.result()) + + def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(f: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = + if (futures.isEmpty) successful(zero) + else sequence(futures).fast.map(_.foldLeft(zero)(f)) + + def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = + if (futures.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection")) + else sequence(futures).fast.map(_ reduceLeft op) + + def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = + in.foldLeft(successful(cbf(in))) { (fr, a) ⇒ + val fb = fn(a.asInstanceOf[A]) + for (r ← fr.fast; b ← fb.fast) yield r += b + }.fast.map(_.result()) +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 11d2d98a74..897fe8ac9a 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -167,5 +167,5 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } } - def toStrict(entity: HttpEntity): HttpEntity.Strict = entity.toStrict(500.millis).await(1.second) + def toStrict(entity: HttpEntity): HttpEntity.Strict = Await.result(entity.toStrict(500.millis), 1.second) } \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 384f66e51b..2a69b070c7 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -24,6 +24,7 @@ import HttpProtocols._ import StatusCodes._ import HttpEntity._ import ParserOutput.ParseError +import FastFuture._ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -365,7 +366,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { .map { x ⇒ Flow { x match { - case Right(request) ⇒ compactEntity(request.entity).map(x ⇒ Right(request.withEntity(x))).toFuture + case Right(request) ⇒ compactEntity(request.entity).fast.map(x ⇒ Right(request.withEntity(x))) case Left(error) ⇒ Future.successful(Left(error)) } }.toPublisher() @@ -377,16 +378,16 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { private def newParser = new HttpRequestParser(ParserSettings(system), false)() - private def compactEntity(entity: RequestEntity): Deferrable[RequestEntity] = + private def compactEntity(entity: RequestEntity): Future[RequestEntity] = entity match { - case x: Chunked ⇒ compactEntityChunks(x.chunks).map(compacted ⇒ x.copy(chunks = compacted)) + case x: Chunked ⇒ compactEntityChunks(x.chunks).fast.map(compacted ⇒ x.copy(chunks = compacted)) case _ ⇒ entity.toStrict(250.millis) } - private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Deferrable[Publisher[ChunkStreamPart]] = - Deferrable(Flow(data).grouped(1000).toFuture()) - .map(publisher(_: _*)) - .recover { case _: NoSuchElementException ⇒ publisher() } + private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] = + Flow(data).grouped(1000).toFuture() + .fast.map(publisher(_: _*)) + .fast.recover { case _: NoSuchElementException ⇒ publisher() } def prep(response: String) = response.stripMarginWithNewline("\r\n") } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 3f978c60b1..147475d2c4 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -24,6 +24,7 @@ import HttpProtocols._ import StatusCodes._ import HttpEntity._ import ParserOutput.ParseError +import FastFuture._ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -223,8 +224,8 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { }.map { x ⇒ Flow { x match { - case Right(response) ⇒ compactEntity(response.entity).map(x ⇒ Right(response.withEntity(x))).toFuture - case Left(error) ⇒ Future.successful(Left(error.info.formatPretty)) + case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x))) + case Left(error) ⇒ FastFuture.successful(Left(error.info.formatPretty)) } }.toPublisher() } @@ -239,16 +240,16 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { parser } - private def compactEntity(entity: ResponseEntity): Deferrable[ResponseEntity] = + private def compactEntity(entity: ResponseEntity): Future[ResponseEntity] = entity match { - case x: HttpEntity.Chunked ⇒ compactEntityChunks(x.chunks).map(compacted ⇒ x.copy(chunks = compacted)) + case x: HttpEntity.Chunked ⇒ compactEntityChunks(x.chunks).fast.map(compacted ⇒ x.copy(chunks = compacted)) case _ ⇒ entity.toStrict(250.millis) } - private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Deferrable[Publisher[ChunkStreamPart]] = - Deferrable(Flow(data).grouped(1000).toFuture()) - .map(publisher(_: _*)) - .recover { case _: NoSuchElementException ⇒ publisher() } + private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] = + Flow(data).grouped(1000).toFuture() + .fast.map(publisher(_: _*)) + .fast.recover { case _: NoSuchElementException ⇒ publisher() } def prep(response: String) = response.stripMarginWithNewline("\r\n") diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index e5ff64df74..c1a711a0dd 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -4,19 +4,20 @@ package akka.http.model -import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec } -import akka.util.ByteString -import org.scalatest.matchers.Matcher -import akka.stream.scaladsl.Flow -import akka.http.model.HttpEntity._ -import org.reactivestreams.Publisher -import akka.actor.ActorSystem -import akka.stream.FlowMaterializer -import akka.stream.impl.SynchronousPublisherFromIterable +import java.util.concurrent.TimeoutException import com.typesafe.config.{ ConfigFactory, Config } +import org.reactivestreams.Publisher import scala.concurrent.{ Promise, Await } import scala.concurrent.duration._ -import java.util.concurrent.TimeoutException +import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec } +import org.scalatest.matchers.Matcher +import akka.util.ByteString +import akka.actor.ActorSystem +import akka.stream.scaladsl.Flow +import akka.stream.FlowMaterializer +import akka.stream.impl.SynchronousPublisherFromIterable +import akka.http.model.HttpEntity._ +import akka.http.util.FastFuture._ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { val tpe: ContentType = ContentTypes.`application/octet-stream` @@ -76,7 +77,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { val neverCompleted = Promise[ByteString]() val stream: Publisher[ByteString] = Flow(neverCompleted.future).toPublisher() intercept[TimeoutException] { - Default(tpe, 42, stream).toStrict(100.millis).await(150.millis) + Await.result(Default(tpe, 42, stream).toStrict(100.millis), 150.millis) }.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data") } } @@ -91,5 +92,5 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { } def strictifyTo(strict: Strict): Matcher[HttpEntity] = - equal(strict).matcher[Strict].compose(_.toStrict(250.millis).await(250.millis)) + equal(strict).matcher[Strict].compose(x ⇒ Await.result(x.toStrict(250.millis), 250.millis)) } diff --git a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala index 5b8bfb1188..abeef6a1bc 100644 --- a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala @@ -4,16 +4,16 @@ package io.akka.integrationtest.http -import akka.actor.ActorSystem -import akka.http.model._ -import akka.http.model.ContentTypes -import akka.http.model.headers._ -import akka.http.model.parser.HeaderParser -import akka.stream.scaladsl2._ -import akka.util.ByteString import com.typesafe.config.{ ConfigFactory, Config } -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import scala.concurrent.Await import scala.concurrent.duration._ +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import akka.util.ByteString +import akka.actor.ActorSystem +import akka.http.model.parser.HeaderParser +import akka.http.model._ +import akka.stream.scaladsl2._ +import headers._ /** * Integration test for external HTTP libraries that are built on top of @@ -88,7 +88,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte // Finally convert the body into an Array[Byte]. - val entityBytes: Array[Byte] = request.entity.toStrict(1.second).await(2.seconds).data.toArray + val entityBytes: Array[Byte] = Await.result(request.entity.toStrict(1.second), 2.seconds).data.toArray entityBytes.to[Seq] shouldEqual ByteString("hello").to[Seq] }