diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 9e8ef37852..3b32869d01 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -13,11 +13,11 @@ import scala.concurrent.Future import akka.event.LoggingAdapter import akka.util.ByteString import akka.io.Inet -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.http.engine.client.{ HttpClient, ClientConnectionSettings } import akka.http.engine.server.{ HttpServer, ServerSettings } -import akka.http.model.{ ErrorInfo, HttpResponse, HttpRequest } +import akka.http.model.{ HttpResponse, HttpRequest } import akka.actor._ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension { @@ -29,7 +29,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E def bind(interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ServerSettings] = None, - log: LoggingAdapter = system.log)(implicit fm: ActorFlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = { + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = { val endpoint = new InetSocketAddress(interface, port) val effectiveSettings = ServerSettings(settings) @@ -64,7 +64,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ServerSettings] = None, - log: LoggingAdapter = system.log)(implicit fm: ActorFlowMaterializer): Future[ServerBinding] = { + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = { bind(interface, port, backlog, options, settings, log).toMat(Sink.foreach { conn ⇒ conn.flow.join(handler) })(Keep.left).run() @@ -81,7 +81,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ServerSettings] = None, - log: LoggingAdapter = system.log)(implicit fm: ActorFlowMaterializer): Future[ServerBinding] = + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = bindAndstartHandlingWith(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log) /** @@ -95,7 +95,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ServerSettings] = None, - log: LoggingAdapter = system.log)(implicit fm: ActorFlowMaterializer): Future[ServerBinding] = + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = bindAndstartHandlingWith(Flow[HttpRequest].mapAsync(handler), interface, port, backlog, options, settings, log) /** @@ -103,7 +103,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E */ def serverFlowToTransport[Mat](serverFlow: Flow[HttpRequest, HttpResponse, Mat], settings: Option[ServerSettings] = None, - log: LoggingAdapter = system.log)(implicit mat: ActorFlowMaterializer): Flow[ByteString, ByteString, Mat] = { + log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): Flow[ByteString, ByteString, Mat] = { val effectiveSettings = ServerSettings(settings) val serverBlueprint: Graph[HttpServerPorts, Unit] = HttpServer.serverBlueprint(effectiveSettings, log) @@ -198,23 +198,23 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { /** * Handles the connection with the given flow, which is materialized exactly once - * and the respective [[MaterializedMap]] returned. + * and the respective materialization result returned. */ - def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: ActorFlowMaterializer): Mat = + def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: FlowMaterializer): Mat = flow.join(handler).mapMaterialized(_._2).run() /** * Handles the connection with the given handler function. - * Returns the [[MaterializedMap]] of the underlying flow materialization. + * Returns the materialization result of the underlying flow materialization. */ - def handleWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: ActorFlowMaterializer): Unit = + def handleWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: FlowMaterializer): Unit = handleWith(Flow[HttpRequest].map(handler)) /** * Handles the connection with the given handler function. - * Returns the [[MaterializedMap]] of the underlying flow materialization. + * Returns the materialization result of the underlying flow materialization. */ - def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse])(implicit fm: ActorFlowMaterializer): Unit = + def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse])(implicit fm: FlowMaterializer): Unit = handleWith(Flow[HttpRequest].mapAsync(handler)) } diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index af9403b3d1..2cffcc9683 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -4,16 +4,15 @@ package akka.http.engine.server +import scala.collection.immutable +import scala.util.control.NonFatal +import akka.util.ByteString +import akka.event.LoggingAdapter +import akka.actor.{ ActorRef, Props } +import akka.stream.stage.PushPullStage import akka.stream.scaladsl.OperationAttributes._ import akka.stream.scaladsl._ import akka.stream._ - -import scala.collection.immutable -import scala.util.control.NonFatal -import akka.actor.{ ActorRef, Props } -import akka.util.ByteString -import akka.event.LoggingAdapter -import akka.stream.stage.PushPullStage import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.engine.parsing.ParserOutput._ @@ -50,7 +49,7 @@ private[http] object HttpServer { } def serverBlueprint(settings: ServerSettings, - log: LoggingAdapter)(implicit mat: ActorFlowMaterializer): Graph[HttpServerPorts, Unit] = { + log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[HttpServerPorts, Unit] = { // the initial header parser we initially use for every connection, // will not be mutated, all "shared copy" parsers copy on first-write into the header cache diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 5e40ed5340..b6fde9517a 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -7,19 +7,16 @@ package akka.http.model import language.implicitConversions import java.io.File import java.lang.{ Iterable ⇒ JIterable } -import scala.concurrent.{ Future, ExecutionContext } +import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.collection.immutable -import scala.util.control.NonFatal import akka.util.ByteString import akka.stream.scaladsl.OperationAttributes._ -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.stream.TimerTransformer import akka.http.util._ import japi.JavaMapping.Implicits._ -import scala.util.Success -import scala.util.Failure /** * Models the entity (aka "body" or "content) of an HTTP message. @@ -44,7 +41,7 @@ sealed trait HttpEntity extends japi.HttpEntity { * Collects all possible parts and returns a potentially future Strict entity for easier processing. * The Future is failed with an TimeoutException if the stream isn't completed after the given timeout. */ - def toStrict(timeout: FiniteDuration)(implicit fm: ActorFlowMaterializer): Future[HttpEntity.Strict] = { + def toStrict(timeout: FiniteDuration)(implicit fm: FlowMaterializer): Future[HttpEntity.Strict] = { def transformer() = new TimerTransformer[ByteString, HttpEntity.Strict] { var bytes = ByteString.newBuilder @@ -56,7 +53,7 @@ sealed trait HttpEntity extends japi.HttpEntity { } override def onTermination(e: Option[Throwable]): immutable.Seq[HttpEntity.Strict] = - HttpEntity.Strict(contentType, bytes.result) :: Nil + HttpEntity.Strict(contentType, bytes.result()) :: Nil def onTimer(timerKey: Any): immutable.Seq[HttpEntity.Strict] = throw new java.util.concurrent.TimeoutException( @@ -160,7 +157,7 @@ object HttpEntity { def dataBytes: Source[ByteString, Unit] = Source(data :: Nil) - override def toStrict(timeout: FiniteDuration)(implicit fm: ActorFlowMaterializer) = + override def toStrict(timeout: FiniteDuration)(implicit fm: FlowMaterializer) = FastFuture.successful(this) override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): MessageEntity = diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index e535e889ef..7f9ba4624b 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -5,17 +5,15 @@ package akka.http.model import java.lang.{ Iterable ⇒ JIterable } -import akka.parboiled2.CharUtils - import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import scala.reflect.{ classTag, ClassTag } -import akka.stream.ActorFlowMaterializer +import akka.parboiled2.CharUtils +import akka.stream.FlowMaterializer import akka.util.ByteString import akka.http.util._ import headers._ -import HttpCharsets._ import FastFuture._ /** @@ -51,7 +49,7 @@ sealed trait HttpMessage extends japi.HttpMessage { def withEntity(entity: MessageEntity): Self /** Returns a sharable and serializable copy of this message with a strict entity. */ - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[Self] = + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Self] = entity.toStrict(timeout).fast.map(this.withEntity) /** Returns a copy of this message with the entity and headers set to the given ones. */ diff --git a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala index 4b5f57264d..f20cf8e0e5 100644 --- a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala @@ -9,7 +9,7 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import scala.util.{ Failure, Success, Try } -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.scaladsl.Source import akka.http.util.FastFuture import akka.http.model.headers._ @@ -24,7 +24,7 @@ trait Multipart { * The given ``timeout`` denotes the max time that an individual part must be read in. * The Future is failed with an TimeoutException if one part isn't read completely after the given timeout. */ - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[Multipart.Strict] + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Multipart.Strict] } object Multipart { @@ -47,7 +47,7 @@ object Multipart { def dispositionType: Option[ContentDispositionType] = contentDispositionHeader.map(_.dispositionType) - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[BodyPart.Strict] + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] } object BodyPart { @@ -56,7 +56,7 @@ object Multipart { } } - private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP, Unit])(f: BP ⇒ Future[BPS])(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[Vector[BPS]] = + private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP, Unit])(f: BP ⇒ Future[BPS])(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Vector[BPS]] = // TODO: move to Vector `:+` when https://issues.scala-lang.org/browse/SI-8930 is fixed parts.runFold(new VectorBuilder[Future[BPS]]) { case (builder, part) ⇒ builder += f(part) @@ -70,7 +70,7 @@ object Multipart { sealed abstract class General extends Multipart { def mediaType: MultipartMediaType def parts: Source[General.BodyPart, Unit] - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[General.Strict] = + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[General.Strict] = strictify(parts)(_.toStrict(timeout)).fast.map(General.Strict(mediaType, _)) } object General { @@ -90,7 +90,7 @@ object Multipart { */ case class Strict(mediaType: MultipartMediaType, strictParts: immutable.Seq[BodyPart.Strict]) extends General with Multipart.Strict { def parts: Source[BodyPart.Strict, Unit] = Source(strictParts) - override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer) = + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) override def productPrefix = "General.Strict" } @@ -99,7 +99,7 @@ object Multipart { * Body part of the [[General]] model. */ sealed abstract class BodyPart extends Multipart.BodyPart { - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[BodyPart.Strict] = + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] = entity.toStrict(timeout).map(BodyPart.Strict(_, headers)) def toFormDataBodyPart: Try[FormData.BodyPart] def toByteRangesBodyPart: Try[ByteRanges.BodyPart] @@ -133,7 +133,7 @@ object Multipart { * Strict [[General.BodyPart]]. */ case class Strict(entity: HttpEntity.Strict, headers: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict { - override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[Strict] = + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] = FastFuture.successful(this) override def toFormDataBodyPart: Try[FormData.BodyPart.Strict] = tryCreateFormDataBodyPart(FormData.BodyPart.Strict(_, entity, _, _)) override def toByteRangesBodyPart: Try[ByteRanges.BodyPart.Strict] = tryCreateByteRangesBodyPart(ByteRanges.BodyPart.Strict(_, entity, _, _)) @@ -149,7 +149,7 @@ object Multipart { sealed abstract class FormData extends Multipart { def mediaType = MediaTypes.`multipart/form-data` def parts: Source[FormData.BodyPart, Unit] - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[FormData.Strict] = + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[FormData.Strict] = strictify(parts)(_.toStrict(timeout)).fast.map(FormData.Strict(_)) } object FormData { @@ -169,7 +169,7 @@ object Multipart { */ case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends FormData with Multipart.Strict { def parts: Source[BodyPart.Strict, Unit] = Source(strictParts) - override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer) = + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) override def productPrefix = "FormData.Strict" } @@ -186,7 +186,7 @@ object Multipart { override def dispositionParams = additionalDispositionParams.updated("name", name) override def dispositionType = Some(ContentDispositionTypes.`form-data`) def filename: Option[String] = additionalDispositionParams.get("filename") - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[BodyPart.Strict] = + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] = entity.toStrict(timeout).map(BodyPart.Strict(name, _, additionalDispositionParams, additionalHeaders)) } object BodyPart { @@ -210,7 +210,7 @@ object Multipart { case class Strict(name: String, entity: HttpEntity.Strict, additionalDispositionParams: Map[String, String] = Map.empty, additionalHeaders: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict { - override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[Strict] = + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] = FastFuture.successful(this) override def productPrefix = "FormData.BodyPart.Strict" } @@ -224,7 +224,7 @@ object Multipart { sealed abstract class ByteRanges extends Multipart { def mediaType = MediaTypes.`multipart/byteranges` def parts: Source[ByteRanges.BodyPart, Unit] - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[ByteRanges.Strict] = + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[ByteRanges.Strict] = strictify(parts)(_.toStrict(timeout)).fast.map(ByteRanges.Strict(_)) } object ByteRanges { @@ -241,7 +241,7 @@ object Multipart { */ case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends ByteRanges with Multipart.Strict { def parts: Source[BodyPart.Strict, Unit] = Source(strictParts) - override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer) = + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) override def productPrefix = "ByteRanges.Strict" } @@ -255,7 +255,7 @@ object Multipart { def additionalHeaders: immutable.Seq[HttpHeader] override def headers = contentRangeHeader +: additionalHeaders def contentRangeHeader = `Content-Range`(rangeUnit, contentRange) - def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[BodyPart.Strict] = + def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[BodyPart.Strict] = entity.toStrict(timeout).map(BodyPart.Strict(contentRange, _, rangeUnit, additionalHeaders)) } object BodyPart { @@ -277,7 +277,7 @@ object Multipart { */ case class Strict(contentRange: ContentRange, entity: HttpEntity.Strict, rangeUnit: RangeUnit = RangeUnits.Bytes, additionalHeaders: immutable.Seq[HttpHeader] = Nil) extends BodyPart with Multipart.BodyPart.Strict { - override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: ActorFlowMaterializer): Future[Strict] = + override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Strict] = FastFuture.successful(this) override def productPrefix = "ByteRanges.BodyPart.Strict" } diff --git a/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala b/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala index c1fdcc5b71..100235cdf5 100644 --- a/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala +++ b/akka-http-core/src/main/scala/akka/http/util/FastFuture.scala @@ -26,8 +26,9 @@ class FastFuture[A](val future: Future[A]) extends AnyVal { transformWith(f, FastFuture.failed) def filter(pred: A ⇒ Boolean)(implicit executor: ExecutionContext): Future[A] = - flatMap { - r ⇒ if (pred(r)) future else throw new NoSuchElementException("Future.filter predicate is not satisfied") + flatMap { r ⇒ + if (pred(r)) future + else throw new NoSuchElementException("Future.filter predicate is not satisfied") // FIXME: avoid stack trace generation } def foreach(f: A ⇒ Unit)(implicit ec: ExecutionContext): Unit = map(f) diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 2f3471aef8..abd8f3735f 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -5,22 +5,16 @@ package akka.http.util import java.io.InputStream - import java.util.concurrent.atomic.AtomicBoolean - -import scala.annotation.tailrec +import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } -import scala.util.Try -import akka.actor.Props -import akka.http.model.RequestEntity -import akka.stream.{ ActorFlowMaterializerSettings, ActorFlowMaterializer, impl } -import akka.stream.impl.fusing.IteratorInterpreter -import akka.stream.scaladsl._ -import akka.stream.scaladsl.OperationAttributes._ -import akka.stream.stage._ import akka.util.ByteString -import org.reactivestreams.{ Subscriber, Publisher } +import akka.http.model.RequestEntity +import akka.stream.{ FlowMaterializer, impl } +import akka.stream.scaladsl._ +import akka.stream.stage._ +import OperationAttributes._ /** * INTERNAL API @@ -134,7 +128,7 @@ private[http] object StreamUtils { * Applies a sequence of transformers on one source and returns a sequence of sources with the result. The input source * will only be traversed once. */ - def transformMultiple(input: Source[ByteString, Unit], transformers: immutable.Seq[Flow[ByteString, ByteString, _]])(implicit materializer: ActorFlowMaterializer): immutable.Seq[Source[ByteString, Unit]] = + def transformMultiple(input: Source[ByteString, Unit], transformers: immutable.Seq[Flow[ByteString, ByteString, _]])(implicit materializer: FlowMaterializer): immutable.Seq[Source[ByteString, Unit]] = transformers match { case Nil ⇒ Nil case Seq(one) ⇒ Vector(input.via(one)) @@ -206,8 +200,8 @@ private[http] object StreamUtils { * INTERNAL API */ private[http] class EnhancedByteStringSource[Mat](val byteStringStream: Source[ByteString, Mat]) extends AnyVal { - def join(implicit materializer: ActorFlowMaterializer): Future[ByteString] = + def join(implicit materializer: FlowMaterializer): Future[ByteString] = byteStringStream.runFold(ByteString.empty)(_ ++ _) - def utf8String(implicit materializer: ActorFlowMaterializer, ec: ExecutionContext): Future[String] = + def utf8String(implicit materializer: FlowMaterializer, ec: ExecutionContext): Future[String] = join.map(_.utf8String) } diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index bb2284c94b..b26931c1d7 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -6,10 +6,9 @@ package akka.http import language.implicitConversions import language.higherKinds -import scala.collection.immutable import java.nio.charset.Charset import com.typesafe.config.Config -import akka.stream.{ ActorFlowMaterializer, FlattenStrategy } +import akka.stream.FlattenStrategy import akka.stream.scaladsl.{ Flow, Source } import akka.stream.stage._ import scala.concurrent.duration.Duration diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 12aec9a5cd..910215bd4b 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -6,7 +6,6 @@ package akka.http import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.net.Socket -import akka.stream.impl.{ PublisherSink, SubscriberSource } import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec import scala.concurrent.Await @@ -206,8 +205,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = { connSourceSub.request(1) val incomingConnection = connSource.expectNext() - val sink = Sink.publisher[HttpRequest] - val source = Source.subscriber[HttpResponse] + val sink = Sink.publisher[HttpRequest]() + val source = Source.subscriber[HttpResponse]() val handler = Flow(sink, source)(Keep.both) { implicit b ⇒ (snk, src) ⇒ diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 23892d46e4..3edf2a2fbc 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -8,7 +8,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import scala.util.{ Failure, Success } import akka.actor.ActorSystem import akka.stream.ActorFlowMaterializer -import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.scaladsl.{ Sink, Source } import akka.http.model._ object TestClient extends App { diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index 5672903efe..810accf621 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -7,7 +7,6 @@ package akka.http import akka.actor.ActorSystem import akka.http.model._ import akka.stream.ActorFlowMaterializer -import akka.stream.scaladsl.Flow import com.typesafe.config.{ ConfigFactory, Config } import HttpMethods._ diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala index 6300538085..6edbea7211 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala @@ -18,10 +18,8 @@ import akka.http.util._ import akka.stream.scaladsl._ import akka.stream.scaladsl.OperationAttributes._ import akka.stream.ActorFlowMaterializer -import akka.stream.impl.SynchronousIterablePublisher import HttpEntity._ import HttpMethods._ -import akka.util.ByteString class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index 686982c669..d7a80af06b 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -6,8 +6,6 @@ package akka.http.model import java.util.concurrent.TimeoutException import com.typesafe.config.{ ConfigFactory, Config } -import org.reactivestreams.Publisher -import scala.collection.immutable import scala.concurrent.{ Promise, Await } import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec } diff --git a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala index 4f198f7573..6ec2e8b29d 100644 --- a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala @@ -9,12 +9,12 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.util.ByteString +import akka.stream.ActorFlowMaterializer import akka.actor.ActorSystem import akka.http.model.parser.HeaderParser import akka.http.model._ import akka.stream.scaladsl._ import headers._ -import akka.stream.ActorFlowMaterializer /** * Integration test for external HTTP libraries that are built on top of diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala index 7a29288227..ab08cbc6e1 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/MarshallingTestUtils.scala @@ -4,25 +4,23 @@ package akka.http.testkit -import akka.http.unmarshalling.{ Unmarshal, FromEntityUnmarshaller } - import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, Await } - +import akka.http.unmarshalling.{ Unmarshal, FromEntityUnmarshaller } import akka.http.marshalling._ import akka.http.model.HttpEntity -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import scala.util.Try trait MarshallingTestUtils { - def marshal[T: ToEntityMarshaller](value: T)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): HttpEntity.Strict = + def marshal[T: ToEntityMarshaller](value: T)(implicit ec: ExecutionContext, mat: FlowMaterializer): HttpEntity.Strict = Await.result(Marshal(value).to[HttpEntity].flatMap(_.toStrict(1.second)), 1.second) - def unmarshalValue[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): T = + def unmarshalValue[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: FlowMaterializer): T = unmarshal(entity).get - def unmarshal[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Try[T] = { + def unmarshal[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: FlowMaterializer): Try[T] = { val fut = Unmarshal(entity).to[T] Await.ready(fut, 1.second) fut.value.get diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala index af6bea72cd..7392a5f188 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala @@ -9,7 +9,7 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.ExecutionContext import akka.http.util._ -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.http.model.HttpEntity.ChunkStreamPart import akka.http.server._ @@ -22,7 +22,7 @@ trait RouteTestResultComponent { /** * A receptacle for the response or rejections created by a route. */ - class RouteTestResult(timeout: FiniteDuration)(implicit fm: ActorFlowMaterializer) { + class RouteTestResult(timeout: FiniteDuration)(implicit fm: FlowMaterializer) { private[this] var result: Option[Either[immutable.Seq[Rejection], HttpResponse]] = None private[this] val latch = new CountDownLatch(1) diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala index b61c33bf65..c6b6db209f 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/ScalatestUtils.scala @@ -4,15 +4,14 @@ package akka.http.testkit -import akka.http.model.HttpEntity -import akka.http.unmarshalling.FromEntityUnmarshaller -import akka.stream.ActorFlowMaterializer +import scala.util.Try +import scala.concurrent.{ ExecutionContext, Future, Await } +import scala.concurrent.duration._ import org.scalatest.Suite import org.scalatest.matchers.Matcher - -import scala.concurrent.duration._ -import scala.concurrent.{ ExecutionContext, Future, Await } -import scala.util.Try +import akka.http.model.HttpEntity +import akka.http.unmarshalling.FromEntityUnmarshaller +import akka.stream.FlowMaterializer trait ScalatestUtils extends MarshallingTestUtils { import org.scalatest.Matchers._ @@ -22,10 +21,10 @@ trait ScalatestUtils extends MarshallingTestUtils { def haveFailedWith(t: Throwable): Matcher[Future[_]] = equal(t).matcher[Throwable] compose (x ⇒ Await.result(x.failed, 1.second)) - def unmarshalToValue[T: FromEntityUnmarshaller](value: T)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Matcher[HttpEntity] = + def unmarshalToValue[T: FromEntityUnmarshaller](value: T)(implicit ec: ExecutionContext, mat: FlowMaterializer): Matcher[HttpEntity] = equal(value).matcher[T] compose (unmarshalValue(_)) - def unmarshalTo[T: FromEntityUnmarshaller](value: Try[T])(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Matcher[HttpEntity] = + def unmarshalTo[T: FromEntityUnmarshaller](value: Try[T])(implicit ec: ExecutionContext, mat: FlowMaterializer): Matcher[HttpEntity] = equal(value).matcher[Try[T]] compose (unmarshal(_)) } diff --git a/akka-http/src/main/scala/akka/http/coding/Decoder.scala b/akka-http/src/main/scala/akka/http/coding/Decoder.scala index dad980dfe0..6d7440f7d8 100644 --- a/akka-http/src/main/scala/akka/http/coding/Decoder.scala +++ b/akka-http/src/main/scala/akka/http/coding/Decoder.scala @@ -5,8 +5,7 @@ package akka.http.coding import akka.http.model._ -import akka.http.util.StreamUtils -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.stage.Stage import akka.util.ByteString import headers.HttpEncoding @@ -28,7 +27,7 @@ trait Decoder { def withMaxBytesPerChunk(maxBytesPerChunk: Int): Decoder def decoderFlow: Flow[ByteString, ByteString, Unit] - def decode(input: ByteString)(implicit mat: ActorFlowMaterializer): Future[ByteString] = + def decode(input: ByteString)(implicit mat: FlowMaterializer): Future[ByteString] = Source.single(input).via(decoderFlow).runWith(Sink.head()) } object Decoder { diff --git a/akka-http/src/main/scala/akka/http/common/StrictForm.scala b/akka-http/src/main/scala/akka/http/common/StrictForm.scala index cd2acb674e..15842162e7 100644 --- a/akka-http/src/main/scala/akka/http/common/StrictForm.scala +++ b/akka-http/src/main/scala/akka/http/common/StrictForm.scala @@ -8,7 +8,7 @@ import scala.annotation.implicitNotFound import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration._ -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.http.util.FastFuture import akka.http.unmarshalling._ import akka.http.model._ @@ -87,7 +87,7 @@ object StrictForm { implicit def unmarshaller(implicit formDataUM: FromEntityUnmarshaller[FormData], multipartUM: FromEntityUnmarshaller[Multipart.FormData], - ec: ExecutionContext, fm: ActorFlowMaterializer): FromEntityUnmarshaller[StrictForm] = { + ec: ExecutionContext, fm: FlowMaterializer): FromEntityUnmarshaller[StrictForm] = { def tryUnmarshalToQueryForm(entity: HttpEntity): Future[StrictForm] = for (formData ← formDataUM(entity).fast) yield { diff --git a/akka-http/src/main/scala/akka/http/marshalling/Marshaller.scala b/akka-http/src/main/scala/akka/http/marshalling/Marshaller.scala index db8594f7f7..5a0007edbe 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/Marshaller.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/Marshaller.scala @@ -68,14 +68,14 @@ object Marshaller /** * Helper for creating a "super-marshaller" from a number of "sub-marshallers". - * Content-negotiation determines, which "sub-marshallers" eventually gets to do the job. + * Content-negotiation determines, which "sub-marshaller" eventually gets to do the job. */ def oneOf[A, B](marshallers: Marshaller[A, B]*)(implicit ec: ExecutionContext): Marshaller[A, B] = Marshaller { a ⇒ FastFuture.sequence(marshallers.map(_(a))).fast.map(_.flatten.toList) } /** * Helper for creating a "super-marshaller" from a number of values and a function producing "sub-marshallers" - * from these values. Content-negotiation determines, which "sub-marshallers" eventually gets to do the job. + * from these values. Content-negotiation determines, which "sub-marshaller" eventually gets to do the job. */ def oneOf[T, A, B](values: T*)(f: T ⇒ Marshaller[A, B])(implicit ec: ExecutionContext): Marshaller[A, B] = oneOf(values map f: _*) diff --git a/akka-http/src/main/scala/akka/http/server/RequestContext.scala b/akka-http/src/main/scala/akka/http/server/RequestContext.scala index a38a271764..aa084a44b3 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContext.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContext.scala @@ -4,9 +4,8 @@ package akka.http.server -import akka.stream.ActorFlowMaterializer - import scala.concurrent.{ Future, ExecutionContext } +import akka.stream.FlowMaterializer import akka.event.LoggingAdapter import akka.http.marshalling.ToResponseMarshallable import akka.http.model._ @@ -31,7 +30,7 @@ trait RequestContext { /** * The default FlowMaterializer. */ - implicit def flowMaterializer: ActorFlowMaterializer + implicit def flowMaterializer: FlowMaterializer /** * The default LoggingAdapter to be used for logging messages related to this request. @@ -48,7 +47,7 @@ trait RequestContext { */ def reconfigure( executionContext: ExecutionContext = executionContext, - flowMaterializer: ActorFlowMaterializer = flowMaterializer, + flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, settings: RoutingSettings = settings): RequestContext @@ -82,7 +81,7 @@ trait RequestContext { /** * Returns a copy of this context with the new HttpRequest. */ - def withFlowMaterializer(materializer: ActorFlowMaterializer): RequestContext + def withFlowMaterializer(materializer: FlowMaterializer): RequestContext /** * Returns a copy of this context with the new LoggingAdapter. diff --git a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala index d0a9a85627..27826c4d39 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala @@ -4,9 +4,8 @@ package akka.http.server -import akka.stream.ActorFlowMaterializer - import scala.concurrent.{ Future, ExecutionContext } +import akka.stream.FlowMaterializer import akka.event.LoggingAdapter import akka.http.marshalling.{ Marshal, ToResponseMarshallable } import akka.http.util.FastFuture @@ -20,14 +19,14 @@ private[http] class RequestContextImpl( val request: HttpRequest, val unmatchedPath: Uri.Path, val executionContext: ExecutionContext, - val flowMaterializer: ActorFlowMaterializer, + val flowMaterializer: FlowMaterializer, val log: LoggingAdapter, val settings: RoutingSettings) extends RequestContext { - def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: ActorFlowMaterializer) = + def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: FlowMaterializer) = this(request, request.uri.path, ec, materializer, log, settings) - def reconfigure(executionContext: ExecutionContext, flowMaterializer: ActorFlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext = + def reconfigure(executionContext: ExecutionContext, flowMaterializer: FlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext = copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log, settings = settings) override def complete(trm: ToResponseMarshallable): Future[RouteResult] = @@ -51,7 +50,7 @@ private[http] class RequestContextImpl( override def withExecutionContext(executionContext: ExecutionContext): RequestContext = if (executionContext != this.executionContext) copy(executionContext = executionContext) else this - override def withFlowMaterializer(flowMaterializer: ActorFlowMaterializer): RequestContext = + override def withFlowMaterializer(flowMaterializer: FlowMaterializer): RequestContext = if (flowMaterializer != this.flowMaterializer) copy(flowMaterializer = flowMaterializer) else this override def withLog(log: LoggingAdapter): RequestContext = @@ -85,7 +84,7 @@ private[http] class RequestContextImpl( private def copy(request: HttpRequest = request, unmatchedPath: Uri.Path = unmatchedPath, executionContext: ExecutionContext = executionContext, - flowMaterializer: ActorFlowMaterializer = flowMaterializer, + flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, settings: RoutingSettings = settings) = new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, settings) diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala index 1bcaa4f87e..5d4f399272 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala @@ -7,7 +7,7 @@ package akka.http.server import scala.concurrent.ExecutionContext import akka.event.LoggingAdapter import akka.actor.{ ActorSystem, ActorContext } -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.http.Http import akka.http.model.HttpRequest @@ -34,12 +34,12 @@ class RoutingSetup( val exceptionHandler: ExceptionHandler, val rejectionHandler: RejectionHandler, val executionContext: ExecutionContext, - val flowMaterializer: ActorFlowMaterializer, + val flowMaterializer: FlowMaterializer, val routingLog: RoutingLog) { // enable `import setup._` to properly bring implicits in scope implicit def executor: ExecutionContext = executionContext - implicit def materializer: ActorFlowMaterializer = flowMaterializer + implicit def materializer: FlowMaterializer = flowMaterializer } object RoutingSetup { @@ -47,7 +47,7 @@ object RoutingSetup { exceptionHandler: ExceptionHandler = null, rejectionHandler: RejectionHandler = null, executionContext: ExecutionContext, - flowMaterializer: ActorFlowMaterializer, + flowMaterializer: FlowMaterializer, routingLog: RoutingLog): RoutingSetup = new RoutingSetup( routingSettings, diff --git a/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala index ab680f571c..1bd38078df 100644 --- a/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/server/directives/BasicDirectives.scala @@ -5,11 +5,10 @@ package akka.http.server package directives -import akka.event.LoggingAdapter -import akka.stream.ActorFlowMaterializer - import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable +import akka.event.LoggingAdapter +import akka.stream.FlowMaterializer import akka.http.server.util.Tuple import akka.http.util.FastFuture import akka.http.model._ @@ -144,13 +143,13 @@ trait BasicDirectives { /** * Runs its inner route with the given alternative [[FlowMaterializer]]. */ - def withFlowMaterializer(materializer: ActorFlowMaterializer): Directive0 = + def withFlowMaterializer(materializer: FlowMaterializer): Directive0 = mapRequestContext(_ withFlowMaterializer materializer) /** * Extracts the [[ExecutionContext]] from the [[RequestContext]]. */ - def extractFlowMaterializer: Directive1[ActorFlowMaterializer] = BasicDirectives._extractFlowMaterializer + def extractFlowMaterializer: Directive1[FlowMaterializer] = BasicDirectives._extractFlowMaterializer /** * Runs its inner route with the given alternative [[LoggingAdapter]]. @@ -193,7 +192,7 @@ object BasicDirectives extends BasicDirectives { private val _extractRequest: Directive1[HttpRequest] = extract(_.request) private val _extractUri: Directive1[Uri] = extract(_.request.uri) private val _extractExecutionContext: Directive1[ExecutionContext] = extract(_.executionContext) - private val _extractFlowMaterializer: Directive1[ActorFlowMaterializer] = extract(_.flowMaterializer) + private val _extractFlowMaterializer: Directive1[FlowMaterializer] = extract(_.flowMaterializer) private val _extractLog: Directive1[LoggingAdapter] = extract(_.log) private val _extractSettings: Directive1[RoutingSettings] = extract(_.settings) private val _extractRequestContext: Directive1[RequestContext] = extract(akka.http.util.identityFunc) diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala index 33fc181665..2ad0124efe 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -5,24 +5,24 @@ package akka.http.unmarshalling import scala.concurrent.ExecutionContext -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.util.ByteString import akka.http.util.FastFuture import akka.http.model._ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { - implicit def byteStringUnmarshaller(implicit fm: ActorFlowMaterializer): FromEntityUnmarshaller[ByteString] = + implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] = Unmarshaller { case HttpEntity.Strict(_, data) ⇒ FastFuture.successful(data) case entity ⇒ entity.dataBytes.runFold(ByteString.empty)(_ ++ _) } - implicit def byteArrayUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[Array[Byte]] = byteStringUnmarshaller.map(_.toArray[Byte]) - implicit def charArrayUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def charArrayUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[Array[Char]] = byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒ val charBuffer = entity.contentType.charset.nioCharset.decode(bytes.asByteBuffer) @@ -31,17 +31,17 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { array } - implicit def stringUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def stringUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[String] = byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒ // FIXME: add `ByteString::decodeString(java.nio.Charset): String` overload!!! bytes.decodeString(entity.contentType.charset.nioCharset.name) // ouch!!! } - implicit def defaultUrlEncodedFormDataUnmarshaller(implicit fm: ActorFlowMaterializer, + implicit def defaultUrlEncodedFormDataUnmarshaller(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[FormData] = urlEncodedFormDataUnmarshaller(MediaTypes.`application/x-www-form-urlencoded`) - def urlEncodedFormDataUnmarshaller(ranges: ContentTypeRange*)(implicit fm: ActorFlowMaterializer, + def urlEncodedFormDataUnmarshaller(ranges: ContentTypeRange*)(implicit fm: FlowMaterializer, ec: ExecutionContext): FromEntityUnmarshaller[FormData] = stringUnmarshaller.forContentTypes(ranges: _*).mapWithInput { (entity, string) ⇒ try { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 13d45fa6db..bb61252f14 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -12,7 +12,7 @@ import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ -import scala.concurrent.{Promise, Future} +import scala.concurrent.{ Promise, Future } import scala.concurrent.duration.FiniteDuration import scala.language.higherKinds import scala.language.implicitConversions