From a4c279c9c5a7423bb34d9a985bca1c92360d5be9 Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 2 Apr 2015 16:54:12 +0200 Subject: [PATCH] !htc #17039, #16933 change default materialization placeholder type from `Unit` to `Any` --- .../akka/http/model/japi/HttpEntities.java | 8 +-- .../java/akka/http/model/japi/HttpEntity.java | 2 +- .../http/model/japi/HttpEntityChunked.java | 2 +- .../model/japi/HttpEntityCloseDelimited.java | 2 +- .../http/model/japi/HttpEntityDefault.java | 2 +- .../japi/HttpEntityIndefiniteLength.java | 2 +- .../src/main/scala/akka/http/Http.scala | 6 +- .../engine/parsing/HttpRequestParser.scala | 4 +- .../engine/rendering/BodyPartRenderer.scala | 16 ++--- .../HttpRequestRendererFactory.scala | 6 +- .../HttpResponseRendererFactory.scala | 8 +-- .../http/engine/rendering/RenderSupport.scala | 10 +-- .../engine/server/HttpServerBluePrint.scala | 2 +- .../scala/akka/http/model/HttpEntity.scala | 61 +++++++++---------- .../scala/akka/http/model/HttpMessage.scala | 6 ++ .../scala/akka/http/model/Multipart.scala | 24 ++++---- .../scala/akka/http/util/StreamUtils.scala | 5 +- .../main/scala/akka/http/util/package.scala | 4 +- .../engine/parsing/RequestParserSpec.scala | 2 +- .../engine/parsing/ResponseParserSpec.scala | 2 +- .../MultipartUnmarshallers.scala | 2 +- 21 files changed, 90 insertions(+), 86 deletions(-) diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java index 5092b9bb0b..48313c1742 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java @@ -42,19 +42,19 @@ public final class HttpEntities { return HttpEntity$.MODULE$.apply((akka.http.model.ContentType) contentType, file); } - public static HttpEntityDefault create(ContentType contentType, long contentLength, Source data) { + public static HttpEntityDefault create(ContentType contentType, long contentLength, Source data) { return new akka.http.model.HttpEntity.Default((akka.http.model.ContentType) contentType, contentLength, data); } - public static HttpEntityCloseDelimited createCloseDelimited(ContentType contentType, Source data) { + public static HttpEntityCloseDelimited createCloseDelimited(ContentType contentType, Source data) { return new akka.http.model.HttpEntity.CloseDelimited((akka.http.model.ContentType) contentType, data); } - public static HttpEntityIndefiniteLength createIndefiniteLength(ContentType contentType, Source data) { + public static HttpEntityIndefiniteLength createIndefiniteLength(ContentType contentType, Source data) { return new akka.http.model.HttpEntity.IndefiniteLength((akka.http.model.ContentType) contentType, data); } - public static HttpEntityChunked createChunked(ContentType contentType, Source data) { + public static HttpEntityChunked createChunked(ContentType contentType, Source data) { return akka.http.model.HttpEntity.Chunked$.MODULE$.fromData( (akka.http.model.ContentType) contentType, data); diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java index 9e5149f92c..47cbed0d4c 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java @@ -73,5 +73,5 @@ public interface HttpEntity { /** * Returns a stream of data bytes this entity consists of. */ - public abstract Source getDataBytes(); + public abstract Source getDataBytes(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java index fac52e1b84..76bc026b95 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java @@ -11,5 +11,5 @@ import akka.stream.scaladsl.Source; * stream of {@link ChunkStreamPart}. */ public abstract class HttpEntityChunked implements RequestEntity, ResponseEntity { - public abstract Source getChunks(); + public abstract Source getChunks(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java index 0762bc5799..f163347bd2 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java @@ -13,5 +13,5 @@ import akka.stream.scaladsl.Source; * available for Http responses. */ public abstract class HttpEntityCloseDelimited implements ResponseEntity { - public abstract Source data(); + public abstract Source data(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java index 9839a9a0a9..006db272f6 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java @@ -12,5 +12,5 @@ import akka.stream.scaladsl.Source; */ public abstract class HttpEntityDefault implements BodyPartEntity, RequestEntity, ResponseEntity { public abstract long contentLength(); - public abstract Source data(); + public abstract Source data(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java index 2b287ffabe..c889de5c14 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java @@ -11,5 +11,5 @@ import akka.stream.scaladsl.Source; * Represents an entity without a predetermined content-length to use in a BodyParts. */ public abstract class HttpEntityIndefiniteLength implements BodyPartEntity { - public abstract Source data(); + public abstract Source data(); } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index abe7c34059..2ff409ce32 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -110,7 +110,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * +------+ * }}} */ - type ServerLayer = BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, Any] + type ServerLayer = BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, Unit] /** * Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. @@ -158,7 +158,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * +------+ * }}} */ - type ClientLayer = BidiFlow[HttpRequest, ByteString, ByteString, HttpResponse, Any] + type ClientLayer = BidiFlow[HttpRequest, ByteString, ByteString, HttpResponse, Unit] /** * Constructs a [[ClientLayer]] stage using the configured default [[ClientConnectionSettings]]. @@ -200,7 +200,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { case class IncomingConnection( localAddress: InetSocketAddress, remoteAddress: InetSocketAddress, - flow: Flow[HttpResponse, HttpRequest, Any]) { + flow: Flow[HttpResponse, HttpRequest, Unit]) { /** * Handles the connection with the given flow, which is materialized exactly once diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala index 3ad8397baf..858e65e2cc 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala @@ -9,7 +9,7 @@ import scala.annotation.tailrec import akka.actor.ActorRef import akka.stream.scaladsl.OperationAttributes._ import akka.stream.stage.{ Context, PushPullStage } -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Keep, Source } import akka.util.ByteString import akka.http.model.parser.CharacterClasses import akka.http.util.identityFunc @@ -126,7 +126,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, emit(RequestStart(method, uri, protocol, allHeaders, createEntity, expect100continue, closeAfterResponseCompletion)) } - def expect100continueHandling[T]: Source[T, Unit] ⇒ Source[T, Unit] = + def expect100continueHandling[T, Mat]: Source[T, Mat] ⇒ Source[T, Mat] = if (expect100continue) { _.section(name("expect100continueTrigger"))(_.transform(() ⇒ new PushPullStage[T, T] { private var oneHundredContinueSent = false diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala index 59376d0ad1..8994f4ddd5 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/BodyPartRenderer.scala @@ -24,19 +24,19 @@ private[http] object BodyPartRenderer { def streamed(boundary: String, nioCharset: Charset, partHeadersSizeHint: Int, - log: LoggingAdapter): PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Unit]] = - new PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Unit]] { + log: LoggingAdapter): PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Any]] = + new PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Any]] { var firstBoundaryRendered = false - override def onPush(bodyPart: Multipart.BodyPart, ctx: Context[Source[ChunkStreamPart, Unit]]): SyncDirective = { + override def onPush(bodyPart: Multipart.BodyPart, ctx: Context[Source[ChunkStreamPart, Any]]): SyncDirective = { val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint) - def bodyPartChunks(data: Source[ByteString, Unit]): Source[ChunkStreamPart, Unit] = { + def bodyPartChunks(data: Source[ByteString, Any]): Source[ChunkStreamPart, Any] = { val entityChunks = data.map[ChunkStreamPart](Chunk(_)) (chunkStream(r.get) ++ entityChunks).mapMaterialized((_) ⇒ ()) } - def completePartRendering(): Source[ChunkStreamPart, Unit] = + def completePartRendering(): Source[ChunkStreamPart, Any] = bodyPart.entity match { case x if x.isKnownEmpty ⇒ chunkStream(r.get) case Strict(_, data) ⇒ chunkStream((r ~~ data).get) @@ -51,7 +51,7 @@ private[http] object BodyPartRenderer { ctx.push(completePartRendering()) } - override def onPull(ctx: Context[Source[ChunkStreamPart, Unit]]): SyncDirective = { + override def onPull(ctx: Context[Source[ChunkStreamPart, Any]]): SyncDirective = { val finishing = ctx.isFinishing if (finishing && firstBoundaryRendered) { val r = new ByteStringRendering(boundary.length + 4) @@ -63,9 +63,9 @@ private[http] object BodyPartRenderer { ctx.pull() } - override def onUpstreamFinish(ctx: Context[Source[ChunkStreamPart, Unit]]): TerminationDirective = ctx.absorbTermination() + override def onUpstreamFinish(ctx: Context[Source[ChunkStreamPart, Any]]): TerminationDirective = ctx.absorbTermination() - private def chunkStream(byteString: ByteString): Source[ChunkStreamPart, Unit] = + private def chunkStream(byteString: ByteString): Source[ChunkStreamPart, Any] = Source.single(Chunk(byteString)) } diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala index f281d3f0c8..b3ef27450d 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala @@ -25,9 +25,9 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.` def newRenderer: HttpRequestRenderer = new HttpRequestRenderer - final class HttpRequestRenderer extends PushStage[RequestRenderingContext, Source[ByteString, Unit]] { + final class HttpRequestRenderer extends PushStage[RequestRenderingContext, Source[ByteString, Any]] { - override def onPush(ctx: RequestRenderingContext, opCtx: Context[Source[ByteString, Unit]]): SyncDirective = { + override def onPush(ctx: RequestRenderingContext, opCtx: Context[Source[ByteString, Any]]): SyncDirective = { val r = new ByteStringRendering(requestHeaderSizeHint) import ctx.request._ @@ -102,7 +102,7 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.` def renderContentLength(contentLength: Long) = if (method.isEntityAccepted) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r - def completeRequestRendering(): Source[ByteString, Unit] = + def completeRequestRendering(): Source[ByteString, Any] = entity match { case x if x.isKnownEmpty ⇒ renderContentLength(0) ~~ CrLf diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala index 41e1593bc4..fd1edc4738 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala @@ -51,14 +51,14 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser def newRenderer: HttpResponseRenderer = new HttpResponseRenderer - final class HttpResponseRenderer extends PushStage[ResponseRenderingContext, Source[ByteString, Unit]] { + final class HttpResponseRenderer extends PushStage[ResponseRenderingContext, Source[ByteString, Any]] { private[this] var close = false // signals whether the connection is to be closed after the current response // need this for testing private[http] def isComplete = close - override def onPush(ctx: ResponseRenderingContext, opCtx: Context[Source[ByteString, Unit]]): SyncDirective = { + override def onPush(ctx: ResponseRenderingContext, opCtx: Context[Source[ByteString, Any]]): SyncDirective = { val r = new ByteStringRendering(responseHeaderSizeHint) import ctx.response._ @@ -156,10 +156,10 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser def renderContentLengthHeader(contentLength: Long) = if (status.allowsEntity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r - def byteStrings(entityBytes: ⇒ Source[ByteString, Unit]): Source[ByteString, Unit] = + def byteStrings(entityBytes: ⇒ Source[ByteString, Any]): Source[ByteString, Any] = renderByteStrings(r, entityBytes, skipEntity = noEntity) - def completeResponseRendering(entity: ResponseEntity): Source[ByteString, Unit] = + def completeResponseRendering(entity: ResponseEntity): Source[ByteString, Any] = entity match { case HttpEntity.Strict(_, data) ⇒ renderHeaders(headers.toList) diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala index ae6d2709f2..0a878b64fb 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala @@ -29,24 +29,24 @@ private object RenderSupport { val defaultLastChunkBytes: ByteString = renderChunk(HttpEntity.LastChunk) - def CancelSecond[T](first: Source[T, _], second: Source[T, _]): Source[T, Unit] = { + def CancelSecond[T, Mat](first: Source[T, Mat], second: Source[T, Any]): Source[T, Mat] = { Source(first) { implicit b ⇒ frst ⇒ import FlowGraph.Implicits._ second ~> Sink.cancelled frst.outlet - }.mapMaterialized((_) ⇒ ()) + } } def renderEntityContentType(r: Rendering, entity: HttpEntity) = if (entity.contentType != ContentTypes.NoContentType) r ~~ headers.`Content-Type` ~~ entity.contentType ~~ CrLf else r - def renderByteStrings(r: ByteStringRendering, entityBytes: ⇒ Source[ByteString, Unit], - skipEntity: Boolean = false): Source[ByteString, Unit] = { + def renderByteStrings(r: ByteStringRendering, entityBytes: ⇒ Source[ByteString, Any], + skipEntity: Boolean = false): Source[ByteString, Any] = { val messageStart = Source.single(r.get) val messageBytes = - if (!skipEntity) (messageStart ++ entityBytes).mapMaterialized((_) ⇒ ()) + if (!skipEntity) (messageStart ++ entityBytes).mapMaterialized(_ ⇒ ()) else CancelSecond(messageStart, entityBytes) messageBytes } diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala index 89298df851..93a777a105 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala @@ -26,7 +26,7 @@ private[http] object HttpServerBluePrint { type ServerShape = BidiShape[HttpResponse, ByteString, ByteString, HttpRequest] - def apply(settings: ServerSettings, log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[ServerShape, Any] = { + def apply(settings: ServerSettings, log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[ServerShape, Unit] = { import settings._ // the initial header parser we initially use for every connection, diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 5beb23ea79..fb0a45dd9c 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -35,7 +35,7 @@ sealed trait HttpEntity extends japi.HttpEntity { /** * A stream of the data of this entity. */ - def dataBytes: Source[ByteString, Unit] + def dataBytes: Source[ByteString, Any] /** * Collects all possible parts and returns a potentially future Strict entity for easier processing. @@ -72,7 +72,7 @@ sealed trait HttpEntity extends japi.HttpEntity { * This method may only throw an exception if the `transformer` function throws an exception while creating the transformer. * Any other errors are reported through the new entity data stream. */ - def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): HttpEntity + def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): HttpEntity /** * Creates a copy of this HttpEntity with the `contentType` overridden with the given one. @@ -80,7 +80,7 @@ sealed trait HttpEntity extends japi.HttpEntity { def withContentType(contentType: ContentType): HttpEntity /** Java API */ - def getDataBytes: Source[ByteString, Unit] = dataBytes + def getDataBytes: Source[ByteString, _] = dataBytes // default implementations, should be overridden def isCloseDelimited: Boolean = false @@ -97,13 +97,13 @@ sealed trait BodyPartEntity extends HttpEntity with japi.BodyPartEntity { sealed trait RequestEntity extends HttpEntity with japi.RequestEntity with ResponseEntity { def withContentType(contentType: ContentType): RequestEntity - override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): RequestEntity + override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): RequestEntity } /* An entity that can be used for responses */ sealed trait ResponseEntity extends HttpEntity with japi.ResponseEntity { def withContentType(contentType: ContentType): ResponseEntity - override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): ResponseEntity + override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): ResponseEntity } /* An entity that can be used for requests, responses, and body parts */ sealed trait UniversalEntity extends japi.UniversalEntity with MessageEntity with BodyPartEntity { @@ -114,7 +114,7 @@ sealed trait UniversalEntity extends japi.UniversalEntity with MessageEntity wit * Transforms this' entities data bytes with a transformer that will produce exactly the number of bytes given as * ``newContentLength``. */ - def transformDataBytes(newContentLength: Long, transformer: Flow[ByteString, ByteString, _]): UniversalEntity + def transformDataBytes(newContentLength: Long, transformer: Flow[ByteString, ByteString, Any]): UniversalEntity } object HttpEntity { @@ -127,7 +127,7 @@ object HttpEntity { if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes)) def apply(contentType: ContentType, data: ByteString): Strict = if (data.isEmpty) empty(contentType) else Strict(contentType, data) - def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Unit]): UniversalEntity = + def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity = if (contentLength == 0) empty(contentType) else Default(contentType, contentLength, data) def apply(contentType: ContentType, file: File): UniversalEntity = { @@ -160,11 +160,11 @@ object HttpEntity { override def toStrict(timeout: FiniteDuration)(implicit fm: FlowMaterializer) = FastFuture.successful(this) - override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): MessageEntity = + override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): MessageEntity = Chunked.fromData(contentType, Source.single(data).via(transformer)) - override def transformDataBytes(newContentLength: Long, transformer: Flow[ByteString, ByteString, _]): UniversalEntity = - Default(contentType, newContentLength, Source.single(data).via(transformer)) + override def transformDataBytes(newContentLength: Long, transformer: Flow[ByteString, ByteString, Any]): UniversalEntity = + Default(contentType, newContentLength, Source.single(data) via transformer) def withContentType(contentType: ContentType): Strict = if (contentType == this.contentType) this else copy(contentType = contentType) @@ -177,20 +177,20 @@ object HttpEntity { */ final case class Default(contentType: ContentType, contentLength: Long, - data: Source[ByteString, Unit]) + data: Source[ByteString, Any]) extends japi.HttpEntityDefault with UniversalEntity { require(contentLength > 0, "contentLength must be positive (use `HttpEntity.empty(contentType)` for empty entities)") def isKnownEmpty = false override def isDefault: Boolean = true - def dataBytes: Source[ByteString, Unit] = data + def dataBytes: Source[ByteString, Any] = data - override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): Chunked = - Chunked.fromData(contentType, data.viaMat(transformer)(Keep.left)) + override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): Chunked = + Chunked.fromData(contentType, data via transformer) - override def transformDataBytes(newContentLength: Long, transformer: Flow[ByteString, ByteString, _]): UniversalEntity = - Default(contentType, newContentLength, data.viaMat(transformer)(Keep.left)) + override def transformDataBytes(newContentLength: Long, transformer: Flow[ByteString, ByteString, Any]): UniversalEntity = + Default(contentType, newContentLength, data via transformer) def withContentType(contentType: ContentType): Default = if (contentType == this.contentType) this else copy(contentType = contentType) @@ -205,11 +205,11 @@ object HttpEntity { */ private[http] sealed trait WithoutKnownLength extends HttpEntity { def contentType: ContentType - def data: Source[ByteString, Unit] + def data: Source[ByteString, Any] def isKnownEmpty = data eq Source.empty - def dataBytes: Source[ByteString, Unit] = data + def dataBytes: Source[ByteString, Any] = data } /** @@ -217,7 +217,7 @@ object HttpEntity { * The content-length of such responses is unknown at the time the response headers have been received. * Note that this type of HttpEntity can only be used for HttpResponses. */ - final case class CloseDelimited(contentType: ContentType, data: Source[ByteString, Unit]) + final case class CloseDelimited(contentType: ContentType, data: Source[ByteString, Any]) extends japi.HttpEntityCloseDelimited with ResponseEntity with WithoutKnownLength { type Self = CloseDelimited @@ -225,8 +225,8 @@ object HttpEntity { def withContentType(contentType: ContentType): CloseDelimited = if (contentType == this.contentType) this else copy(contentType = contentType) - override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): CloseDelimited = - HttpEntity.CloseDelimited(contentType, data.viaMat(transformer)(Keep.left)) + override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): CloseDelimited = + HttpEntity.CloseDelimited(contentType, data via transformer) override def productPrefix = "HttpEntity.CloseDelimited" } @@ -235,15 +235,15 @@ object HttpEntity { * The model for the entity of a BodyPart with an indefinite length. * Note that this type of HttpEntity can only be used for BodyParts. */ - final case class IndefiniteLength(contentType: ContentType, data: Source[ByteString, Unit]) + final case class IndefiniteLength(contentType: ContentType, data: Source[ByteString, Any]) extends japi.HttpEntityIndefiniteLength with BodyPartEntity with WithoutKnownLength { override def isIndefiniteLength: Boolean = true def withContentType(contentType: ContentType): IndefiniteLength = if (contentType == this.contentType) this else copy(contentType = contentType) - override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): IndefiniteLength = - HttpEntity.IndefiniteLength(contentType, data.viaMat(transformer)(Keep.left)) + override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): IndefiniteLength = + HttpEntity.IndefiniteLength(contentType, data via transformer) override def productPrefix = "HttpEntity.IndefiniteLength" } @@ -251,23 +251,22 @@ object HttpEntity { /** * The model for the entity of a chunked HTTP message (with `Transfer-Encoding: chunked`). */ - final case class Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart, Unit]) + final case class Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart, Any]) extends japi.HttpEntityChunked with MessageEntity { def isKnownEmpty = chunks eq Source.empty override def isChunked: Boolean = true - def dataBytes: Source[ByteString, Unit] = - chunks.map(_.data).filter(_.nonEmpty) + def dataBytes: Source[ByteString, Any] = chunks.map(_.data).filter(_.nonEmpty) - override def transformDataBytes(transformer: Flow[ByteString, ByteString, _]): Chunked = { + override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): Chunked = { val newData = chunks.map { case Chunk(data, "") ⇒ data case LastChunk("", Nil) ⇒ ByteString.empty case _ ⇒ throw new IllegalArgumentException("Chunked.transformDataBytes not allowed for chunks with metadata") - }.viaMat(transformer)(Keep.left) + } via transformer Chunked.fromData(contentType, newData) } @@ -278,14 +277,14 @@ object HttpEntity { override def productPrefix = "HttpEntity.Chunked" /** Java API */ - def getChunks: Source[japi.ChunkStreamPart, Unit] = chunks.asInstanceOf[Source[japi.ChunkStreamPart, Unit]] + def getChunks: Source[japi.ChunkStreamPart, Any] = chunks.asInstanceOf[Source[japi.ChunkStreamPart, Any]] } object Chunked { /** * Returns a ``Chunked`` entity where one Chunk is produced for every non-empty ByteString of the given * ``Publisher[ByteString]``. */ - def fromData(contentType: ContentType, chunks: Source[ByteString, Unit]): Chunked = + def fromData(contentType: ContentType, chunks: Source[ByteString, Any]): Chunked = Chunked(contentType, chunks.collect[ChunkStreamPart] { case b: ByteString if b.nonEmpty ⇒ Chunk(b) }) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index 7f9ba4624b..8988c401ee 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -143,6 +143,9 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET, /** * Resolve this request's URI according to the logic defined at * http://tools.ietf.org/html/rfc7230#section-5.5 + * + * Throws an [[IllegalUriException]] if the URI is relative and the `headers` don't + * include a valid [[Host]] header or if URI authority and [[Host]] header don't match. */ def effectiveUri(securedConnection: Boolean, defaultHostHeader: Host = Host.empty): Uri = HttpRequest.effectiveUri(uri, headers, securedConnection, defaultHostHeader) @@ -263,6 +266,9 @@ object HttpRequest { /** * Determines the effective request URI according to the logic defined at * http://tools.ietf.org/html/rfc7230#section-5.5 + * + * Throws an [[IllegalUriException]] if the URI is relative and the `headers` don't + * include a valid [[Host]] header or if URI authority and [[Host]] header don't match. */ def effectiveUri(uri: Uri, headers: immutable.Seq[HttpHeader], securedConnection: Boolean, defaultHostHeader: Host): Uri = { val hostHeader = headers.collectFirst { case x: Host ⇒ x } diff --git a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala index f20cf8e0e5..bd5455dc49 100644 --- a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala @@ -17,7 +17,7 @@ import FastFuture._ trait Multipart { def mediaType: MultipartMediaType - def parts: Source[Multipart.BodyPart, Unit] + def parts: Source[Multipart.BodyPart, Any] /** * Converts this content into its strict counterpart. @@ -56,7 +56,7 @@ object Multipart { } } - private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP, Unit])(f: BP ⇒ Future[BPS])(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Vector[BPS]] = + private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP, Any])(f: BP ⇒ Future[BPS])(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Vector[BPS]] = // TODO: move to Vector `:+` when https://issues.scala-lang.org/browse/SI-8930 is fixed parts.runFold(new VectorBuilder[Future[BPS]]) { case (builder, part) ⇒ builder += f(part) @@ -69,27 +69,27 @@ object Multipart { */ sealed abstract class General extends Multipart { def mediaType: MultipartMediaType - def parts: Source[General.BodyPart, Unit] + def parts: Source[General.BodyPart, Any] def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[General.Strict] = strictify(parts)(_.toStrict(timeout)).fast.map(General.Strict(mediaType, _)) } object General { def apply(mediaType: MultipartMediaType, parts: BodyPart.Strict*): Strict = Strict(mediaType, parts.toVector) - def apply(_mediaType: MultipartMediaType, _parts: Source[BodyPart, Unit]): General = + def apply(_mediaType: MultipartMediaType, _parts: Source[BodyPart, Any]): General = new General { def mediaType = _mediaType def parts = _parts override def toString = s"General($mediaType, $parts)" } - def unapply(value: General): Option[(MultipartMediaType, Source[BodyPart, Unit])] = Some(value.mediaType -> value.parts) + def unapply(value: General): Option[(MultipartMediaType, Source[BodyPart, Any])] = Some(value.mediaType -> value.parts) /** * Strict [[General]]. */ case class Strict(mediaType: MultipartMediaType, strictParts: immutable.Seq[BodyPart.Strict]) extends General with Multipart.Strict { - def parts: Source[BodyPart.Strict, Unit] = Source(strictParts) + def parts: Source[BodyPart.Strict, Any] = Source(strictParts) override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) override def productPrefix = "General.Strict" @@ -148,7 +148,7 @@ object Multipart { */ sealed abstract class FormData extends Multipart { def mediaType = MediaTypes.`multipart/form-data` - def parts: Source[FormData.BodyPart, Unit] + def parts: Source[FormData.BodyPart, Any] def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[FormData.Strict] = strictify(parts)(_.toStrict(timeout)).fast.map(FormData.Strict(_)) } @@ -159,7 +159,7 @@ object Multipart { fields.map { case (name, entity) ⇒ BodyPart.Strict(name, entity) }(collection.breakOut) } - def apply(_parts: Source[BodyPart, Unit]): FormData = new FormData { + def apply(_parts: Source[BodyPart, Any]): FormData = new FormData { def parts = _parts override def toString = s"FormData($parts)" } @@ -168,7 +168,7 @@ object Multipart { * Strict [[FormData]]. */ case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends FormData with Multipart.Strict { - def parts: Source[BodyPart.Strict, Unit] = Source(strictParts) + def parts: Source[BodyPart.Strict, Any] = Source(strictParts) override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) override def productPrefix = "FormData.Strict" @@ -223,14 +223,14 @@ object Multipart { */ sealed abstract class ByteRanges extends Multipart { def mediaType = MediaTypes.`multipart/byteranges` - def parts: Source[ByteRanges.BodyPart, Unit] + def parts: Source[ByteRanges.BodyPart, Any] def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[ByteRanges.Strict] = strictify(parts)(_.toStrict(timeout)).fast.map(ByteRanges.Strict(_)) } object ByteRanges { def apply(parts: BodyPart.Strict*): Strict = Strict(parts.toVector) - def apply(_parts: Source[BodyPart, Unit]): ByteRanges = + def apply(_parts: Source[BodyPart, Any]): ByteRanges = new ByteRanges { def parts = _parts override def toString = s"ByteRanges($parts)" @@ -240,7 +240,7 @@ object Multipart { * Strict [[ByteRanges]]. */ case class Strict(strictParts: immutable.Seq[BodyPart.Strict]) extends ByteRanges with Multipart.Strict { - def parts: Source[BodyPart.Strict, Unit] = Source(strictParts) + def parts: Source[BodyPart.Strict, Any] = Source(strictParts) override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) override def productPrefix = "ByteRanges.Strict" diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 30af079575..9c37338445 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -128,7 +128,7 @@ private[http] object StreamUtils { * Applies a sequence of transformers on one source and returns a sequence of sources with the result. The input source * will only be traversed once. */ - def transformMultiple(input: Source[ByteString, Unit], transformers: immutable.Seq[Flow[ByteString, ByteString, _]])(implicit materializer: FlowMaterializer): immutable.Seq[Source[ByteString, Unit]] = + def transformMultiple(input: Source[ByteString, Any], transformers: immutable.Seq[Flow[ByteString, ByteString, Any]])(implicit materializer: FlowMaterializer): immutable.Seq[Source[ByteString, Any]] = transformers match { case Nil ⇒ Nil case Seq(one) ⇒ Vector(input.via(one)) @@ -137,12 +137,11 @@ private[http] object StreamUtils { val sources = transformers.map { flow ⇒ // Doubly wrap to ensure that subscription to the running publisher happens before the final sources // are exposed, so there is no race - Source(Source(fanoutPub).via(flow).runWith(Sink.publisher)) + Source(Source(fanoutPub).viaMat(flow)(Keep.right).runWith(Sink.publisher)) } // The fanout publisher must be wired to the original source after all fanout subscribers have been subscribed input.runWith(Sink(fanoutSub)) sources - } def mapEntityError(f: Throwable ⇒ Throwable): RequestEntity ⇒ RequestEntity = diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index 06431d3a5b..8cc9d5ecbe 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -41,13 +41,13 @@ package object util { private[http] implicit def enhanceByteStrings[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] = new EnhancedByteStringSource(byteStrings) - private[http] implicit class SourceWithHeadAndTail[T, Mat](val underlying: Source[Source[T, Unit], Mat]) extends AnyVal { + private[http] implicit class SourceWithHeadAndTail[T, Mat](val underlying: Source[Source[T, Any], Mat]) extends AnyVal { def headAndTail: Source[(T, Source[T, Unit]), Mat] = underlying.map { _.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } } .flatten(FlattenStrategy.concat) } - private[http] implicit class FlowWithHeadAndTail[In, Out, Mat](val underlying: Flow[In, Source[Out, Unit], Mat]) extends AnyVal { + private[http] implicit class FlowWithHeadAndTail[In, Out, Mat](val underlying: Flow[In, Source[Out, Any], Mat]) extends AnyVal { def headAndTail: Flow[In, (Out, Source[Out, Unit]), Mat] = underlying.map { _.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } } .flatten(FlattenStrategy.concat) diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 006eda03e7..00638f5ce7 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -474,7 +474,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { case _ ⇒ entity.toStrict(awaitAtMost) } - private def compactEntityChunks(data: Source[ChunkStreamPart, Unit]): Future[Seq[ChunkStreamPart]] = + private def compactEntityChunks(data: Source[ChunkStreamPart, Any]): Future[Seq[ChunkStreamPart]] = data.grouped(100000).runWith(Sink.head) .fast.recover { case _: NoSuchElementException ⇒ Nil } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index ddd8adaeb6..40f96f7948 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -297,7 +297,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { case _ ⇒ entity.toStrict(250.millis) } - private def compactEntityChunks(data: Source[ChunkStreamPart, Unit]): Future[Source[ChunkStreamPart, Unit]] = + private def compactEntityChunks(data: Source[ChunkStreamPart, Any]): Future[Source[ChunkStreamPart, Any]] = data.grouped(100000).runWith(Sink.head) .fast.map(source(_: _*)) .fast.recover { case _: NoSuchElementException ⇒ source() } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index 225936b961..c46895c3e0 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -54,7 +54,7 @@ trait MultipartUnmarshallers { def multipartUnmarshaller[T <: Multipart, BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](mediaRange: MediaRange, defaultContentType: ContentType, createBodyPart: (BodyPartEntity, List[HttpHeader]) ⇒ BP, - createStreamed: (MultipartMediaType, Source[BP, Unit]) ⇒ T, + createStreamed: (MultipartMediaType, Source[BP, Any]) ⇒ T, createStrictBodyPart: (HttpEntity.Strict, List[HttpHeader]) ⇒ BPS, createStrict: (MultipartMediaType, immutable.Seq[BPS]) ⇒ T)(implicit ec: ExecutionContext, log: LoggingAdapter = NoLogging): FromEntityUnmarshaller[T] = Unmarshaller { entity ⇒