diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java index c7bcfbcad9..58a5e196cc 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java @@ -7,9 +7,7 @@ package akka.http.model.japi; import java.io.File; import akka.util.ByteString; -import org.reactivestreams.Publisher; - -import akka.stream.FlowMaterializer; +import akka.stream.scaladsl2.Source; import akka.http.model.HttpEntity$; /** Constructors for HttpEntity instances */ @@ -44,27 +42,21 @@ public final class HttpEntities { return HttpEntity$.MODULE$.apply((akka.http.model.ContentType) contentType, file); } - public static HttpEntityDefault create(ContentType contentType, long contentLength, Publisher data) { + public static HttpEntityDefault create(ContentType contentType, long contentLength, Source data) { return new akka.http.model.HttpEntity.Default((akka.http.model.ContentType) contentType, contentLength, data); } - public static HttpEntityCloseDelimited createCloseDelimited(ContentType contentType, Publisher data) { + public static HttpEntityCloseDelimited createCloseDelimited(ContentType contentType, Source data) { return new akka.http.model.HttpEntity.CloseDelimited((akka.http.model.ContentType) contentType, data); } - public static HttpEntityIndefiniteLength createIndefiniteLength(ContentType contentType, Publisher data) { + public static HttpEntityIndefiniteLength createIndefiniteLength(ContentType contentType, Source data) { return new akka.http.model.HttpEntity.IndefiniteLength((akka.http.model.ContentType) contentType, data); } - public static HttpEntityChunked createChunked(ContentType contentType, Publisher chunks) { - return new akka.http.model.HttpEntity.Chunked( - (akka.http.model.ContentType) contentType, - Util.upcastPublisher(chunks)); - } - - public static HttpEntityChunked createChunked(ContentType contentType, Publisher data, FlowMaterializer materializer) { + public static HttpEntityChunked createChunked(ContentType contentType, Source data) { return akka.http.model.HttpEntity.Chunked$.MODULE$.fromData( (akka.http.model.ContentType) contentType, - data, materializer); + data); } } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java index b8bed69fab..6d8f25cc8c 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java @@ -5,9 +5,8 @@ package akka.http.model.japi; import akka.http.model.HttpEntity$; -import akka.stream.FlowMaterializer; +import akka.stream.scaladsl2.Source; import akka.util.ByteString; -import org.reactivestreams.Publisher; /** * Represents the entity of an Http message. An entity consists of the content-type of the data @@ -74,5 +73,5 @@ public interface HttpEntity { /** * Returns a stream of data bytes this entity consists of. */ - public abstract Publisher getDataBytes(FlowMaterializer materializer); + public abstract Source getDataBytes(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java index 00e6d38816..f0243499a8 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java @@ -4,12 +4,12 @@ package akka.http.model.japi; -import org.reactivestreams.Publisher; +import akka.stream.scaladsl2.Source; /** * Represents an entity transferred using `Transfer-Encoding: chunked`. It consists of a * stream of {@link ChunkStreamPart}. */ public abstract class HttpEntityChunked implements RequestEntity, ResponseEntity { - public abstract Publisher getChunks(); + public abstract Source getChunks(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java index 9abde7b677..6127f263fa 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java @@ -5,7 +5,7 @@ package akka.http.model.japi; import akka.util.ByteString; -import org.reactivestreams.Publisher; +import akka.stream.scaladsl2.Source; /** * Represents an entity without a predetermined content-length. Its length is implicitly @@ -13,5 +13,5 @@ import org.reactivestreams.Publisher; * available for Http responses. */ public abstract class HttpEntityCloseDelimited implements ResponseEntity { - public abstract Publisher data(); + public abstract Source data(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java index e09ebd1a68..47b7df52b5 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java @@ -5,12 +5,12 @@ package akka.http.model.japi; import akka.util.ByteString; -import org.reactivestreams.Publisher; +import akka.stream.scaladsl2.Source; /** * The default entity type which has a predetermined length and a stream of data bytes. */ public abstract class HttpEntityDefault implements BodyPartEntity, RequestEntity, ResponseEntity { public abstract long contentLength(); - public abstract Publisher data(); + public abstract Source data(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java index d831ae9297..f79c3e8d55 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java @@ -5,11 +5,11 @@ package akka.http.model.japi; import akka.util.ByteString; -import org.reactivestreams.Publisher; +import akka.stream.scaladsl2.Source; /** * Represents an entity without a predetermined content-length to use in a BodyParts. */ public abstract class HttpEntityIndefiniteLength implements BodyPartEntity { - public abstract Publisher data(); + public abstract Source data(); } \ No newline at end of file diff --git a/akka-http-core/src/main/java/akka/http/model/japi/Util.java b/akka-http-core/src/main/java/akka/http/model/japi/Util.java index 09cabe14fe..332d958cc7 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/Util.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/Util.java @@ -4,15 +4,12 @@ package akka.http.model.japi; -import akka.http.model.*; import akka.http.util.ObjectRegistry; import akka.japi.Option; -import org.reactivestreams.Publisher; -import scala.None; import scala.None$; -import scala.NotImplementedError; import scala.collection.immutable.Map$; import scala.collection.immutable.Seq; +import akka.stream.scaladsl2.Source; import java.util.Arrays; import java.util.Map; @@ -30,12 +27,12 @@ public abstract class Util { @SuppressWarnings("unchecked") // no support for covariance of Publisher in Java // needed to provide covariant conversions that the Java interfaces don't provide automatically. // The alternative would be having to cast around everywhere instead of doing it here in a central place. - public static Publisher convertPublisher(Publisher p) { - return (Publisher)(Publisher) p; + public static Source convertPublisher(Source p) { + return (Source)(Source) p; } @SuppressWarnings("unchecked") - public static Publisher upcastPublisher(Publisher p) { - return (Publisher)(Publisher) p; + public static Source upcastSource(Source p) { + return (Source)(Source) p; } @SuppressWarnings("unchecked") public static scala.collection.immutable.Map convertMapToScala(Map map) { diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 5f77c6b594..2100c3e9ac 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -66,17 +66,14 @@ object Http extends ExtensionKey[HttpExt] { // // case object SetupRequestChannel extends SetupOutgoingChannel - sealed trait OutgoingChannel { - def processor[T]: HttpClientProcessor[T] - } - /** * An `OutgoingHttpChannel` with a single outgoing HTTP connection as the underlying transport. */ + // FIXME: hook up with new style IO final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress, - untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel { - def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]] + responsePublisher: Publisher[(HttpResponse, Any)], + requestSubscriber: Subscriber[(HttpRequest, Any)]) { } // PREVIEW OF COMING API HERE: diff --git a/akka-http-core/src/main/scala/akka/http/HttpManager.scala b/akka-http-core/src/main/scala/akka/http/HttpManager.scala index 55d7d2a3e5..6869d9c469 100644 --- a/akka-http-core/src/main/scala/akka/http/HttpManager.scala +++ b/akka-http-core/src/main/scala/akka/http/HttpManager.scala @@ -11,9 +11,8 @@ import akka.http.engine.client._ import akka.http.engine.server.{ HttpServerPipeline, ServerSettings } import akka.io.IO import akka.pattern.ask -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.{ PublisherDrain, Source, FlowMaterializer } import akka.stream.io.StreamTcp -import akka.stream.scaladsl.Flow import akka.util.Timeout /** @@ -64,9 +63,9 @@ private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor wi log.info("Bound to {}", endpoint) implicit val materializer = FlowMaterializer() val httpServerPipeline = new HttpServerPipeline(effectiveSettings, log) - val httpConnectionStream = Flow(connectionStream) + val httpConnectionStream = Source(connectionStream) .map(httpServerPipeline) - .toPublisher() + .runWith(PublisherDrain()) commander ! Http.ServerBinding(localAddress, httpConnectionStream, tcpServerBinding) case Failure(error) ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala index 200f809368..fc1f5d1d19 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala @@ -5,11 +5,12 @@ package akka.http.engine.client import java.net.InetSocketAddress +import akka.util.ByteString + import scala.collection.immutable.Queue -import akka.stream.{ FlattenStrategy, FlowMaterializer } +import akka.stream.scaladsl2._ import akka.event.LoggingAdapter import akka.stream.io.StreamTcp -import akka.stream.scaladsl.{ Flow, Duct } import akka.http.Http import akka.http.model.{ HttpMethod, HttpRequest, ErrorInfo, HttpResponse } import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } @@ -34,34 +35,49 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log) def apply(tcpConn: StreamTcp.OutgoingTcpConnection): Http.OutgoingConnection = { + import FlowGraphImplicits._ + val requestMethodByPass = new RequestMethodByPass(tcpConn.remoteAddress) - val (contextBypassSubscriber, contextBypassPublisher) = - Duct[(HttpRequest, Any)].map(_._2).build() + val userIn = SubscriberTap[(HttpRequest, Any)]() + val userOut = PublisherDrain[(HttpResponse, Any)]() - val requestSubscriber = - Duct[(HttpRequest, Any)] - .broadcast(contextBypassSubscriber) - .map(requestMethodByPass) - .transform("renderer", () ⇒ requestRendererFactory.newRenderer) - .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) - .produceTo(tcpConn.outputStream) + val netOut = SubscriberDrain(tcpConn.outputStream) + val netIn = PublisherTap(tcpConn.inputStream) - val responsePublisher = - Flow(tcpConn.inputStream) - .transform("rootParser", () ⇒ rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass)) - .splitWhen(_.isInstanceOf[MessageStart]) - .headAndTail - .collect { - case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ - HttpResponse(statusCode, headers, createEntity(entityParts), protocol) - } - .zip(contextBypassPublisher) - .toPublisher() + val pipeline = FlowGraph { implicit b ⇒ + val bypassFanout = Broadcast[(HttpRequest, Any)]("bypassFanout") + val bypassFanin = Zip[HttpResponse, Any]("bypassFanin") - val processor = HttpClientProcessor(requestSubscriber, responsePublisher) - Http.OutgoingConnection(tcpConn.remoteAddress, tcpConn.localAddress, processor) + val requestPipeline = + Flow[(HttpRequest, Any)] + .map(requestMethodByPass) + .transform("renderer", () ⇒ requestRendererFactory.newRenderer) + .flatten(FlattenStrategy.concat) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) + + val responsePipeline = + Flow[ByteString] + .transform("rootParser", () ⇒ rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass)) + .splitWhen(_.isInstanceOf[MessageStart]) + .headAndTail + .collect { + case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ + HttpResponse(statusCode, headers, createEntity(entityParts), protocol) + } + + //FIXME: the graph is unnecessary after fixing #15957 + userIn ~> bypassFanout ~> requestPipeline ~> netOut + bypassFanout ~> Flow[(HttpRequest, Any)].map(_._2) ~> bypassFanin.right + netIn ~> responsePipeline ~> bypassFanin.left + bypassFanin.out ~> userOut + }.run() + + Http.OutgoingConnection( + tcpConn.remoteAddress, + tcpConn.localAddress, + pipeline.materializedDrain(userOut), + pipeline.materializedTap(userIn)) } class RequestMethodByPass(serverAddress: InetSocketAddress) diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala index 03c49e523f..8ed31ffa0b 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala @@ -4,13 +4,12 @@ package akka.http.engine.parsing -import org.reactivestreams.Publisher import scala.annotation.tailrec import scala.collection.mutable.ListBuffer import akka.event.LoggingAdapter import akka.parboiled2.CharPredicate -import akka.stream.scaladsl.Flow -import akka.stream.{ FlowMaterializer, Transformer } +import akka.stream.Transformer +import akka.stream.scaladsl2.Source import akka.util.ByteString import akka.http.model._ import akka.http.util._ @@ -24,7 +23,7 @@ import headers._ private[http] final class BodyPartParser(defaultContentType: ContentType, boundary: String, log: LoggingAdapter, - settings: BodyPartParser.Settings = BodyPartParser.defaultSettings)(implicit fm: FlowMaterializer) + settings: BodyPartParser.Settings = BodyPartParser.defaultSettings) extends Transformer[ByteString, BodyPartParser.Output] { import BodyPartParser._ import settings._ @@ -144,7 +143,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, emitPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = { (headers, ct, bytes) ⇒ emit(BodyPartStart(headers, entityParts ⇒ HttpEntity.IndefiniteLength(ct, - Flow(entityParts).collect { case EntityPart(data) ⇒ data }.toPublisher()))) + entityParts.collect { case EntityPart(data) ⇒ data }))) emit(bytes) }, emitFinalPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = { @@ -217,7 +216,7 @@ private[http] object BodyPartParser { val boundaryCharNoSpace = CharPredicate.Digit ++ CharPredicate.Alpha ++ "'()+_,-./:=?" sealed trait Output - final case class BodyPartStart(headers: List[HttpHeader], createEntity: Publisher[Output] ⇒ BodyPartEntity) extends Output + final case class BodyPartStart(headers: List[HttpHeader], createEntity: Source[Output] ⇒ BodyPartEntity) extends Output final case class EntityPart(data: ByteString) extends Output final case class ParseError(info: ErrorInfo) extends Output diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala index 889ca565b2..a05c5f3aa1 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala @@ -4,14 +4,13 @@ package akka.http.engine.parsing -import org.reactivestreams.Publisher import scala.annotation.tailrec import scala.collection.mutable.ListBuffer import scala.collection.immutable -import akka.stream.scaladsl.Flow import akka.parboiled2.CharUtils import akka.util.ByteString -import akka.stream.{ FlowMaterializer, Transformer } +import akka.stream.Transformer +import akka.stream.scaladsl2.Source import akka.http.model.parser.CharacterClasses import akka.http.model._ import headers._ @@ -238,13 +237,13 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut contentLength: Int)(entityParts: Any): UniversalEntity = HttpEntity.Strict(contentType(cth), input.slice(bodyStart, bodyStart + contentLength)) - def defaultEntity(cth: Option[`Content-Type`], contentLength: Long)(entityParts: Publisher[_ <: ParserOutput])(implicit fm: FlowMaterializer): UniversalEntity = { - val data = Flow(entityParts).collect { case ParserOutput.EntityPart(bytes) ⇒ bytes }.toPublisher() + def defaultEntity(cth: Option[`Content-Type`], contentLength: Long)(entityParts: Source[_ <: ParserOutput]): UniversalEntity = { + val data = entityParts.collect { case ParserOutput.EntityPart(bytes) ⇒ bytes } HttpEntity.Default(contentType(cth), contentLength, data) } - def chunkedEntity(cth: Option[`Content-Type`])(entityChunks: Publisher[_ <: ParserOutput])(implicit fm: FlowMaterializer): RequestEntity with ResponseEntity = { - val chunks = Flow(entityChunks).collect { case ParserOutput.EntityChunk(chunk) ⇒ chunk }.toPublisher() + def chunkedEntity(cth: Option[`Content-Type`])(entityChunks: Source[_ <: ParserOutput]): RequestEntity with ResponseEntity = { + val chunks = entityChunks.collect { case ParserOutput.EntityChunk(chunk) ⇒ chunk } HttpEntity.Chunked(contentType(cth), chunks) } diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala index 0df5009828..530fc47993 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala @@ -5,10 +5,9 @@ package akka.http.engine.parsing import java.lang.{ StringBuilder ⇒ JStringBuilder } -import org.reactivestreams.Publisher import scala.annotation.tailrec import akka.http.model.parser.CharacterClasses -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.Source import akka.util.ByteString import akka.http.model._ import headers._ @@ -18,7 +17,7 @@ import StatusCodes._ * INTERNAL API */ private[http] class HttpRequestParser(_settings: ParserSettings, - rawRequestUriHeader: Boolean)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings))(implicit fm: FlowMaterializer) + rawRequestUriHeader: Boolean)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings)) extends HttpMessageParser[ParserOutput.RequestOutput](_settings, _headerParser) { import settings._ @@ -110,7 +109,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`], hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult = if (hostHeaderPresent || protocol == HttpProtocols.`HTTP/1.0`) { - def emitRequestStart(createEntity: Publisher[ParserOutput.RequestOutput] ⇒ RequestEntity, + def emitRequestStart(createEntity: Source[ParserOutput.RequestOutput] ⇒ RequestEntity, headers: List[HttpHeader] = headers) = emit(ParserOutput.RequestStart(method, uri, protocol, headers, createEntity, closeAfterResponseCompletion)) diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala index 53e3c41b54..8b4f4e38af 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala @@ -4,11 +4,9 @@ package akka.http.engine.parsing -import org.reactivestreams.Publisher import scala.annotation.tailrec import akka.http.model.parser.CharacterClasses -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.Source import akka.util.ByteString import akka.http.model._ import headers._ @@ -18,7 +16,7 @@ import HttpResponseParser.NoMethod * INTERNAL API */ private[http] class HttpResponseParser(_settings: ParserSettings, - dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings))(implicit fm: FlowMaterializer) + dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings)) extends HttpMessageParser[ParserOutput.ResponseOutput](_settings, _headerParser) { import settings._ @@ -75,7 +73,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, def parseEntity(headers: List[HttpHeader], protocol: HttpProtocol, input: ByteString, bodyStart: Int, clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`], hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult = { - def emitResponseStart(createEntity: Publisher[ParserOutput.ResponseOutput] ⇒ ResponseEntity, + def emitResponseStart(createEntity: Source[ParserOutput.ResponseOutput] ⇒ ResponseEntity, headers: List[HttpHeader] = headers) = emit(ParserOutput.ResponseStart(statusCode, protocol, headers, createEntity, closeAfterResponseCompletion)) def finishEmptyResponse() = { @@ -100,7 +98,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, } case None ⇒ emitResponseStart { entityParts ⇒ - val data = Flow(entityParts).collect { case ParserOutput.EntityPart(bytes) ⇒ bytes }.toPublisher() + val data = entityParts.collect { case ParserOutput.EntityPart(bytes) ⇒ bytes } HttpEntity.CloseDelimited(contentType(cth), data) } parseToCloseBody(input, bodyStart) diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala index 521bec34d8..c4e756c21c 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala @@ -4,9 +4,9 @@ package akka.http.engine.parsing -import org.reactivestreams.Publisher import akka.http.model._ import akka.util.ByteString +import akka.stream.scaladsl2.Source /** * INTERNAL API @@ -27,14 +27,14 @@ private[http] object ParserOutput { uri: Uri, protocol: HttpProtocol, headers: List[HttpHeader], - createEntity: Publisher[RequestOutput] ⇒ RequestEntity, + createEntity: Source[RequestOutput] ⇒ RequestEntity, closeAfterResponseCompletion: Boolean) extends MessageStart with RequestOutput final case class ResponseStart( statusCode: StatusCode, protocol: HttpProtocol, headers: List[HttpHeader], - createEntity: Publisher[ResponseOutput] ⇒ ResponseEntity, + createEntity: Source[ResponseOutput] ⇒ ResponseEntity, closeAfterResponseCompletion: Boolean) extends MessageStart with ResponseOutput case object MessageEnd extends MessageOutput diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala index fb29d90f09..4fea05fe39 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala @@ -5,16 +5,14 @@ package akka.http.engine.rendering import java.nio.charset.Charset -import org.reactivestreams.Publisher import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.http.model._ import akka.http.model.headers._ import akka.http.engine.rendering.RenderSupport._ import akka.http.util._ -import akka.stream.impl.SynchronousPublisherFromIterable -import akka.stream.scaladsl.Flow -import akka.stream.{ FlowMaterializer, Transformer } +import akka.stream.scaladsl2.Source +import akka.stream.Transformer import akka.util.ByteString import HttpEntity._ @@ -24,11 +22,11 @@ import HttpEntity._ private[http] class BodyPartRenderer(boundary: String, nioCharset: Charset, partHeadersSizeHint: Int, - log: LoggingAdapter)(implicit fm: FlowMaterializer) extends Transformer[BodyPart, Publisher[ChunkStreamPart]] { + log: LoggingAdapter) extends Transformer[BodyPart, Source[ChunkStreamPart]] { private[this] var firstBoundaryRendered = false - def onNext(bodyPart: BodyPart): List[Publisher[ChunkStreamPart]] = { + def onNext(bodyPart: BodyPart): List[Source[ChunkStreamPart]] = { val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint) def renderBoundary(): Unit = { @@ -60,12 +58,12 @@ private[http] class BodyPartRenderer(boundary: String, case Nil ⇒ r ~~ CrLf } - def bodyPartChunks(data: Publisher[ByteString]): List[Publisher[ChunkStreamPart]] = { - val entityChunks = Flow(data).map[ChunkStreamPart](Chunk(_)).toPublisher() - Flow[ChunkStreamPart](Chunk(r.get) :: Nil).concat(entityChunks).toPublisher() :: Nil + def bodyPartChunks(data: Source[ByteString]): List[Source[ChunkStreamPart]] = { + val entityChunks = data.map[ChunkStreamPart](Chunk(_)) + (Source(Chunk(r.get) :: Nil) ++ entityChunks) :: Nil } - def completePartRendering(): List[Publisher[ChunkStreamPart]] = + def completePartRendering(): List[Source[ChunkStreamPart]] = bodyPart.entity match { case x if x.isKnownEmpty ⇒ chunkStream(r.get) case Strict(_, data) ⇒ chunkStream((r ~~ data).get) @@ -80,7 +78,7 @@ private[http] class BodyPartRenderer(boundary: String, completePartRendering() } - override def onTermination(e: Option[Throwable]): List[Publisher[ChunkStreamPart]] = + override def onTermination(e: Option[Throwable]): List[Source[ChunkStreamPart]] = if (e.isEmpty && firstBoundaryRendered) { val r = new ByteStringRendering(boundary.length + 4) r ~~ CrLf ~~ '-' ~~ '-' ~~ boundary ~~ '-' ~~ '-' @@ -88,6 +86,6 @@ private[http] class BodyPartRenderer(boundary: String, } else Nil private def chunkStream(byteString: ByteString) = - SynchronousPublisherFromIterable[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil + Source[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil } diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala index 5da5a605fa..36eb789802 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala @@ -5,13 +5,11 @@ package akka.http.engine.rendering import java.net.InetSocketAddress -import org.reactivestreams.Publisher import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.util.ByteString -import akka.stream.scaladsl.Flow -import akka.stream.{ FlowMaterializer, Transformer } -import akka.stream.impl.SynchronousPublisherFromIterable +import akka.stream.scaladsl2.Source +import akka.stream.Transformer import akka.http.model._ import akka.http.util._ import RenderSupport._ @@ -22,13 +20,13 @@ import headers._ */ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`User-Agent`], requestHeaderSizeHint: Int, - log: LoggingAdapter)(implicit fm: FlowMaterializer) { + log: LoggingAdapter) { def newRenderer: HttpRequestRenderer = new HttpRequestRenderer - final class HttpRequestRenderer extends Transformer[RequestRenderingContext, Publisher[ByteString]] { + final class HttpRequestRenderer extends Transformer[RequestRenderingContext, Source[ByteString]] { - def onNext(ctx: RequestRenderingContext): List[Publisher[ByteString]] = { + def onNext(ctx: RequestRenderingContext): List[Source[ByteString]] = { val r = new ByteStringRendering(requestHeaderSizeHint) import ctx.request._ @@ -101,24 +99,24 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.` r ~~ CrLf } - def completeRequestRendering(): List[Publisher[ByteString]] = + def completeRequestRendering(): List[Source[ByteString]] = entity match { case x if x.isKnownEmpty ⇒ renderContentLength(0) - SynchronousPublisherFromIterable(r.get :: Nil) :: Nil + Source(r.get :: Nil) :: Nil case HttpEntity.Strict(_, data) ⇒ renderContentLength(data.length) - SynchronousPublisherFromIterable(r.get :: data :: Nil) :: Nil + Source(r.get :: data :: Nil) :: Nil case HttpEntity.Default(_, contentLength, data) ⇒ renderContentLength(contentLength) renderByteStrings(r, - Flow(data).transform("checkContentLength", () ⇒ new CheckContentLengthTransformer(contentLength)).toPublisher()) + data.transform("checkContentLength", () ⇒ new CheckContentLengthTransformer(contentLength))) case HttpEntity.Chunked(_, chunks) ⇒ r ~~ CrLf - renderByteStrings(r, Flow(chunks).transform("chunkTransform", () ⇒ new ChunkTransformer).toPublisher()) + renderByteStrings(r, chunks.transform("chunkTransform", () ⇒ new ChunkTransformer)) } renderRequestLine() diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala index 67df4524e8..215a53d430 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala @@ -4,13 +4,11 @@ package akka.http.engine.rendering -import org.reactivestreams.Publisher import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.util.ByteString -import akka.stream.scaladsl.Flow -import akka.stream.impl.SynchronousPublisherFromIterable -import akka.stream.{ FlowMaterializer, Transformer } +import akka.stream.scaladsl2.Source +import akka.stream.Transformer import akka.http.model._ import akka.http.util._ import RenderSupport._ @@ -22,7 +20,7 @@ import headers._ */ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Server], responseHeaderSizeHint: Int, - log: LoggingAdapter)(implicit fm: FlowMaterializer) { + log: LoggingAdapter) { private val renderDefaultServerHeader: Rendering ⇒ Unit = serverHeader match { @@ -52,12 +50,12 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser def newRenderer: HttpResponseRenderer = new HttpResponseRenderer - final class HttpResponseRenderer extends Transformer[ResponseRenderingContext, Publisher[ByteString]] { + final class HttpResponseRenderer extends Transformer[ResponseRenderingContext, Source[ByteString]] { private[this] var close = false // signals whether the connection is to be closed after the current response override def isComplete = close - def onNext(ctx: ResponseRenderingContext): List[Publisher[ByteString]] = { + def onNext(ctx: ResponseRenderingContext): List[Source[ByteString]] = { val r = new ByteStringRendering(responseHeaderSizeHint) import ctx.response._ @@ -132,23 +130,23 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf } - def byteStrings(entityBytes: ⇒ Publisher[ByteString]): List[Publisher[ByteString]] = + def byteStrings(entityBytes: ⇒ Source[ByteString]): List[Source[ByteString]] = renderByteStrings(r, entityBytes, skipEntity = noEntity) - def completeResponseRendering(entity: ResponseEntity): List[Publisher[ByteString]] = + def completeResponseRendering(entity: ResponseEntity): List[Source[ByteString]] = entity match { case HttpEntity.Strict(_, data) ⇒ renderHeaders(headers.toList) renderEntityContentType(r, entity) r ~~ `Content-Length` ~~ data.length ~~ CrLf ~~ CrLf val entityBytes = if (noEntity) Nil else data :: Nil - SynchronousPublisherFromIterable(r.get :: entityBytes) :: Nil + Source(r.get :: entityBytes) :: Nil case HttpEntity.Default(_, contentLength, data) ⇒ renderHeaders(headers.toList) renderEntityContentType(r, entity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf ~~ CrLf - byteStrings(Flow(data).transform("checkContentLength", () ⇒ new CheckContentLengthTransformer(contentLength)).toPublisher()) + byteStrings(data.transform("checkContentLength", () ⇒ new CheckContentLengthTransformer(contentLength))) case HttpEntity.CloseDelimited(_, data) ⇒ renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD) @@ -158,12 +156,12 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser case HttpEntity.Chunked(contentType, chunks) ⇒ if (ctx.requestProtocol == `HTTP/1.0`) - completeResponseRendering(HttpEntity.CloseDelimited(contentType, Flow(chunks).map(_.data).toPublisher())) + completeResponseRendering(HttpEntity.CloseDelimited(contentType, chunks.map(_.data))) else { renderHeaders(headers.toList) renderEntityContentType(r, entity) r ~~ CrLf - byteStrings(Flow(chunks).transform("renderChunks", () ⇒ new ChunkTransformer).toPublisher()) + byteStrings(chunks.transform("renderChunks", () ⇒ new ChunkTransformer)) } } diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala index ac87405f19..92aa4f63a1 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala @@ -4,15 +4,15 @@ package akka.http.engine.rendering -import org.reactivestreams.{ Subscription, Subscriber, Publisher } import akka.parboiled2.CharUtils +import akka.stream.impl2.ActorBasedFlowMaterializer import akka.util.ByteString import akka.event.LoggingAdapter -import akka.stream.impl.SynchronousPublisherFromIterable -import akka.stream.scaladsl.Flow -import akka.stream.{ FlowMaterializer, Transformer } +import akka.stream.scaladsl2._ +import akka.stream.Transformer import akka.http.model._ import akka.http.util._ +import org.reactivestreams.Subscriber /** * INTERNAL API @@ -30,20 +30,26 @@ private object RenderSupport { val defaultLastChunkBytes: ByteString = renderChunk(HttpEntity.LastChunk) + // This hooks into the materialization to cancel the not needed second source. This helper class + // allows us to not take a FlowMaterializer but delegate the cancellation to the point when the whole stream + // materializes + private case class CancelSecond[T](first: Source[T], second: Source[T]) extends SimpleTap[T] { + override def attach(flowSubscriber: Subscriber[T], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = { + first.connect(SubscriberDrain(flowSubscriber)).run()(materializer) + second.connect(Sink.cancelled).run()(materializer) + } + } + def renderEntityContentType(r: Rendering, entity: HttpEntity): Unit = if (entity.contentType != ContentTypes.NoContentType) r ~~ headers.`Content-Type` ~~ entity.contentType ~~ CrLf - def renderByteStrings(r: ByteStringRendering, entityBytes: ⇒ Publisher[ByteString], - skipEntity: Boolean = false)(implicit fm: FlowMaterializer): List[Publisher[ByteString]] = { - val messageStart = SynchronousPublisherFromIterable(r.get :: Nil) + def renderByteStrings(r: ByteStringRendering, entityBytes: ⇒ Source[ByteString], + skipEntity: Boolean = false): List[Source[ByteString]] = { + val messageStart = Source(r.get :: Nil) val messageBytes = - if (!skipEntity) Flow(messageStart).concat(entityBytes).toPublisher() - else { - // FIXME: This should be fixed by a CancelledDrain once #15903 is done. Currently this is needed for the tests - entityBytes.subscribe(cancelledSusbcriber) - messageStart - } + if (!skipEntity) messageStart ++ entityBytes + else CancelSecond(messageStart, entityBytes) messageBytes :: Nil } diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala index 6a9a0ea426..2e8a4b33f6 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala @@ -4,17 +4,17 @@ package akka.http.engine.server -import org.reactivestreams.Publisher import akka.event.LoggingAdapter import akka.stream.io.StreamTcp -import akka.stream.{ FlattenStrategy, Transformer, FlowMaterializer } -import akka.stream.scaladsl.{ Flow, Duct } +import akka.stream.Transformer +import akka.stream.scaladsl2._ import akka.http.engine.parsing.HttpRequestParser import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.model.{ StatusCode, ErrorInfo, HttpRequest, HttpResponse, HttpMethods } import akka.http.engine.parsing.ParserOutput._ import akka.http.Http import akka.http.util._ +import akka.util.ByteString /** * INTERNAL API @@ -30,39 +30,50 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log) def apply(tcpConn: StreamTcp.IncomingTcpConnection): Http.IncomingConnection = { - val (applicationBypassSubscriber, applicationBypassPublisher) = - Duct[(RequestOutput, Publisher[RequestOutput])] - .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } - .build() + import FlowGraphImplicits._ - val requestPublisher = - Flow(tcpConn.inputStream) - .transform("rootParser", () ⇒ rootParser.copyWith(warnOnIllegalHeader)) - // this will create extra single element `[MessageEnd]` substreams, that will - // be filtered out by the above `collect` for the applicationBypass and the - // below `collect` for the actual request handling - // TODO: replace by better combinator, maybe `splitAfter(_ == MessageEnd)`? - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .headAndTail - .broadcast(applicationBypassSubscriber) - .collect { - case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ - val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) - val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method - HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) - } - .toPublisher() + val networkIn = PublisherTap(tcpConn.inputStream) + val networkOut = SubscriberDrain(tcpConn.outputStream) - val responseSubscriber = - Duct[HttpResponse] - .merge(applicationBypassPublisher) - .transform("applyApplicationBypass", () ⇒ applyApplicationBypass) - .transform("renderer", () ⇒ responseRendererFactory.newRenderer) - .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) - .produceTo(tcpConn.outputStream) + val userIn = PublisherDrain[HttpRequest]() + val userOut = SubscriberTap[HttpResponse]() - Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber) + val pipeline = FlowGraph { implicit b ⇒ + val bypassFanout = Broadcast[(RequestOutput, Source[RequestOutput])]("bypassFanout") + val bypassFanin = Merge[Any]("merge") + + val rootParsePipeline = + Flow[ByteString] + .transform("rootParser", () ⇒ rootParser.copyWith(warnOnIllegalHeader)) + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) + .headAndTail + + val rendererPipeline = + Flow[Any] + .transform("applyApplicationBypass", () ⇒ applyApplicationBypass) + .transform("renderer", () ⇒ responseRendererFactory.newRenderer) + .flatten(FlattenStrategy.concat) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) + + val requestTweaking = Flow[(RequestOutput, Source[RequestOutput])].collect { + case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) + val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method + HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) + } + + val bypass = + Flow[(RequestOutput, Source[RequestOutput])] + .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } + + //FIXME: the graph is unnecessary after fixing #15957 + networkIn ~> rootParsePipeline ~> bypassFanout ~> requestTweaking ~> userIn + bypassFanout ~> bypass ~> bypassFanin + userOut ~> bypassFanin ~> rendererPipeline ~> networkOut + + }.run() + + Http.IncomingConnection(tcpConn.remoteAddress, pipeline.materializedDrain(userIn), pipeline.materializedTap(userOut)) } /** diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 9736e4b038..136c27fc1c 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -9,13 +9,12 @@ import akka.http.util.FastFuture import language.implicitConversions import java.io.File import java.lang.{ Iterable ⇒ JIterable } -import org.reactivestreams.Publisher import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.duration.FiniteDuration import scala.collection.immutable import akka.util.ByteString -import akka.stream.{ Transformer, TimerTransformer, FlowMaterializer } -import akka.stream.scaladsl.Flow +import akka.stream.{ TimerTransformer, Transformer } +import akka.stream.scaladsl2._ import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable } import akka.http.util._ import japi.JavaMapping.Implicits._ @@ -39,7 +38,7 @@ sealed trait HttpEntity extends japi.HttpEntity { /** * A stream of the data of this entity. */ - def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] + def dataBytes: Source[ByteString] /** * Collects all possible parts and returns a potentially future Strict entity for easier processing. @@ -63,7 +62,7 @@ sealed trait HttpEntity extends japi.HttpEntity { throw new java.util.concurrent.TimeoutException( s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data") } - Flow(dataBytes).timerTransform("toStrict", transformer).toFuture() + dataBytes.timerTransform("toStrict", transformer).runWith(FutureDrain()) } /** @@ -74,7 +73,7 @@ sealed trait HttpEntity extends japi.HttpEntity { * This method may only throw an exception if the `transformer` function throws an exception while creating the transformer. * Any other errors are reported through the new entity data stream. */ - def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): HttpEntity + def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): HttpEntity /** * Creates a copy of this HttpEntity with the `contentType` overridden with the given one. @@ -82,7 +81,7 @@ sealed trait HttpEntity extends japi.HttpEntity { def withContentType(contentType: ContentType): HttpEntity /** Java API */ - def getDataBytes(materializer: FlowMaterializer): Publisher[ByteString] = dataBytes(materializer) + def getDataBytes: Source[ByteString] = dataBytes // default implementations, should be overridden def isCloseDelimited: Boolean = false @@ -99,13 +98,13 @@ sealed trait BodyPartEntity extends HttpEntity with japi.BodyPartEntity { sealed trait RequestEntity extends HttpEntity with japi.RequestEntity with ResponseEntity { def withContentType(contentType: ContentType): RequestEntity - override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): RequestEntity + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): RequestEntity } /* An entity that can be used for responses */ sealed trait ResponseEntity extends HttpEntity with japi.ResponseEntity { def withContentType(contentType: ContentType): ResponseEntity - override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): ResponseEntity + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): ResponseEntity } /* An entity that can be used for requests, responses, and body parts */ sealed trait UniversalEntity extends japi.UniversalEntity with MessageEntity with BodyPartEntity { @@ -122,7 +121,7 @@ object HttpEntity { if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes)) def apply(contentType: ContentType, data: ByteString): Strict = if (data.isEmpty) empty(contentType) else Strict(contentType, data) - def apply(contentType: ContentType, contentLength: Long, data: Publisher[ByteString]): UniversalEntity = + def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString]): UniversalEntity = if (contentLength == 0) empty(contentType) else Default(contentType, contentLength, data) def apply(contentType: ContentType, file: File): UniversalEntity = { @@ -148,19 +147,19 @@ object HttpEntity { def isKnownEmpty: Boolean = data.isEmpty - def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = SynchronousPublisherFromIterable(data :: Nil) + def dataBytes: Source[ByteString] = Source(data :: Nil) override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) - override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): MessageEntity = { + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): MessageEntity = { try { val t = transformer() val newData = (t.onNext(data) ++ t.onTermination(None)).join copy(data = newData) } catch { case NonFatal(ex) ⇒ - Chunked(contentType, StreamUtils.failedPublisher(ex)) + Chunked(contentType, Source.failed(ex)) } } @@ -173,17 +172,17 @@ object HttpEntity { */ final case class Default(contentType: ContentType, contentLength: Long, - data: Publisher[ByteString]) + data: Source[ByteString]) extends japi.HttpEntityDefault with UniversalEntity { require(contentLength > 0, "contentLength must be positive (use `HttpEntity.empty(contentType)` for empty entities)") def isKnownEmpty = false override def isDefault: Boolean = true - def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = data + def dataBytes: Source[ByteString] = data - override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): Chunked = { - val chunks = Flow(data).transform("transformDataBytes-Default", () ⇒ transformer().map(Chunk(_): ChunkStreamPart)).toPublisher() + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): Chunked = { + val chunks = data.transform("transformDataBytes-Default", () ⇒ transformer().map(Chunk(_): ChunkStreamPart)) HttpEntity.Chunked(contentType, chunks) } @@ -199,11 +198,11 @@ object HttpEntity { */ private[http] sealed trait WithoutKnownLength extends HttpEntity { def contentType: ContentType - def data: Publisher[ByteString] + def data: Source[ByteString] def isKnownEmpty = data eq EmptyPublisher - def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = data + def dataBytes: Source[ByteString] = data } /** @@ -211,7 +210,7 @@ object HttpEntity { * The content-length of such responses is unknown at the time the response headers have been received. * Note that this type of HttpEntity can only be used for HttpResponses. */ - final case class CloseDelimited(contentType: ContentType, data: Publisher[ByteString]) + final case class CloseDelimited(contentType: ContentType, data: Source[ByteString]) extends japi.HttpEntityCloseDelimited with ResponseEntity with WithoutKnownLength { type Self = CloseDelimited @@ -219,42 +218,42 @@ object HttpEntity { def withContentType(contentType: ContentType): CloseDelimited = if (contentType == this.contentType) this else copy(contentType = contentType) - override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): CloseDelimited = + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): CloseDelimited = HttpEntity.CloseDelimited(contentType, - Flow(data).transform("transformDataBytes-CloseDelimited", transformer).toPublisher()) + data.transform("transformDataBytes-CloseDelimited", transformer)) } /** * The model for the entity of a BodyPart with an indefinite length. * Note that this type of HttpEntity can only be used for BodyParts. */ - final case class IndefiniteLength(contentType: ContentType, data: Publisher[ByteString]) + final case class IndefiniteLength(contentType: ContentType, data: Source[ByteString]) extends japi.HttpEntityIndefiniteLength with BodyPartEntity with WithoutKnownLength { override def isIndefiniteLength: Boolean = true def withContentType(contentType: ContentType): IndefiniteLength = if (contentType == this.contentType) this else copy(contentType = contentType) - override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): IndefiniteLength = + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): IndefiniteLength = HttpEntity.IndefiniteLength(contentType, - Flow(data).transform("transformDataBytes-IndefiniteLength", transformer).toPublisher()) + data.transform("transformDataBytes-IndefiniteLength", transformer)) } /** * The model for the entity of a chunked HTTP message (with `Transfer-Encoding: chunked`). */ - final case class Chunked(contentType: ContentType, chunks: Publisher[ChunkStreamPart]) + final case class Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart]) extends japi.HttpEntityChunked with MessageEntity { def isKnownEmpty = chunks eq EmptyPublisher override def isChunked: Boolean = true - def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = - Flow(chunks).map(_.data).filter(_.nonEmpty).toPublisher() + def dataBytes: Source[ByteString] = + chunks.map(_.data).filter(_.nonEmpty) - override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): Chunked = { + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString]): Chunked = { val newChunks = - Flow(chunks).transform("transformDataBytes-Chunked", () ⇒ new Transformer[ChunkStreamPart, ChunkStreamPart] { + chunks.transform("transformDataBytes-Chunked", () ⇒ new Transformer[ChunkStreamPart, ChunkStreamPart] { val byteTransformer = transformer() var sentLastChunk = false @@ -276,7 +275,7 @@ object HttpEntity { if (remaining.nonEmpty) Chunk(remaining.join) :: Nil else Nil } - }).toPublisher() + }) HttpEntity.Chunked(contentType, newChunks) } @@ -285,17 +284,17 @@ object HttpEntity { if (contentType == this.contentType) this else copy(contentType = contentType) /** Java API */ - def getChunks: Publisher[japi.ChunkStreamPart] = chunks.asInstanceOf[Publisher[japi.ChunkStreamPart]] + def getChunks: Source[japi.ChunkStreamPart] = chunks.asInstanceOf[Source[japi.ChunkStreamPart]] } object Chunked { /** * Returns a ``Chunked`` entity where one Chunk is produced for every non-empty ByteString of the given * ``Publisher[ByteString]``. */ - def fromData(contentType: ContentType, chunks: Publisher[ByteString])(implicit fm: FlowMaterializer): Chunked = - Chunked(contentType, Flow(chunks).collect[ChunkStreamPart] { + def fromData(contentType: ContentType, chunks: Source[ByteString]): Chunked = + Chunked(contentType, chunks.collect[ChunkStreamPart] { case b: ByteString if b.nonEmpty ⇒ Chunk(b) - }.toPublisher()) + }) } /** diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index cd9d9b0384..179af655dc 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -9,7 +9,7 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import scala.reflect.{ classTag, ClassTag } -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.FlowMaterializer import akka.util.ByteString import akka.http.util._ import headers._ diff --git a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala index b1b1e18f3d..fb170f4210 100644 --- a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala +++ b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala @@ -5,30 +5,28 @@ package akka.http.model import java.io.File -import org.reactivestreams.Publisher import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.{ FutureDrain, FlowMaterializer, Source } import akka.stream.impl.SynchronousPublisherFromIterable import akka.http.util.FastFuture import FastFuture._ import headers._ trait MultipartParts { - def parts: Publisher[BodyPart] + def parts: Source[BodyPart] } /** * Basic model for multipart content as defined in RFC 2046. * If you are looking for a model for `multipart/form-data` you should be using [[MultipartFormData]]. */ -final case class MultipartContent(parts: Publisher[BodyPart]) extends MultipartParts +final case class MultipartContent(parts: Source[BodyPart]) extends MultipartParts object MultipartContent { - val Empty = MultipartContent(SynchronousPublisherFromIterable[BodyPart](Nil)) + val Empty = MultipartContent(Source[BodyPart](Nil)) - def apply(parts: BodyPart*): MultipartContent = apply(SynchronousPublisherFromIterable[BodyPart](parts.toList)) + def apply(parts: BodyPart*): MultipartContent = apply(Source[BodyPart](parts.toList)) def apply(files: Map[String, FormFile]): MultipartContent = apply(files.map(e ⇒ BodyPart(e._2, e._1))(collection.breakOut): _*) @@ -38,13 +36,13 @@ object MultipartContent { * Model for multipart/byteranges content as defined in RFC 2046. * If you are looking for a model for `multipart/form-data` you should be using [[MultipartFormData]]. */ -final case class MultipartByteRanges(parts: Publisher[BodyPart]) extends MultipartParts +final case class MultipartByteRanges(parts: Source[BodyPart]) extends MultipartParts object MultipartByteRanges { - val Empty = MultipartByteRanges(SynchronousPublisherFromIterable[BodyPart](Nil)) + val Empty = MultipartByteRanges(Source[BodyPart](Nil)) def apply(parts: BodyPart*): MultipartByteRanges = - if (parts.isEmpty) Empty else MultipartByteRanges(SynchronousPublisherFromIterable[BodyPart](parts.toList)) + if (parts.isEmpty) Empty else MultipartByteRanges(Source[BodyPart](parts.toList)) } /** @@ -52,19 +50,19 @@ object MultipartByteRanges { * All parts must contain a Content-Disposition header with a type form-data * and a name parameter that is unique. */ -case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts { +case class MultipartFormData(parts: Source[BodyPart]) extends MultipartParts { /** * Turns this instance into its strict specialization using the given `maxFieldCount` as the field number cut-off * hint. */ def toStrict(maxFieldCount: Int = 1000)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[StrictMultipartFormData] = - Flow(parts).grouped(maxFieldCount).toFuture().fast.map(new StrictMultipartFormData(_)) + parts.grouped(maxFieldCount).runWith(FutureDrain()).fast.map(new StrictMultipartFormData(_)) } /** * A specialized `MultipartFormData` that allows full random access to its parts. */ -class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends MultipartFormData(SynchronousPublisherFromIterable(fields)) { +class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends MultipartFormData(Source(fields)) { /** * Returns the BodyPart with the given name, if found. */ @@ -77,7 +75,7 @@ class StrictMultipartFormData(val fields: immutable.Seq[BodyPart]) extends Multi object MultipartFormData { val Empty = MultipartFormData() - def apply(parts: BodyPart*): MultipartFormData = apply(SynchronousPublisherFromIterable[BodyPart](parts.toList)) + def apply(parts: BodyPart*): MultipartFormData = apply(Source[BodyPart](parts.toList)) def apply(fields: Map[String, BodyPart]): MultipartFormData = apply { fields.map { diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 5a3ea605bc..4ca9638140 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -5,8 +5,7 @@ package akka.http.util import akka.stream.impl.ErrorPublisher -import akka.stream.scaladsl.Flow -import akka.stream.{ FlowMaterializer, Transformer } +import akka.stream.Transformer import akka.util.ByteString import org.reactivestreams.Publisher diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index 619da30411..1aa24da002 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -5,15 +5,15 @@ package akka.http import language.implicitConversions +import language.higherKinds import java.nio.charset.Charset import com.typesafe.config.Config -import org.reactivestreams.{ Subscription, Subscriber, Publisher } +import akka.stream.scaladsl2.{ Flow, Source, FlattenStrategy } import scala.util.matching.Regex import akka.event.LoggingAdapter import akka.util.ByteString import akka.actor._ -import akka.stream.scaladsl.Flow -import akka.stream.{ Transformer, FlattenStrategy, FlowMaterializer } +import akka.stream.Transformer package object util { private[http] val UTF8 = Charset.forName("UTF8") @@ -38,15 +38,20 @@ package object util { private[http] implicit def enhanceTransformer[T, U](transformer: Transformer[T, U]): EnhancedTransformer[T, U] = new EnhancedTransformer(transformer) - private[http] implicit class FlowWithHeadAndTail[T](val underlying: Flow[Publisher[T]]) extends AnyVal { - def headAndTail(implicit fm: FlowMaterializer): Flow[(T, Publisher[T])] = - underlying.map { p ⇒ - Flow(p).prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) }.toPublisher() - }.flatten(FlattenStrategy.Concat()) + private[http] implicit class SourceWithHeadAndTail[T](val underlying: Source[Source[T]]) extends AnyVal { + def headAndTail: Source[(T, Source[T])] = + underlying.map { _.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } } + .flatten(FlattenStrategy.concat) } - private[http] implicit class FlowWithPrintEvent[T](val underlying: Flow[T]) { - def printEvent(marker: String): Flow[T] = + private[http] implicit class FlowWithHeadAndTail[In, Out](val underlying: Flow[In, Source[Out]]) extends AnyVal { + def headAndTail: Flow[In, (Out, Source[Out])] = + underlying.map { _.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } } + .flatten(FlattenStrategy.concat) + } + + private[http] implicit class SourceWithPrintEvent[T](val underlying: Source[T]) { + def printEvent(marker: String): Source[T] = underlying.transform("transform", () ⇒ new Transformer[T, T] { def onNext(element: T) = { @@ -60,14 +65,6 @@ package object util { }) } - // FIXME: This should be fixed by a CancelledDrain once #15903 is done. Currently this is needed for the tests - private[http] def cancelledSusbcriber[T]: Subscriber[T] = new Subscriber[T] { - override def onSubscribe(s: Subscription): Unit = s.cancel() - override def onError(t: Throwable): Unit = () - override def onComplete(): Unit = () - override def onNext(t: T): Unit = () - } - private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = new Transformer[ByteString, ByteString] { def onNext(element: ByteString) = element :: Nil diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index c817ce07a8..9dbd1a494c 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -7,6 +7,7 @@ package akka.http import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.net.Socket import com.typesafe.config.{ Config, ConfigFactory } +import org.reactivestreams.Publisher import scala.annotation.tailrec import scala.concurrent.Await import scala.concurrent.duration._ @@ -14,11 +15,10 @@ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.{ Status, ActorSystem } import akka.io.IO import akka.testkit.TestProbe -import akka.stream.FlowMaterializer import akka.stream.impl.SynchronousPublisherFromIterable -import akka.stream.scaladsl.Flow import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } +import akka.stream.scaladsl2._ import akka.http.engine.client.ClientConnectionSettings import akka.http.engine.server.ServerSettings import akka.http.model._ @@ -97,7 +97,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val chunks = List(Chunk("abc"), Chunk("defg"), Chunk("hijkl"), LastChunk) val chunkedContentType: ContentType = MediaTypes.`application/base64` - val chunkedEntity = HttpEntity.Chunked(chunkedContentType, SynchronousPublisherFromIterable(chunks)) + val chunkedEntity = HttpEntity.Chunked(chunkedContentType, Source(chunks)) val clientOutSub = clientOut.expectSubscription() clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity) -> 12345678) @@ -107,7 +107,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { private val HttpRequest(POST, uri, List(`User-Agent`(_), Host(_, _), Accept(Vector(MediaRanges.`*/*`))), Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() uri shouldEqual Uri(s"http://$hostname:$port/chunked") - Await.result(Flow(chunkStream).grouped(4).toFuture(), 100.millis) shouldEqual chunks + Await.result(chunkStream.grouped(4).runWith(FutureDrain()), 100.millis) shouldEqual chunks val serverOutSub = serverOut.expectSubscription() serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity)) @@ -116,7 +116,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { clientInSub.request(1) val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")), Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext() - Await.result(Flow(chunkStream2).grouped(1000).toFuture(), 100.millis) shouldEqual chunks + Await.result(chunkStream2.grouped(1000).runWith(FutureDrain()), 100.millis) shouldEqual chunks } } @@ -147,8 +147,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { connection.remoteAddress.getHostName shouldEqual hostname val requestPublisherProbe = StreamTestKit.PublisherProbe[(HttpRequest, T)]() val responseSubscriberProbe = StreamTestKit.SubscriberProbe[(HttpResponse, T)]() - requestPublisherProbe.subscribe(connection.processor[T]) - connection.processor[T].subscribe(responseSubscriberProbe) + requestPublisherProbe.subscribe(connection.requestSubscriber) + connection.responsePublisher.asInstanceOf[Publisher[(HttpResponse, T)]].subscribe(responseSubscriberProbe) requestPublisherProbe -> responseSubscriberProbe } diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 615bba8da2..b3372cd2e5 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -5,14 +5,14 @@ package akka.http import com.typesafe.config.{ Config, ConfigFactory } +import org.reactivestreams.Subscriber import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.{ Failure, Success } import akka.actor.ActorSystem import akka.pattern.ask import akka.util.Timeout -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.{ FutureDrain, SubscriberDrain, Source, FlowMaterializer } import akka.io.IO import akka.http.model.HttpMethods._ import akka.http.model._ @@ -37,8 +37,10 @@ object TestClient extends App { } yield response.header[headers.Server] def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = { - Flow(List(HttpRequest() -> 'NoContext)).produceTo(connection.processor) - Flow(connection.processor).map(_._1).toFuture() + Source(List(HttpRequest() -> 'NoContext)) + .connect(SubscriberDrain(connection.requestSubscriber)) + .run() + Source(connection.responsePublisher).map(_._1).runWith(FutureDrain()) } result onComplete { diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index 07272d678a..a349698d77 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -6,8 +6,7 @@ package akka.http import com.typesafe.config.{ ConfigFactory, Config } import scala.concurrent.duration._ -import akka.stream.scaladsl.Flow -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.{ SubscriberDrain, Source, FlowMaterializer } import akka.io.IO import akka.util.Timeout import akka.actor.ActorSystem @@ -36,10 +35,10 @@ object TestServer extends App { val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080) bindingFuture foreach { case Http.ServerBinding(localAddress, connectionStream) ⇒ - Flow(connectionStream).foreach { + Source(connectionStream).foreach { case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber) ⇒ println("Accepted new connection from " + remoteAddress) - Flow(requestPublisher).map(requestHandler).produceTo(responseSubscriber) + Source(requestPublisher).map(requestHandler).connect(SubscriberDrain(responseSubscriber)).run() } } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 92b7d3a3bd..c312c42d58 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -9,10 +9,7 @@ import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher -import org.reactivestreams.Publisher -import akka.stream.scaladsl.Flow -import akka.stream.impl.SynchronousPublisherFromIterable -import akka.stream.{ FlattenStrategy, FlowMaterializer } +import akka.stream.scaladsl2._ import akka.util.ByteString import akka.actor.ActorSystem import akka.http.util._ @@ -168,7 +165,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { "request start" in new Test { Seq(start, "rest") should generalMultiParseTo( - Right(baseRequest.withEntity(HttpEntity.Chunked(`application/pdf`, publisher()))), + Right(baseRequest.withEntity(HttpEntity.Chunked(`application/pdf`, source()))), Left(ParseError(400: StatusCode, ErrorInfo("Illegal character 'r' in chunk start")))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -187,7 +184,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |0123456789""", """ABCDEF |dead""") should generalMultiParseTo( - Right(baseRequest.withEntity(Chunked(`application/pdf`, publisher( + Right(baseRequest.withEntity(Chunked(`application/pdf`, source( Chunk(ByteString("abc")), Chunk(ByteString("0123456789ABCDEF"), "some=stuff;bla"), Chunk(ByteString("0123456789ABCDEF"), "foo=bar"), @@ -200,7 +197,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { """0 | |""") should generalMultiParseTo( - Right(baseRequest.withEntity(Chunked(`application/pdf`, publisher(LastChunk))))) + Right(baseRequest.withEntity(Chunked(`application/pdf`, source(LastChunk))))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -213,7 +210,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { | |GE""") should generalMultiParseTo( Right(baseRequest.withEntity(Chunked(`application/pdf`, - publisher(LastChunk("nice=true", List(RawHeader("Bar", "xyz"), RawHeader("Foo", "pip apo")))))))) + source(LastChunk("nice=true", List(RawHeader("Bar", "xyz"), RawHeader("Foo", "pip apo")))))))) closeAfterResponseCompletion shouldEqual Seq(false) } } @@ -225,7 +222,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Host: ping | |""" should parseTo(HttpRequest(PATCH, "/data", List(`Transfer-Encoding`(TransferEncodings.Extension("fancy")), - Host("ping")), HttpEntity.Chunked(`application/pdf`, publisher()))) + Host("ping")), HttpEntity.Chunked(`application/pdf`, source()))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -238,7 +235,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { | |""" val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), Connection("lalelu")), - HttpEntity.Chunked(`application/octet-stream`, publisher())) + HttpEntity.Chunked(`application/octet-stream`, source())) "an illegal char after chunk size" in new Test { Seq(start, @@ -360,6 +357,21 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { private class Test { var closeAfterResponseCompletion = Seq.empty[Boolean] + class StrictEqualHttpRequest(val req: HttpRequest) { + override def equals(other: scala.Any): Boolean = other match { + case other: StrictEqualHttpRequest ⇒ + this.req.copy(entity = HttpEntity.Empty) == other.req.copy(entity = HttpEntity.Empty) && + Await.result(this.req.entity.toStrict(250.millis), 250.millis) == + Await.result(other.req.entity.toStrict(250.millis), 250.millis) + } + + override def toString = req.toString + } + + def strictEqualify(x: Either[ParseError, HttpRequest]): Either[ParseError, StrictEqualHttpRequest] = { + x.right.map(new StrictEqualHttpRequest(_)) + } + def parseTo(expected: HttpRequest*): Matcher[String] = multiParseTo(expected: _*).compose(_ :: Nil) @@ -382,48 +394,49 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { generalRawMultiParseTo(newParser, expected: _*) def generalRawMultiParseTo(parser: HttpRequestParser, expected: Either[ParseError, HttpRequest]*): Matcher[Seq[String]] = - equal(expected).matcher[Seq[Either[ParseError, HttpRequest]]] compose { input: Seq[String] ⇒ - val future = - Flow(input.toList) - .map(ByteString.apply) - .transform("parser", () ⇒ parser) - .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) - .headAndTail - .collect { - case (ParserOutput.RequestStart(method, uri, protocol, headers, createEntity, close), entityParts) ⇒ - closeAfterResponseCompletion :+= close - Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol)) - case (x: ParseError, _) ⇒ Left(x) - } - .map { x ⇒ - Flow { - x match { - case Right(request) ⇒ compactEntity(request.entity).fast.map(x ⇒ Right(request.withEntity(x))) - case Left(error) ⇒ Future.successful(Left(error)) + equal(expected.map(strictEqualify)) + .matcher[Seq[Either[ParseError, StrictEqualHttpRequest]]] compose { input: Seq[String] ⇒ + val future = + Source(input.toList) + .map(ByteString.apply) + .transform("parser", () ⇒ parser) + .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) + .headAndTail + .collect { + case (ParserOutput.RequestStart(method, uri, protocol, headers, createEntity, close), entityParts) ⇒ + closeAfterResponseCompletion :+= close + Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol)) + case (x: ParseError, _) ⇒ Left(x) + } + .map { x ⇒ + Source { + x match { + case Right(request) ⇒ compactEntity(request.entity).fast.map(x ⇒ Right(request.withEntity(x))) + case Left(error) ⇒ Future.successful(Left(error)) + } } - }.toPublisher() - } - .flatten(FlattenStrategy.concat) - .grouped(1000).toFuture() - Await.result(future, 250.millis) - } + } + .flatten(FlattenStrategy.concat) + .map(strictEqualify) + .grouped(1000).runWith(FutureDrain()) + Await.result(future, 250.millis) + } protected def parserSettings: ParserSettings = ParserSettings(system) protected def newParser = new HttpRequestParser(parserSettings, false)() private def compactEntity(entity: RequestEntity): Future[RequestEntity] = entity match { - case x: Chunked ⇒ compactEntityChunks(x.chunks).fast.map(compacted ⇒ x.copy(chunks = compacted)) + case x: Chunked ⇒ compactEntityChunks(x.chunks).fast.map(compacted ⇒ x.copy(chunks = source(compacted: _*))) case _ ⇒ entity.toStrict(250.millis) } - private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] = - Flow(data).grouped(1000).toFuture() - .fast.map(publisher(_: _*)) - .fast.recover { case _: NoSuchElementException ⇒ publisher() } + private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Seq[ChunkStreamPart]] = + data.grouped(1000).runWith(FutureDrain()) + .fast.recover { case _: NoSuchElementException ⇒ Nil } def prep(response: String) = response.stripMarginWithNewline("\r\n") } - def publisher[T](elems: T*): Publisher[T] = SynchronousPublisherFromIterable(elems.toList) + def source[T](elems: T*): Source[T] = Source(elems.toList) } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 897086d576..9eec7d2d82 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -9,10 +9,7 @@ import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher -import org.reactivestreams.Publisher -import akka.stream.scaladsl.Flow -import akka.stream.impl.SynchronousPublisherFromIterable -import akka.stream.{ FlattenStrategy, FlowMaterializer } +import akka.stream.scaladsl2._ import akka.util.ByteString import akka.actor.ActorSystem import akka.http.util._ @@ -133,7 +130,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { "response start" in new Test { Seq(start, "rest") should generalMultiParseTo( - Right(baseResponse.withEntity(Chunked(`application/pdf`, publisher()))), + Right(baseResponse.withEntity(Chunked(`application/pdf`, source()))), Left("Illegal character 'r' in chunk start")) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -152,7 +149,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |0123456789""", """ABCDEF |dead""") should generalMultiParseTo( - Right(baseResponse.withEntity(Chunked(`application/pdf`, publisher( + Right(baseResponse.withEntity(Chunked(`application/pdf`, source( Chunk(ByteString("abc")), Chunk(ByteString("0123456789ABCDEF"), "some=stuff;bla"), Chunk(ByteString("0123456789ABCDEF"), "foo=bar"), @@ -165,7 +162,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { """0 | |""") should generalMultiParseTo( - Right(baseResponse.withEntity(Chunked(`application/pdf`, publisher(LastChunk))))) + Right(baseResponse.withEntity(Chunked(`application/pdf`, source(LastChunk))))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -178,7 +175,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { | |HT""") should generalMultiParseTo( Right(baseResponse.withEntity(Chunked(`application/pdf`, - publisher(LastChunk("nice=true", List(RawHeader("Bar", "xyz"), RawHeader("Foo", "pip apo")))))))) + source(LastChunk("nice=true", List(RawHeader("Bar", "xyz"), RawHeader("Foo", "pip apo")))))))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -188,7 +185,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Content-Type: application/pdf | |""" should parseTo(HttpResponse(headers = List(`Transfer-Encoding`(TransferEncodings.Extension("fancy"))), - entity = HttpEntity.Chunked(`application/pdf`, publisher()))) + entity = HttpEntity.Chunked(`application/pdf`, source()))) closeAfterResponseCompletion shouldEqual Seq(false) } } @@ -214,6 +211,21 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { private class Test { var closeAfterResponseCompletion = Seq.empty[Boolean] + class StrictEqualHttpResponse(val resp: HttpResponse) { + override def equals(other: scala.Any): Boolean = other match { + case other: StrictEqualHttpResponse ⇒ + this.resp.copy(entity = HttpEntity.Empty) == other.resp.copy(entity = HttpEntity.Empty) && + Await.result(this.resp.entity.toStrict(250.millis), 250.millis) == + Await.result(other.resp.entity.toStrict(250.millis), 250.millis) + } + + override def toString = resp.toString + } + + def strictEqualify(x: Either[String, HttpResponse]): Either[String, StrictEqualHttpResponse] = { + x.right.map(new StrictEqualHttpResponse(_)) + } + def parseTo(expected: HttpResponse*): Matcher[String] = parseTo(GET, expected: _*) def parseTo(requestMethod: HttpMethod, expected: HttpResponse*): Matcher[String] = multiParseTo(requestMethod, expected: _*).compose(_ :: Nil) @@ -234,31 +246,33 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def generalRawMultiParseTo(expected: Either[String, HttpResponse]*): Matcher[Seq[String]] = generalRawMultiParseTo(GET, expected: _*) def generalRawMultiParseTo(requestMethod: HttpMethod, expected: Either[String, HttpResponse]*): Matcher[Seq[String]] = - equal(expected).matcher[Seq[Either[String, HttpResponse]]] compose { - input: Seq[String] ⇒ - val future = - Flow(input.toList) - .map(ByteString.apply) - .transform("parser", () ⇒ newParser(requestMethod)) - .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) - .headAndTail - .collect { - case (ParserOutput.ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒ - closeAfterResponseCompletion :+= close - Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol)) - case (x: ParseError, _) ⇒ Left(x) - }.map { x ⇒ - Flow { - x match { - case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x))) - case Left(error) ⇒ FastFuture.successful(Left(error.info.formatPretty)) + equal(expected.map(strictEqualify)) + .matcher[Seq[Either[String, StrictEqualHttpResponse]]] compose { + input: Seq[String] ⇒ + val future = + Source(input.toList) + .map(ByteString.apply) + .transform("parser", () ⇒ newParser(requestMethod)) + .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) + .headAndTail + .collect { + case (ParserOutput.ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒ + closeAfterResponseCompletion :+= close + Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol)) + case (x: ParseError, _) ⇒ Left(x) + }.map { x ⇒ + Source { + x match { + case Right(response) ⇒ compactEntity(response.entity).fast.map(x ⇒ Right(response.withEntity(x))) + case Left(error) ⇒ FastFuture.successful(Left(error.info.formatPretty)) + } } - }.toPublisher() - } - .flatten(FlattenStrategy.concat) - .grouped(1000).toFuture() - Await.result(future, 250.millis) - } + } + .flatten(FlattenStrategy.concat) + .map(strictEqualify) + .grouped(1000).runWith(FutureDrain()) + Await.result(future, 500.millis) + } def parserSettings: ParserSettings = ParserSettings(system) def newParser(requestMethod: HttpMethod = GET) = { @@ -273,13 +287,13 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { case _ ⇒ entity.toStrict(250.millis) } - private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] = - Flow(data).grouped(1000).toFuture() - .fast.map(publisher(_: _*)) - .fast.recover { case _: NoSuchElementException ⇒ publisher() } + private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Source[ChunkStreamPart]] = + data.grouped(1000).runWith(FutureDrain()) + .fast.map(source(_: _*)) + .fast.recover { case _: NoSuchElementException ⇒ source() } def prep(response: String) = response.stripMarginWithNewline("\r\n") - def publisher[T](elems: T*): Publisher[T] = SynchronousPublisherFromIterable(elems.toList) + def source[T](elems: T*): Source[T] = Source(elems.toList) } } diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala index d0ca01a784..f1801d3d9f 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala @@ -15,8 +15,7 @@ import akka.event.NoLogging import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ -import akka.stream.scaladsl.Flow -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2._ import akka.stream.impl.SynchronousPublisherFromIterable import HttpEntity._ import HttpMethods._ @@ -120,7 +119,8 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "proper render a chunked" - { "PUT request with empty chunk stream and custom Content-Type" in new TestSetup() { - HttpRequest(PUT, "/abc/xyz", entity = Chunked(ContentTypes.`text/plain`, publisher())) should renderTo { + pending // Disabled until #15981 is fixed + HttpRequest(PUT, "/abc/xyz", entity = Chunked(ContentTypes.`text/plain`, source())) should renderTo { """PUT /abc/xyz HTTP/1.1 |Host: test.com:8080 |User-Agent: spray-can/1.0.0 @@ -133,7 +133,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "POST request with body" in new TestSetup() { HttpRequest(POST, "/abc/xyz", entity = Chunked(ContentTypes.`text/plain`, - publisher("XXXX", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) should renderTo { + source("XXXX", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) should renderTo { """POST /abc/xyz HTTP/1.1 |Host: test.com:8080 |User-Agent: spray-can/1.0.0 @@ -152,7 +152,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "POST request with custom Transfer-Encoding header" in new TestSetup() { HttpRequest(POST, "/abc/xyz", List(`Transfer-Encoding`(TransferEncodings.Extension("fancy"))), - entity = Chunked(ContentTypes.`text/plain`, publisher("XXXX", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) should renderTo { + entity = Chunked(ContentTypes.`text/plain`, source("XXXX", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) should renderTo { """POST /abc/xyz HTTP/1.1 |Transfer-Encoding: fancy, chunked |Host: test.com:8080 @@ -222,11 +222,11 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll def renderTo(expected: String): Matcher[HttpRequest] = equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒ val renderer = newRenderer - val byteStringPublisher :: Nil = renderer.onNext(RequestRenderingContext(request, serverAddress)) - val future = Flow(byteStringPublisher).grouped(1000).toFuture().map(_.reduceLeft(_ ++ _).utf8String) + val byteStringSource :: Nil = renderer.onNext(RequestRenderingContext(request, serverAddress)) + val future = byteStringSource.grouped(1000).runWith(FutureDrain()).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) } } - def publisher[T](elems: T*) = SynchronousPublisherFromIterable(elems.toList) + def source[T](elems: T*) = Source(elems.toList) } diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala index f96594d263..82a86a714c 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala @@ -15,8 +15,7 @@ import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ import akka.util.ByteString -import akka.stream.scaladsl.Flow -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2._ import akka.stream.impl.SynchronousPublisherFromIterable import HttpEntity._ @@ -127,7 +126,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "a response with a Default (streamed with explicit content-length body," - { "status 400 and a few headers" in new TestSetup() { HttpResponse(400, List(RawHeader("Age", "30"), Connection("Keep-Alive")), - entity = Default(contentType = ContentTypes.`text/plain(UTF-8)`, 23, publisher(ByteString("Small f*ck up overhere!")))) should renderTo { + entity = Default(contentType = ContentTypes.`text/plain(UTF-8)`, 23, source(ByteString("Small f*ck up overhere!")))) should renderTo { """HTTP/1.1 400 Bad Request |Age: 30 |Server: akka-http/1.0.0 @@ -141,14 +140,14 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "one chunk and incorrect (too large) Content-Length" in new TestSetup() { the[RuntimeException] thrownBy { HttpResponse(200, entity = Default(ContentTypes.`application/json`, 10, - publisher(ByteString("body123")))) should renderTo("") + source(ByteString("body123")))) should renderTo("") } should have message "HTTP message had declared Content-Length 10 but entity chunk stream amounts to 3 bytes less" } "one chunk and incorrect (too small) Content-Length" in new TestSetup() { the[RuntimeException] thrownBy { HttpResponse(200, entity = Default(ContentTypes.`application/json`, 5, - publisher(ByteString("body123")))) should renderTo("") + source(ByteString("body123")))) should renderTo("") } should have message "HTTP message had declared Content-Length 5 but entity chunk stream amounts to more bytes" } @@ -157,7 +156,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "without data" in new TestSetup() { ResponseRenderingContext( HttpResponse(200, entity = CloseDelimited(ContentTypes.`application/json`, - publisher(ByteString.empty)))) should renderTo( + source(ByteString.empty)))) should renderTo( """HTTP/1.1 200 OK |Server: akka-http/1.0.0 |Date: Thu, 25 Aug 2011 09:10:29 GMT @@ -169,7 +168,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "consisting of two parts" in new TestSetup() { ResponseRenderingContext( HttpResponse(200, entity = CloseDelimited(ContentTypes.`application/json`, - publisher(ByteString("abc"), ByteString("defg"))))) should renderTo( + source(ByteString("abc"), ByteString("defg"))))) should renderTo( """HTTP/1.1 200 OK |Server: akka-http/1.0.0 |Date: Thu, 25 Aug 2011 09:10:29 GMT @@ -182,8 +181,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "a chunked response" - { "with empty entity" in new TestSetup() { + pending // Disabled until #15981 is fixed HttpResponse(200, List(RawHeader("Age", "30")), - Chunked(ContentTypes.NoContentType, publisher())) should renderTo { + Chunked(ContentTypes.NoContentType, source())) should renderTo { """HTTP/1.1 200 OK |Age: 30 |Server: akka-http/1.0.0 @@ -194,8 +194,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll } "with empty entity but non-default Content-Type" in new TestSetup() { + pending // Disabled until #15981 is fixed HttpResponse(200, List(RawHeader("Age", "30")), - Chunked(ContentTypes.`application/json`, publisher())) should renderTo { + Chunked(ContentTypes.`application/json`, source())) should renderTo { """HTTP/1.1 200 OK |Age: 30 |Server: akka-http/1.0.0 @@ -208,7 +209,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "with one chunk and no explicit LastChunk" in new TestSetup() { HttpResponse(entity = Chunked(ContentTypes.`text/plain(UTF-8)`, - publisher("Yahoooo"))) should renderTo { + source("Yahoooo"))) should renderTo { """HTTP/1.1 200 OK |Server: akka-http/1.0.0 |Date: Thu, 25 Aug 2011 09:10:29 GMT @@ -225,7 +226,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "with one chunk and an explicit LastChunk" in new TestSetup() { HttpResponse(entity = Chunked(ContentTypes.`text/plain(UTF-8)`, - publisher(Chunk(ByteString("body123"), """key=value;another="tl;dr""""), + source(Chunk(ByteString("body123"), """key=value;another="tl;dr""""), LastChunk("foo=bar", List(RawHeader("Age", "30"), RawHeader("Cache-Control", "public")))))) should renderTo { """HTTP/1.1 200 OK |Server: akka-http/1.0.0 @@ -245,7 +246,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "with a custom Transfer-Encoding header" in new TestSetup() { HttpResponse(headers = List(`Transfer-Encoding`(TransferEncodings.Extension("fancy"))), - entity = Chunked(ContentTypes.`text/plain(UTF-8)`, publisher("Yahoooo"))) should renderTo { + entity = Chunked(ContentTypes.`text/plain(UTF-8)`, source("Yahoooo"))) should renderTo { """HTTP/1.1 200 OK |Transfer-Encoding: fancy, chunked |Server: akka-http/1.0.0 @@ -266,7 +267,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll ResponseRenderingContext( requestProtocol = HttpProtocols.`HTTP/1.0`, response = HttpResponse(entity = Chunked(ContentTypes.`application/json`, - publisher(Chunk("abc"), Chunk("defg"))))) should renderTo( + source(Chunk("abc"), Chunk("defg"))))) should renderTo( """HTTP/1.1 200 OK |Server: akka-http/1.0.0 |Date: Thu, 25 Aug 2011 09:10:29 GMT @@ -279,7 +280,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll ResponseRenderingContext( requestProtocol = HttpProtocols.`HTTP/1.0`, response = HttpResponse(entity = Chunked(ContentTypes.`text/plain(UTF-8)`, - publisher(Chunk(ByteString("body123"), """key=value;another="tl;dr""""), + source(Chunk(ByteString("body123"), """key=value;another="tl;dr""""), LastChunk("foo=bar", List(RawHeader("Age", "30"), RawHeader("Cache-Control", "public"))))))) should renderTo( """HTTP/1.1 200 OK |Server: akka-http/1.0.0 @@ -367,13 +368,13 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll def renderTo(expected: String, close: Boolean): Matcher[ResponseRenderingContext] = equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx ⇒ val renderer = newRenderer - val byteStringPublisher :: Nil = renderer.onNext(ctx) - val future = Flow(byteStringPublisher).grouped(1000).toFuture().map(_.reduceLeft(_ ++ _).utf8String) + val byteStringSource :: Nil = renderer.onNext(ctx) + val future = byteStringSource.grouped(1000).runWith(FutureDrain()).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) -> renderer.isComplete } override def dateTime(now: Long) = DateTime(2011, 8, 25, 9, 10, 29) // provide a stable date for testing } - def publisher[T](elems: T*) = SynchronousPublisherFromIterable(elems.toList) + def source[T](elems: T*) = Source(elems.toList) } diff --git a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala index 3732befa56..dedad1a2a9 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala @@ -11,7 +11,7 @@ import akka.http.model._ import akka.http.model.headers.{ ProductVersion, Server, Host } import akka.http.util._ import akka.http.Http -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2._ import akka.stream.io.StreamTcp import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import akka.util.ByteString @@ -39,7 +39,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNoMsg(50.millis) @@ -75,7 +75,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -111,7 +111,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -133,7 +133,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -181,7 +181,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -217,7 +217,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -253,7 +253,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -275,7 +275,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -297,7 +297,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -319,7 +319,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.subscribe(dataProbe) + data.connect(SubscriberDrain(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -384,7 +384,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.GET, _, _, _, _) ⇒ - responsesSub.sendNext(HttpResponse(entity = HttpEntity.Default(ContentTypes.`text/plain`, 4, data))) + responsesSub.sendNext(HttpResponse(entity = HttpEntity.Default(ContentTypes.`text/plain`, 4, Source(data)))) netOutSub.request(1) @@ -413,7 +413,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.GET, _, _, _, _) ⇒ - responsesSub.sendNext(HttpResponse(entity = HttpEntity.CloseDelimited(ContentTypes.`text/plain`, data))) + responsesSub.sendNext(HttpResponse(entity = HttpEntity.CloseDelimited(ContentTypes.`text/plain`, Source(data)))) netOutSub.request(1) @@ -444,7 +444,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.GET, _, _, _, _) ⇒ - responsesSub.sendNext(HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, data))) + responsesSub.sendNext(HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, Source(data)))) netOutSub.request(1) @@ -474,7 +474,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.GET, _, _, _, _) ⇒ - responsesSub.sendNext(HttpResponse(entity = HttpEntity.CloseDelimited(ContentTypes.`text/plain`, data))) + responsesSub.sendNext(HttpResponse(entity = HttpEntity.CloseDelimited(ContentTypes.`text/plain`, Source(data)))) netOutSub.request(1) diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index 9a09e949ff..e75b1d48c9 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -14,9 +14,8 @@ import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec } import org.scalatest.matchers.Matcher import akka.util.ByteString import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import akka.stream.{ Transformer, FlowMaterializer } -import akka.stream.impl.SynchronousPublisherFromIterable +import akka.stream.scaladsl2._ +import akka.stream.Transformer import akka.http.model.HttpEntity._ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { @@ -41,16 +40,16 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { Strict(tpe, abc) must collectBytesTo(abc) } "Default" in { - Default(tpe, 11, publisher(abc, de, fgh, ijk)) must collectBytesTo(abc, de, fgh, ijk) + Default(tpe, 11, source(abc, de, fgh, ijk)) must collectBytesTo(abc, de, fgh, ijk) } "CloseDelimited" in { - CloseDelimited(tpe, publisher(abc, de, fgh, ijk)) must collectBytesTo(abc, de, fgh, ijk) + CloseDelimited(tpe, source(abc, de, fgh, ijk)) must collectBytesTo(abc, de, fgh, ijk) } "Chunked w/o LastChunk" in { - Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk))) must collectBytesTo(abc, fgh, ijk) + Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk))) must collectBytesTo(abc, fgh, ijk) } "Chunked with LastChunk" in { - Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must collectBytesTo(abc, fgh, ijk) + Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must collectBytesTo(abc, fgh, ijk) } } "support toStrict" - { @@ -58,26 +57,25 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { Strict(tpe, abc) must strictifyTo(Strict(tpe, abc)) } "Default" in { - Default(tpe, 11, publisher(abc, de, fgh, ijk)) must + Default(tpe, 11, source(abc, de, fgh, ijk)) must strictifyTo(Strict(tpe, abc ++ de ++ fgh ++ ijk)) } "CloseDelimited" in { - CloseDelimited(tpe, publisher(abc, de, fgh, ijk)) must + CloseDelimited(tpe, source(abc, de, fgh, ijk)) must strictifyTo(Strict(tpe, abc ++ de ++ fgh ++ ijk)) } "Chunked w/o LastChunk" in { - Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk))) must + Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk))) must strictifyTo(Strict(tpe, abc ++ fgh ++ ijk)) } "Chunked with LastChunk" in { - Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must + Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must strictifyTo(Strict(tpe, abc ++ fgh ++ ijk)) } "Infinite data stream" in { val neverCompleted = Promise[ByteString]() - val stream: Publisher[ByteString] = Flow(neverCompleted.future).toPublisher() intercept[TimeoutException] { - Await.result(Default(tpe, 42, stream).toStrict(100.millis), 150.millis) + Await.result(Default(tpe, 42, Source(neverCompleted.future)).toStrict(100.millis), 150.millis) }.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data") } } @@ -86,29 +84,29 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { Strict(tpe, abc) must transformTo(Strict(tpe, doubleChars("abc") ++ trailer)) } "Default" in { - Default(tpe, 11, publisher(abc, de, fgh, ijk)) must + Default(tpe, 11, source(abc, de, fgh, ijk)) must transformTo(Strict(tpe, doubleChars("abcdefghijk") ++ trailer)) } "CloseDelimited" in { - CloseDelimited(tpe, publisher(abc, de, fgh, ijk)) must + CloseDelimited(tpe, source(abc, de, fgh, ijk)) must transformTo(Strict(tpe, doubleChars("abcdefghijk") ++ trailer)) } "Chunked w/o LastChunk" in { - Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk))) must + Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk))) must transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer)) } "Chunked with LastChunk" in { - Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must + Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer)) } } } - def publisher[T](elems: T*) = SynchronousPublisherFromIterable(elems.toList) + def source[T](elems: T*) = Source(elems.toList) def collectBytesTo(bytes: ByteString*): Matcher[HttpEntity] = equal(bytes.toVector).matcher[Seq[ByteString]].compose { entity ⇒ - val future = Flow(entity.dataBytes).grouped(1000).toFuture() + val future = entity.dataBytes.grouped(1000).runWith(FutureDrain()) Await.result(future, 250.millis) } diff --git a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala index 4753908b5c..0fbeb900b3 100644 --- a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala @@ -62,7 +62,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte entity = HttpEntity.Default( contentType = ContentTypes.`application/json`, contentLength = 5, - Source(List(ByteString("hello"))).runWith(PublisherDrain()))) + Source(List(ByteString("hello"))))) // Our library uses a simple model of headers: a Seq[(String, String)]. // The body is represented as an Array[Byte]. To get the headers in @@ -141,7 +141,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte // convert the body into a Publisher[ByteString]. val byteStringBody = ByteString(byteArrayBody) - val publisherBody = Source(List(byteStringBody)).runWith(PublisherDrain()) + val publisherBody = Source(List(byteStringBody)) // Finally we can create our HttpResponse. diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala index 69867285df..69624ceed0 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTest.scala @@ -12,7 +12,7 @@ import scala.util.DynamicVariable import scala.reflect.ClassTag import org.scalatest.Suite import akka.actor.ActorSystem -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.FlowMaterializer import akka.http.client.RequestBuilding import akka.http.util.FastFuture import akka.http.server._ @@ -121,7 +121,7 @@ trait RouteTest extends RequestBuilding with RouteTestResultComponent { new TildeArrow[RequestContext, Future[RouteResult]] { type Out = RouteTestResult def apply(request: HttpRequest, route: Route): Out = { - val routeTestResult = new RouteTestResult(timeout.duration)(setup.materializer) + val routeTestResult = new RouteTestResult(timeout.duration) val effectiveRequest = request.withEffectiveUri( securedConnection = defaultHostInfo.securedConnection, diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala index 3f84c9dc1d..681cf711b5 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala @@ -5,12 +5,10 @@ package akka.http.testkit import java.util.concurrent.CountDownLatch -import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.{ Await, ExecutionContext } -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2._ import akka.http.model.HttpEntity.ChunkStreamPart import akka.http.server._ import akka.http.model._ @@ -82,21 +80,21 @@ trait RouteTestResultComponent { case HttpEntity.Default(contentType, contentLength, data) ⇒ val dataChunks = awaitAllElements(data); - { () ⇒ HttpEntity.Default(contentType, contentLength, Flow(dataChunks).toPublisher()) } + { () ⇒ HttpEntity.Default(contentType, contentLength, Source(dataChunks)) } case HttpEntity.CloseDelimited(contentType, data) ⇒ val dataChunks = awaitAllElements(data); - { () ⇒ HttpEntity.CloseDelimited(contentType, Flow(dataChunks).toPublisher()) } + { () ⇒ HttpEntity.CloseDelimited(contentType, Source(dataChunks)) } case HttpEntity.Chunked(contentType, chunks) ⇒ val dataChunks = awaitAllElements(chunks); - { () ⇒ HttpEntity.Chunked(contentType, Flow(dataChunks).toPublisher()) } + { () ⇒ HttpEntity.Chunked(contentType, Source(dataChunks)) } } private def failNeitherCompletedNorRejected(): Nothing = failTest("Request was neither completed nor rejected within " + timeout) - private def awaitAllElements[T](data: Publisher[T]): immutable.Seq[T] = - Await.result(Flow(data).grouped(Int.MaxValue).toFuture(), timeout) + private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] = + Await.result(data.grouped(Int.MaxValue).runWith(FutureDrain()), timeout) } } \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/FormDataSpec.scala b/akka-http-tests/src/test/scala/akka/http/FormDataSpec.scala index 81b2efb471..62957d1e9e 100644 --- a/akka-http-tests/src/test/scala/akka/http/FormDataSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/FormDataSpec.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import org.scalatest.concurrent.ScalaFutures import akka.actor.ActorSystem -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.FlowMaterializer import akka.http.unmarshalling.Unmarshal import akka.http.marshalling.Marshal import akka.http.model._ diff --git a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala index 98d838c343..cd663aec54 100644 --- a/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/marshalling/MarshallingSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import akka.actor.ActorSystem -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.FlowMaterializer import akka.http.util._ import akka.http.model._ import headers._ diff --git a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala index 3538dd3200..4fbac9bf17 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala @@ -8,7 +8,7 @@ import com.typesafe.config.{ ConfigFactory, Config } import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.io.IO -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2.FlowMaterializer import akka.util.Timeout import akka.pattern.ask import akka.http.Http diff --git a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala index f96b4d3f90..c93f831899 100644 --- a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala @@ -10,8 +10,7 @@ import scala.concurrent.{ Future, Await } import org.scalatest.matchers.Matcher import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import akka.stream.FlowMaterializer +import akka.stream.scaladsl2._ import akka.http.model._ import akka.http.util._ import headers._ @@ -96,7 +95,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |test@there.com |-----""".stripMarginWithNewline("\r\n"))) .to[MultipartContent], 1.second) - Await.result(Flow(mpc.parts).toFuture().failed, 1.second).getMessage shouldEqual + Await.result(mpc.parts.runWith(FutureDrain()).failed, 1.second).getMessage shouldEqual "multipart part must not contain more than one Content-Type header" } @@ -174,14 +173,14 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def haveParts[T <: MultipartParts](parts: BodyPart*): Matcher[Future[T]] = equal(parts).matcher[Seq[BodyPart]] compose { x ⇒ Await.result(x - .fast.flatMap(x ⇒ Flow(x.parts).grouped(100).toFuture()) + .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(FutureDrain())) .fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second) } def haveFormData(fields: (String, BodyPart)*): Matcher[Future[MultipartFormData]] = equal(fields).matcher[Seq[(String, BodyPart)]] compose { x ⇒ Await.result(x - .fast.flatMap(x ⇒ Flow(x.parts).grouped(100).toFuture()) + .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(FutureDrain())) .fast.recover { case _: NoSuchElementException ⇒ Nil } .fast.map { _ map { part ⇒ diff --git a/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala b/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala index 2a1a8ba4b9..28a8fd4307 100644 --- a/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala +++ b/akka-http/src/main/scala/akka/http/client/RequestBuilding.scala @@ -62,7 +62,7 @@ trait RequestBuilding extends TransformerPipelineSupport { val Head = new RequestBuilder(HEAD) // TODO: reactivate after HTTP message encoding has been ported - //def encode(encoder: Encoder, flow: FlowMaterializer): RequestTransformer = encoder.encode(_, flow) + //def encode(encoder: Encoder): RequestTransformer = encoder.encode(_, flow) def addHeader(header: HttpHeader): RequestTransformer = _.mapHeaders(header +: _) diff --git a/akka-http/src/main/scala/akka/http/coding/DataMapper.scala b/akka-http/src/main/scala/akka/http/coding/DataMapper.scala index e3dce51d5b..32b997a7ae 100644 --- a/akka-http/src/main/scala/akka/http/coding/DataMapper.scala +++ b/akka-http/src/main/scala/akka/http/coding/DataMapper.scala @@ -5,22 +5,22 @@ package akka.http.coding import akka.http.model.{ HttpRequest, HttpResponse, ResponseEntity, RequestEntity } -import akka.stream.{ Transformer, FlowMaterializer } +import akka.stream.Transformer import akka.util.ByteString /** An abstraction to transform data bytes of HttpMessages or HttpEntities */ sealed trait DataMapper[T] { - def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): T + def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString]): T } object DataMapper { implicit val mapRequestEntity: DataMapper[RequestEntity] = new DataMapper[RequestEntity] { - def transformDataBytes(t: RequestEntity, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): RequestEntity = + def transformDataBytes(t: RequestEntity, transformer: () ⇒ Transformer[ByteString, ByteString]): RequestEntity = t.transformDataBytes(transformer) } implicit val mapResponseEntity: DataMapper[ResponseEntity] = new DataMapper[ResponseEntity] { - def transformDataBytes(t: ResponseEntity, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): ResponseEntity = + def transformDataBytes(t: ResponseEntity, transformer: () ⇒ Transformer[ByteString, ByteString]): ResponseEntity = t.transformDataBytes(transformer) } @@ -29,7 +29,7 @@ object DataMapper { def mapMessage[T, E](entityMapper: DataMapper[E])(mapEntity: (T, E ⇒ E) ⇒ T): DataMapper[T] = new DataMapper[T] { - def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): T = + def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString]): T = mapEntity(t, entityMapper.transformDataBytes(_, transformer)) } } \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/coding/Decoder.scala b/akka-http/src/main/scala/akka/http/coding/Decoder.scala index 46b0e80bdb..c5369b7d85 100644 --- a/akka-http/src/main/scala/akka/http/coding/Decoder.scala +++ b/akka-http/src/main/scala/akka/http/coding/Decoder.scala @@ -4,23 +4,21 @@ package akka.http.coding -import java.io.{ OutputStream, ByteArrayOutputStream } - import akka.http.model._ import akka.http.util.StreamUtils -import akka.stream.{ Transformer, FlowMaterializer } +import akka.stream.Transformer import akka.util.ByteString import headers.HttpEncoding trait Decoder { def encoding: HttpEncoding - def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = + def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T]): T#Self = if (message.headers exists Encoder.isContentEncodingHeader) decodeData(message).withHeaders(message.headers filterNot Encoder.isContentEncodingHeader) else message.self - def decodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = + def decodeData[T](t: T)(implicit mapper: DataMapper[T]): T = mapper.transformDataBytes(t, newDecodeTransfomer) def newDecompressor: Decompressor diff --git a/akka-http/src/main/scala/akka/http/coding/Encoder.scala b/akka-http/src/main/scala/akka/http/coding/Encoder.scala index b148afeb5d..3f41ed36b0 100644 --- a/akka-http/src/main/scala/akka/http/coding/Encoder.scala +++ b/akka-http/src/main/scala/akka/http/coding/Encoder.scala @@ -7,7 +7,7 @@ package akka.http.coding import java.io.ByteArrayOutputStream import akka.http.model._ import akka.http.util.StreamUtils -import akka.stream.{ Transformer, FlowMaterializer } +import akka.stream.Transformer import akka.util.ByteString import headers._ @@ -16,12 +16,12 @@ trait Encoder { def messageFilter: HttpMessage ⇒ Boolean - def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = + def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T]): T#Self = if (messageFilter(message) && !message.headers.exists(Encoder.isContentEncodingHeader)) encodeData(message).withHeaders(`Content-Encoding`(encoding) +: message.headers) else message.self - def encodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = + def encodeData[T](t: T)(implicit mapper: DataMapper[T]): T = mapper.transformDataBytes(t, newEncodeTransformer) def newCompressor: Compressor diff --git a/akka-http/src/main/scala/akka/http/coding/NoEncoding.scala b/akka-http/src/main/scala/akka/http/coding/NoEncoding.scala index 5e736372c8..f945e30f33 100644 --- a/akka-http/src/main/scala/akka/http/coding/NoEncoding.scala +++ b/akka-http/src/main/scala/akka/http/coding/NoEncoding.scala @@ -5,7 +5,6 @@ package akka.http.coding import akka.http.model._ -import akka.stream.FlowMaterializer import akka.util.ByteString import headers.HttpEncodings @@ -15,10 +14,10 @@ import headers.HttpEncodings object NoEncoding extends Decoder with Encoder { val encoding = HttpEncodings.identity - override def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = message.self - override def encodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = t - override def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = message.self - override def decodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = t + override def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T]): T#Self = message.self + override def encodeData[T](t: T)(implicit mapper: DataMapper[T]): T = t + override def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T]): T#Self = message.self + override def decodeData[T](t: T)(implicit mapper: DataMapper[T]): T = t val messageFilter: HttpMessage ⇒ Boolean = _ ⇒ false diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 6b2b014766..9be6266ea7 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -8,8 +8,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.ActorRefFactory import akka.parboiled2.util.Base64 -import akka.stream.{ FlattenStrategy, FlowMaterializer } -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2._ import akka.http.engine.rendering.BodyPartRenderer import akka.http.util.actorSystem import akka.http.util.FastFuture._ @@ -28,19 +27,18 @@ trait MultipartMarshallers { Base64.custom.encodeToString(array, false) } - implicit def multipartByteRangesMarshaller(implicit fm: FlowMaterializer, refFactory: ActorRefFactory): ToEntityMarshaller[MultipartByteRanges] = - multipartPartsMarshaller[MultipartByteRanges](`multipart/byteranges`) - implicit def multipartContentMarshaller(implicit fm: FlowMaterializer, refFactory: ActorRefFactory): ToEntityMarshaller[MultipartContent] = - multipartPartsMarshaller[MultipartContent](`multipart/mixed`) + implicit def multipartByteRangesMarshaller(implicit refFactory: ActorRefFactory): ToEntityMarshaller[MultipartByteRanges] = + multipartPartsMarshaller[MultipartByteRanges](`multipart/byteranges`)(refFactory) + implicit def multipartContentMarshaller(implicit refFactory: ActorRefFactory): ToEntityMarshaller[MultipartContent] = + multipartPartsMarshaller[MultipartContent](`multipart/mixed`)(refFactory) - private def multipartPartsMarshaller[T <: MultipartParts](mediaType: MultipartMediaType)(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): ToEntityMarshaller[T] = { + private def multipartPartsMarshaller[T <: MultipartParts](mediaType: MultipartMediaType)(implicit refFactory: ActorRefFactory): ToEntityMarshaller[T] = { val boundary = randomBoundary val mediaTypeWithBoundary = mediaType withBoundary boundary Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset) ⇒ val log = actorSystem(refFactory).log val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, log) - val chunks = Flow(value.parts).transform("bodyPartRenderer", () ⇒ bodyPartRenderer).flatten(FlattenStrategy.concat).toPublisher() + val chunks = value.parts.transform("bodyPartRenderer", () ⇒ bodyPartRenderer).flatten(FlattenStrategy.concat) HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks) } } diff --git a/akka-http/src/main/scala/akka/http/server/RequestContext.scala b/akka-http/src/main/scala/akka/http/server/RequestContext.scala index d30aa36363..55d0b85935 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContext.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContext.scala @@ -7,7 +7,6 @@ package akka.http.server import scala.collection.immutable import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter -import akka.stream.FlowMaterializer import akka.http.marshalling.ToResponseMarshallable import akka.http.model._ @@ -28,11 +27,6 @@ trait RequestContext { */ def executionContext: ExecutionContext - /** - * The default FlowMaterializer to be used for stream-based logic related to this request. - */ - def flowMaterializer: FlowMaterializer - /** * The default LoggingAdapter to be used for logging messages related to this request. */ @@ -41,9 +35,7 @@ trait RequestContext { /** * Returns a copy of this context with the given fields updated. */ - def reconfigure(executionContext: ExecutionContext = executionContext, - flowMaterializer: FlowMaterializer = flowMaterializer, - log: LoggingAdapter = log): RequestContext + def reconfigure(executionContext: ExecutionContext = executionContext, log: LoggingAdapter = log): RequestContext /** * Completes the request with the given ToResponseMarshallable. diff --git a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala index bd2795472b..95241c601e 100644 --- a/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala +++ b/akka-http/src/main/scala/akka/http/server/RequestContextImpl.scala @@ -7,7 +7,6 @@ package akka.http.server import scala.collection.immutable import scala.concurrent.{ Future, ExecutionContext } import akka.event.LoggingAdapter -import akka.stream.FlowMaterializer import akka.http.marshalling.ToResponseMarshallable import akka.http.util.{ FastFuture, identityFunc } import akka.http.model._ @@ -20,17 +19,14 @@ private[http] class RequestContextImpl( val request: HttpRequest, val unmatchedPath: Uri.Path, val executionContext: ExecutionContext, - val flowMaterializer: FlowMaterializer, val log: LoggingAdapter, finish: RouteResult ⇒ Future[RouteResult] = FastFuture.successful) extends RequestContext { - def this(request: HttpRequest, log: LoggingAdapter)(implicit ec: ExecutionContext, fm: FlowMaterializer) = - this(request, request.uri.path, ec, fm, log) + def this(request: HttpRequest, log: LoggingAdapter)(implicit ec: ExecutionContext) = + this(request, request.uri.path, ec, log) - def reconfigure(executionContext: ExecutionContext, - flowMaterializer: FlowMaterializer, - log: LoggingAdapter): RequestContext = - copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log) + def reconfigure(executionContext: ExecutionContext, log: LoggingAdapter): RequestContext = + copy(executionContext = executionContext, log = log) override def complete(trm: ToResponseMarshallable): Future[RouteResult] = trm(request)(executionContext) @@ -107,8 +103,7 @@ private[http] class RequestContextImpl( private def copy(request: HttpRequest = request, unmatchedPath: Uri.Path = unmatchedPath, executionContext: ExecutionContext = executionContext, - flowMaterializer: FlowMaterializer = flowMaterializer, log: LoggingAdapter = log, finish: RouteResult ⇒ Future[RouteResult] = finish) = - new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, finish) + new RequestContextImpl(request, unmatchedPath, executionContext, log, finish) } diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala index 7d4e170f75..9708f9a398 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala @@ -7,7 +7,6 @@ package akka.http.server import scala.concurrent.ExecutionContext import akka.actor.{ ActorSystem, ActorContext } import akka.event.LoggingAdapter -import akka.stream.FlowMaterializer import akka.http.model.HttpRequest import akka.http.Http @@ -33,12 +32,10 @@ class RoutingSetup( val settings: RoutingSettings, val exceptionHandler: ExceptionHandler, val rejectionHandler: RejectionHandler, - val flowMaterializer: FlowMaterializer, val executionContext: ExecutionContext, val routingLog: RoutingLog) { // enable `import setup._` to properly bring implicits in scope - implicit def materializer: FlowMaterializer = flowMaterializer implicit def executor: ExecutionContext = executionContext } @@ -46,14 +43,12 @@ object RoutingSetup { implicit def apply(implicit routingSettings: RoutingSettings, exceptionHandler: ExceptionHandler = null, rejectionHandler: RejectionHandler = null, - flowMaterializer: FlowMaterializer, executionContext: ExecutionContext, routingLog: RoutingLog): RoutingSetup = new RoutingSetup( routingSettings, if (exceptionHandler ne null) exceptionHandler else ExceptionHandler.default(routingSettings), if (rejectionHandler ne null) rejectionHandler else RejectionHandler.default(executionContext), - flowMaterializer, executionContext, routingLog) } diff --git a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala index 4d98c81fd5..034fa602c2 100644 --- a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala +++ b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala @@ -5,8 +5,7 @@ package akka.http.server import scala.concurrent.{ ExecutionContext, Future } -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2._ import akka.http.util.FastFuture import akka.http.model.{ HttpRequest, HttpResponse } import akka.http.Http @@ -49,14 +48,14 @@ trait ScalaRoutingDSL extends Directives { run(_ ⇒ handler) private def run(f: RoutingSetup ⇒ HttpRequest ⇒ Future[HttpResponse]): Unit = - Flow(binding.connectionStream).foreach { + Source(binding.connectionStream).foreach { case connection @ Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) ⇒ val setup = setupProvider(connection) setup.routingLog.log.debug("Accepted new connection from " + remoteAddress) val runner = f(setup) - Flow(requestProducer) - .mapFuture(request ⇒ runner(request)) - .produceTo(responseConsumer)(setup.flowMaterializer) + Source(requestProducer) + .mapAsync(request ⇒ runner(request)) + .connect(SubscriberDrain(responseConsumer)).run()(fm) } } } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index 9ee514d3db..29faa2be10 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -4,10 +4,8 @@ package akka.http.unmarshalling -import org.reactivestreams.Publisher import akka.actor.ActorRefFactory -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2._ import akka.http.engine.parsing.BodyPartParser import akka.http.model._ import akka.http.util._ @@ -17,48 +15,41 @@ import HttpCharsets._ trait MultipartUnmarshallers { - implicit def defaultMultipartContentUnmarshaller(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory) = multipartContentUnmarshaller(`UTF-8`) - def multipartContentUnmarshaller(defaultCharset: HttpCharset)(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartContent] = + implicit def defaultMultipartContentUnmarshaller(implicit refFactory: ActorRefFactory) = multipartContentUnmarshaller(`UTF-8`) + def multipartContentUnmarshaller(defaultCharset: HttpCharset)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartContent] = multipartPartsUnmarshaller[MultipartContent](`multipart/*`, ContentTypes.`text/plain` withCharset defaultCharset)(MultipartContent(_)) - implicit def defaultMultipartByteRangesUnmarshaller(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory) = multipartByteRangesUnmarshaller(`UTF-8`) - def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartByteRanges] = + implicit def defaultMultipartByteRangesUnmarshaller(implicit refFactory: ActorRefFactory) = multipartByteRangesUnmarshaller(`UTF-8`) + def multipartByteRangesUnmarshaller(defaultCharset: HttpCharset)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartByteRanges] = multipartPartsUnmarshaller[MultipartByteRanges](`multipart/byteranges`, ContentTypes.`text/plain` withCharset defaultCharset)(MultipartByteRanges(_)) - def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Publisher[BodyPart] ⇒ T)(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[T] = + def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Source[BodyPart] ⇒ T)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[T] = Unmarshaller { entity ⇒ if (mediaRange matches entity.contentType.mediaType) { entity.contentType.mediaType.params.get("boundary") match { case None ⇒ UnmarshallingError.InvalidContent("Content-Type with a multipart media type must have a 'boundary' parameter") case Some(boundary) ⇒ - val bodyParts = Flow(entity.dataBytes(fm)) + val bodyParts = entity.dataBytes .transform("bodyPart", () ⇒ new BodyPartParser(defaultContentType, boundary, actorSystem(refFactory).log)) .splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart]) - .headAndTail(fm) + .headAndTail .collect { case (BodyPartParser.BodyPartStart(headers, createEntity), entityParts) ⇒ BodyPart(createEntity(entityParts), headers) case (BodyPartParser.ParseError(errorInfo), _) ⇒ throw new ParsingException(errorInfo) - }.toPublisher()(fm) + } FastFuture.successful(create(bodyParts)) } } else UnmarshallingError.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil) } - implicit def defaultMultipartFormDataUnmarshaller(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = + implicit def defaultMultipartFormDataUnmarshaller(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = multipartFormDataUnmarshaller(verifyIntegrity = true) - def multipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = + def multipartFormDataUnmarshaller(verifyIntegrity: Boolean = true)(implicit refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts ⇒ def verify(part: BodyPart): BodyPart = part // TODO - val parts = if (verifyIntegrity) Flow(bodyParts).map(verify).toPublisher()(fm) else bodyParts + val parts = if (verifyIntegrity) bodyParts.map(verify) else bodyParts MultipartFormData(parts) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala index 458b0f55ed..074db32637 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -7,8 +7,7 @@ package akka.http.unmarshalling import java.io.{ ByteArrayInputStream, InputStreamReader } import scala.concurrent.ExecutionContext import scala.xml.{ XML, NodeSeq } -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2._ import akka.util.ByteString import akka.http.util.FastFuture import akka.http.model._ @@ -19,7 +18,7 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] = Unmarshaller { case HttpEntity.Strict(_, data) ⇒ FastFuture.successful(data) - case entity ⇒ Flow(entity.dataBytes(fm)).fold(ByteString.empty)(_ ++ _).toFuture() + case entity ⇒ entity.dataBytes.fold(ByteString.empty)(_ ++ _) } implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8d258d43d6..ad7d959309 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -52,8 +52,8 @@ object Flow { */ def apply[T](f: () ⇒ Option[T]): Flow[T] = FlowImpl(ThunkPublisherNode(() ⇒ f() match { - case Some(t) => t - case _ => throw Stop + case Some(t) ⇒ t + case _ ⇒ throw Stop }), Nil) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala index c0d4122d6c..326b17192a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala @@ -270,3 +270,18 @@ final case class FoldDrain[U, In](zero: U)(f: (U, In) ⇒ U) extends DrainWithKe } } +/** + * A drain that immediately cancels its upstream upon materialization. + */ +final case object CancelDrain extends SimpleDrain[Any] { + + override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = { + flowPublisher.subscribe(new Subscriber[Any] { + override def onError(t: Throwable): Unit = () + override def onSubscribe(s: Subscription): Unit = s.cancel() + override def onComplete(): Unit = () + override def onNext(t: Any): Unit = () + }) + } +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index c982edd1cd..6f009bfd45 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -43,6 +43,11 @@ object Sink { def apply[T](graph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ UndefinedSource[T]): Sink[T] = createSinkFromBuilder(new FlowGraphBuilder(graph.graph), block) + /** + * A `Sink` that immediately cancels its upstream after materialization. + */ + def cancelled[T]: Drain[T] = CancelDrain + private def createSinkFromBuilder[T](builder: FlowGraphBuilder, block: FlowGraphBuilder ⇒ UndefinedSource[T]): Sink[T] = { val in = block(builder) builder.partialBuild().toSink(in) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index 12f83729ec..973c6a3eee 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl2 +import akka.stream.impl.{ ErrorPublisher, EmptyPublisher, SynchronousPublisherFromIterable } import org.reactivestreams.{ Subscriber, Publisher } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable @@ -59,6 +60,22 @@ trait Source[+Out] extends FlowOps[Out] { def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachDrain(f)) + /** + * Concatenates a second source so that the first element + * emitted by that source is emitted after the last element of this + * source. + */ + def concat[Out2 >: Out](second: Source[Out2]): Source[Out2] = Source.concat(this, second) + + /** + * Concatenates a second source so that the first element + * emitted by that source is emitted after the last element of this + * source. + * + * This is a shorthand for [[concat]] + */ + def ++[Out2 >: Out](second: Source[Out2]): Source[Out2] = concat(second) + } object Source { @@ -155,4 +172,10 @@ object Source { val out = block(builder) builder.partialBuild().toSource(out) } + /** + * Concatenates two sources so that the first element + * emitted by the second source is emitted after the last element of the first + * source. + */ + def concat[T](source1: Source[T], source2: Source[T]): Source[T] = ConcatTap(source1, source2) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala index 402ce40b37..159bf151d9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala @@ -229,3 +229,22 @@ final case class TickTap[Out](initialDelay: FiniteDuration, interval: FiniteDura name = s"$flowName-0-tick")) } +/** + * This tap takes two Sources and concatenates them together by draining the elements coming from the first Source + * completely, then draining the elements arriving from the second Source. If the first Source is infinite then the + * second Source will be never drained. + */ +final case class ConcatTap[Out](source1: Source[Out], source2: Source[Out]) extends SimpleTap[Out] { + + override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = { + val concatter = Concat[Out] + val concatGraph = FlowGraph { builder ⇒ + builder + .addEdge(source1, Pipe.empty[Out], concatter.first) + .addEdge(source2, Pipe.empty[Out], concatter.second) + .addEdge(concatter.out, SubscriberDrain(flowSubscriber)) + }.run()(materializer) + } + + override def isActive: Boolean = false +}