diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala index 7a29288227..ab08cbc6e1 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala @@ -4,25 +4,23 @@ package akka.http.testkit -import akka.http.unmarshalling.{ Unmarshal, FromEntityUnmarshaller } - import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, Await } - +import akka.http.unmarshalling.{ Unmarshal, FromEntityUnmarshaller } import akka.http.marshalling._ import akka.http.model.HttpEntity -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import scala.util.Try trait MarshallingTestUtils { - def marshal[T: ToEntityMarshaller](value: T)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): HttpEntity.Strict = + def marshal[T: ToEntityMarshaller](value: T)(implicit ec: ExecutionContext, mat: FlowMaterializer): HttpEntity.Strict = Await.result(Marshal(value).to[HttpEntity].flatMap(_.toStrict(1.second)), 1.second) - def unmarshalValue[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): T = + def unmarshalValue[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: FlowMaterializer): T = unmarshal(entity).get - def unmarshal[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Try[T] = { + def unmarshal[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: FlowMaterializer): Try[T] = { val fut = Unmarshal(entity).to[T] Await.ready(fut, 1.second) fut.value.get diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala index af6bea72cd..7392a5f188 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala @@ -9,7 +9,7 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.ExecutionContext import akka.http.util._ -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.http.model.HttpEntity.ChunkStreamPart import akka.http.server._ @@ -22,7 +22,7 @@ trait RouteTestResultComponent { /** * A receptacle for the response or rejections created by a route. */ - class RouteTestResult(timeout: FiniteDuration)(implicit fm: ActorFlowMaterializer) { + class RouteTestResult(timeout: FiniteDuration)(implicit fm: FlowMaterializer) { private[this] var result: Option[Either[immutable.Seq[Rejection], HttpResponse]] = None private[this] val latch = new CountDownLatch(1) diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala index b61c33bf65..c6b6db209f 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala @@ -4,15 +4,14 @@ package akka.http.testkit -import akka.http.model.HttpEntity -import akka.http.unmarshalling.FromEntityUnmarshaller -import akka.stream.ActorFlowMaterializer +import scala.util.Try +import scala.concurrent.{ ExecutionContext, Future, Await } +import scala.concurrent.duration._ import org.scalatest.Suite import org.scalatest.matchers.Matcher - -import scala.concurrent.duration._ -import scala.concurrent.{ ExecutionContext, Future, Await } -import scala.util.Try +import akka.http.model.HttpEntity +import akka.http.unmarshalling.FromEntityUnmarshaller +import akka.stream.FlowMaterializer trait ScalatestUtils extends MarshallingTestUtils { import org.scalatest.Matchers._ @@ -22,10 +21,10 @@ trait ScalatestUtils extends MarshallingTestUtils { def haveFailedWith(t: Throwable): Matcher[Future[_]] = equal(t).matcher[Throwable] compose (x ⇒ Await.result(x.failed, 1.second)) - def unmarshalToValue[T: FromEntityUnmarshaller](value: T)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Matcher[HttpEntity] = + def unmarshalToValue[T: FromEntityUnmarshaller](value: T)(implicit ec: ExecutionContext, mat: FlowMaterializer): Matcher[HttpEntity] = equal(value).matcher[T] compose (unmarshalValue(_)) - def unmarshalTo[T: FromEntityUnmarshaller](value: Try[T])(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Matcher[HttpEntity] = + def unmarshalTo[T: FromEntityUnmarshaller](value: Try[T])(implicit ec: ExecutionContext, mat: FlowMaterializer): Matcher[HttpEntity] = equal(value).matcher[Try[T]] compose (unmarshal(_)) } diff --git a/akka-http/src/main/scala/akka/http/coding/Decoder.scala b/akka-http/src/main/scala/akka/http/coding/Decoder.scala index dad980dfe0..6d7440f7d8 100644 --- a/akka-http/src/main/scala/akka/http/coding/Decoder.scala +++ b/akka-http/src/main/scala/akka/http/coding/Decoder.scala @@ -5,8 +5,7 @@ package akka.http.coding import akka.http.model._ -import akka.http.util.StreamUtils -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.stage.Stage import akka.util.ByteString import headers.HttpEncoding @@ -28,7 +27,7 @@ trait Decoder { def withMaxBytesPerChunk(maxBytesPerChunk: Int): Decoder def decoderFlow: Flow[ByteString, ByteString, Unit] - def decode(input: ByteString)(implicit mat: ActorFlowMaterializer): Future[ByteString] = + def decode(input: ByteString)(implicit mat: FlowMaterializer): Future[ByteString] = Source.single(input).via(decoderFlow).runWith(Sink.head()) } object Decoder { diff --git a/akka-http/src/main/scala/akka/http/common/StrictForm.scala b/akka-http/src/main/scala/akka/http/common/StrictForm.scala index cd2acb674e..15842162e7 100644 --- a/akka-http/src/main/scala/akka/http/common/StrictForm.scala +++ b/akka-http/src/main/scala/akka/http/common/StrictForm.scala @@ -8,7 +8,7 @@ import scala.annotation.implicitNotFound import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration._ -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.http.util.FastFuture import akka.http.unmarshalling._ import akka.http.model._ @@ -87,7 +87,7 @@ object StrictForm { implicit def unmarshaller(implicit formDataUM: FromEntityUnmarshaller[FormData], multipartUM: FromEntityUnmarshaller[Multipart.FormData], - ec: ExecutionContext, fm: ActorFlowMaterializer): FromEntityUnmarshaller[StrictForm] = { + ec: ExecutionContext, fm: FlowMaterializer): FromEntityUnmarshaller[StrictForm] = { def tryUnmarshalToQueryForm(entity: HttpEntity): Future[StrictForm] = for (formData ← formDataUM(entity).fast) yield { diff --git a/akka-http/src/main/scala/akka/http/server/RequestContext.scala b/akka-http/src/main/scala/akka/http/server/RequestContext.scala index a38a271764..aa084a44b3 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContext.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContext.scala @@ -4,9 +4,8 @@ package akka.http.server -import akka.stream.ActorFlowMaterializer - import scala.concurrent.{ Future, ExecutionContext } +import akka.stream.FlowMaterializer import akka.event.LoggingAdapter import akka.http.marshalling.ToResponseMarshallable import akka.http.model._ @@ -31,7 +30,7 @@ trait RequestContext { /** * The default FlowMaterializer. */ - implicit def flowMaterializer: ActorFlowMaterializer + implicit def flowMaterializer: FlowMaterializer /** * The default LoggingAdapter to be used for logging messages related to this request. @@ -48,7 +47,7 @@ trait RequestContext { */ def reconfigure( executionContext: ExecutionContext = executionContext, - flowMaterializer: ActorFlowMaterializer = flowMaterializer, + flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, settings: RoutingSettings = settings): RequestContext @@ -82,7 +81,7 @@ trait RequestContext { /** * Returns a copy of this context with the new HttpRequest. */ - def withFlowMaterializer(materializer: ActorFlowMaterializer): RequestContext + def withFlowMaterializer(materializer: FlowMaterializer): RequestContext /** * Returns a copy of this context with the new LoggingAdapter. diff --git a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala index d0a9a85627..27826c4d39 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala @@ -4,9 +4,8 @@ package akka.http.server -import akka.stream.ActorFlowMaterializer - import scala.concurrent.{ Future, ExecutionContext } +import akka.stream.FlowMaterializer import akka.event.LoggingAdapter import akka.http.marshalling.{ Marshal, ToResponseMarshallable } import akka.http.util.FastFuture @@ -20,14 +19,14 @@ private[http] class RequestContextImpl( val request: HttpRequest, val unmatchedPath: Uri.Path, val executionContext: ExecutionContext, - val flowMaterializer: ActorFlowMaterializer, + val flowMaterializer: FlowMaterializer, val log: LoggingAdapter, val settings: RoutingSettings) extends RequestContext { - def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: ActorFlowMaterializer) = + def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: FlowMaterializer) = this(request, request.uri.path, ec, materializer, log, settings) - def reconfigure(executionContext: ExecutionContext, flowMaterializer: ActorFlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext = + def reconfigure(executionContext: ExecutionContext, flowMaterializer: FlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext = copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log, settings = settings) override def complete(trm: ToResponseMarshallable): Future[RouteResult] = @@ -51,7 +50,7 @@ private[http] class RequestContextImpl( override def withExecutionContext(executionContext: ExecutionContext): RequestContext = if (executionContext != this.executionContext) copy(executionContext = executionContext) else this - override def withFlowMaterializer(flowMaterializer: ActorFlowMaterializer): RequestContext = + override def withFlowMaterializer(flowMaterializer: FlowMaterializer): RequestContext = if (flowMaterializer != this.flowMaterializer) copy(flowMaterializer = flowMaterializer) else this override def withLog(log: LoggingAdapter): RequestContext = @@ -85,7 +84,7 @@ private[http] class RequestContextImpl( private def copy(request: HttpRequest = request, unmatchedPath: Uri.Path = unmatchedPath, executionContext: ExecutionContext = executionContext, - flowMaterializer: ActorFlowMaterializer = flowMaterializer, + flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, settings: RoutingSettings = settings) = new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, settings) diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala index 1bcaa4f87e..5d4f399272 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala @@ -7,7 +7,7 @@ package akka.http.server import scala.concurrent.ExecutionContext import akka.event.LoggingAdapter import akka.actor.{ ActorSystem, ActorContext } -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.http.Http import akka.http.model.HttpRequest @@ -34,12 +34,12 @@ class RoutingSetup( val exceptionHandler: ExceptionHandler, val rejectionHandler: RejectionHandler, val executionContext: ExecutionContext, - val flowMaterializer: ActorFlowMaterializer, + val flowMaterializer: FlowMaterializer, val routingLog: RoutingLog) { // enable `import setup._` to properly bring implicits in scope implicit def executor: ExecutionContext = executionContext - implicit def materializer: ActorFlowMaterializer = flowMaterializer + implicit def materializer: FlowMaterializer = flowMaterializer } object RoutingSetup { @@ -47,7 +47,7 @@ object RoutingSetup { exceptionHandler: ExceptionHandler = null, rejectionHandler: RejectionHandler = null, executionContext: ExecutionContext, - flowMaterializer: ActorFlowMaterializer, + flowMaterializer: FlowMaterializer, routingLog: RoutingLog): RoutingSetup = new RoutingSetup( routingSettings, diff --git a/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala index ab680f571c..1bd38078df 100644 --- a/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala @@ -5,11 +5,10 @@ package akka.http.server package directives -import akka.event.LoggingAdapter -import akka.stream.ActorFlowMaterializer - import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable +import akka.event.LoggingAdapter +import akka.stream.FlowMaterializer import akka.http.server.util.Tuple import akka.http.util.FastFuture import akka.http.model._ @@ -144,13 +143,13 @@ trait BasicDirectives { /** * Runs its inner route with the given alternative [[FlowMaterializer]]. */ - def withFlowMaterializer(materializer: ActorFlowMaterializer): Directive0 = + def withFlowMaterializer(materializer: FlowMaterializer): Directive0 = mapRequestContext(_ withFlowMaterializer materializer) /** * Extracts the [[ExecutionContext]] from the [[RequestContext]]. */ - def extractFlowMaterializer: Directive1[ActorFlowMaterializer] = BasicDirectives._extractFlowMaterializer + def extractFlowMaterializer: Directive1[FlowMaterializer] = BasicDirectives._extractFlowMaterializer /** * Runs its inner route with the given alternative [[LoggingAdapter]]. @@ -193,7 +192,7 @@ object BasicDirectives extends BasicDirectives { private val _extractRequest: Directive1[HttpRequest] = extract(_.request) private val _extractUri: Directive1[Uri] = extract(_.request.uri) private val _extractExecutionContext: Directive1[ExecutionContext] = extract(_.executionContext) - private val _extractFlowMaterializer: Directive1[ActorFlowMaterializer] = extract(_.flowMaterializer) + private val _extractFlowMaterializer: Directive1[FlowMaterializer] = extract(_.flowMaterializer) private val _extractLog: Directive1[LoggingAdapter] = extract(_.log) private val _extractSettings: Directive1[RoutingSettings] = extract(_.settings) private val _extractRequestContext: Directive1[RequestContext] = extract(akka.http.util.identityFunc) diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala index 33fc181665..2ad0124efe 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -5,24 +5,24 @@ package akka.http.unmarshalling import scala.concurrent.ExecutionContext -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.util.ByteString import akka.http.util.FastFuture import akka.http.model._ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { - implicit def byteStringUnmarshaller(implicit fm: ActorFlowMaterializer): FromEntityUnmarshaller[ByteString] = + implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] = Unmarshaller { case HttpEntity.Strict(_, data) ⇒ FastFuture.successful(data) case entity ⇒ entity.dataBytes.runFold(ByteString.empty)(_ ++ _) } - implicit def byteArrayUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[Array[Byte]] = byteStringUnmarshaller.map(_.toArray[Byte]) - implicit def charArrayUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def charArrayUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[Array[Char]] = byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒ val charBuffer = entity.contentType.charset.nioCharset.decode(bytes.asByteBuffer) @@ -31,17 +31,17 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { array } - implicit def stringUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def stringUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[String] = byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒ // FIXME: add `ByteString::decodeString(java.nio.Charset): String` overload!!! bytes.decodeString(entity.contentType.charset.nioCharset.name) // ouch!!! } - implicit def defaultUrlEncodedFormDataUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def defaultUrlEncodedFormDataUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[FormData] = urlEncodedFormDataUnmarshaller(MediaTypes.`application/x-www-form-urlencoded`) - def urlEncodedFormDataUnmarshaller(ranges: ContentTypeRange*)(implicit fm: ActorFlowMaterializer, + def urlEncodedFormDataUnmarshaller(ranges: ContentTypeRange*)(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[FormData] = stringUnmarshaller.forContentTypes(ranges: _*).mapWithInput { (entity, string) ⇒ try {