From 94cb9ef808cf3bff3fa269021bbf655cbf91abd8 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 28 Jul 2015 16:21:53 +0200 Subject: [PATCH 1/6] =htc remove a layer of redundant wrapping --- .../http/impl/engine/server/HttpServerBluePrint.scala | 8 +++----- .../src/main/scala/akka/http/scaladsl/Http.scala | 5 ++--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 0becef109d..2cd754eae6 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -21,6 +21,7 @@ import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic } import akka.http.impl.engine.parsing._ import akka.http.impl.engine.rendering.{ ResponseRenderingOutput, ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.impl.engine.TokenSourceActor +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.http.impl.util._ import akka.http.impl.engine.ws._ @@ -31,10 +32,7 @@ import ParserOutput._ * INTERNAL API */ private[http] object HttpServerBluePrint { - - type ServerShape = BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest] - - def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: Materializer): Graph[ServerShape, Unit] = { + def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: Materializer): Http.ServerLayer = { import settings._ // the initial header parser we initially use for every connection, @@ -98,7 +96,7 @@ private[http] object HttpServerBluePrint { .flatten(FlattenStrategy.concat) .via(Flow[ResponseRenderingOutput].transform(() ⇒ errorLogger(log, "Outgoing response stream error")).named("errorLogger")) - FlowGraph.partial(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒ + BidiFlow(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒ (requestParsing, renderer, oneHundreds) ⇒ import FlowGraph.Implicits._ diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index ef6b246beb..b66fa9ff8f 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -156,7 +156,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E def serverLayer(settings: ServerSettings, remoteAddress: Option[InetSocketAddress] = None, log: LoggingAdapter = system.log)(implicit mat: Materializer): ServerLayer = - BidiFlow.wrap(HttpServerBluePrint(settings, remoteAddress, log)) + HttpServerBluePrint(settings, remoteAddress, log) /** * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. @@ -481,7 +481,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { //#server-layer /** - * The type of the server-side HTTP layer as a stand-alone BidiStage + * The type of the server-side HTTP layer as a stand-alone BidiFlow * that can be put atop the TCP layer to form an HTTP server. * * {{{ @@ -497,7 +497,6 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { //#client-layer /** - * The type of the client-side HTTP layer as a stand-alone BidiStage * that can be put atop the TCP layer to form an HTTP client. * * {{{ From 5c653f0641010101a4e4b1b3af7292816a6ce3f5 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 28 Jul 2015 16:22:35 +0200 Subject: [PATCH 2/6] =htc cosmetic fixes --- .../engine/client/HighLevelOutgoingConnectionSpec.scala | 7 +++++-- .../engine/client/LowLevelOutgoingConnectionSpec.scala | 3 +-- .../test/scala/akka/http/impl/engine/ws/MessageSpec.scala | 2 -- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala index 981880eef4..7739f420da 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -6,12 +6,15 @@ package akka.http.impl.engine.client import scala.concurrent.Await import scala.concurrent.duration._ -import akka.http.scaladsl.{ Http, TestUtils } -import akka.http.scaladsl.model._ + import akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec +import akka.http.scaladsl.{ Http, TestUtils } +import akka.http.scaladsl.model._ +import akka.http.impl.util._ + class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") { implicit val materializer = ActorMaterializer() diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index 7fe46605da..fdcdc30e38 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -4,7 +4,6 @@ package akka.http.impl.engine.client -import java.net.InetSocketAddress import akka.http.ClientConnectionSettings import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes } import org.scalatest.Inside @@ -349,7 +348,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. class TestSetup { val requests = TestPublisher.manualProbe[HttpRequest]() - val responses = TestSubscriber.manualProbe[HttpResponse] + val responses = TestSubscriber.manualProbe[HttpResponse]() def settings = ClientConnectionSettings(system) .copy(userAgentHeader = Some(`User-Agent`(List(ProductVersion("akka-http", "test"))))) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 2154cb62e9..f50bc3b4d2 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -307,8 +307,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { // split up the code point val half1 = gclef.take(1) val half2 = gclef.drop(1) - println(half1(0).toInt.toHexString) - println(half2(0).toInt.toHexString) val pub = TestPublisher.manualProbe[String]() val msg = TextMessage(Source(pub)) From e34b86ba30ccc7b0aca76961228da3973e79c3d0 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 7 Aug 2015 12:48:52 +0200 Subject: [PATCH 3/6] =htc convert request rendering from a stage to a plain function --- .../client/OutgoingConnectionBlueprint.scala | 10 +- .../HttpRequestRendererFactory.scala | 203 ++++++++++-------- .../main/scala/akka/http/scaladsl/Http.scala | 3 +- .../rendering/RequestRendererSpec.scala | 5 +- 4 files changed, 121 insertions(+), 100 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index ad924c435f..07c1c7f9ef 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -13,6 +13,7 @@ import akka.event.LoggingAdapter import akka.stream._ import akka.stream.scaladsl._ import akka.http.ClientConnectionSettings +import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers.Host import akka.http.scaladsl.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse } import akka.http.impl.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } @@ -23,9 +24,6 @@ import akka.http.impl.util._ * INTERNAL API */ private[http] object OutgoingConnectionBlueprint { - - type ClientShape = BidiShape[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse] - /* Stream Setup ============ @@ -45,7 +43,7 @@ private[http] object OutgoingConnectionBlueprint { */ def apply(hostHeader: Host, settings: ClientConnectionSettings, - log: LoggingAdapter): Graph[ClientShape, Unit] = { + log: LoggingAdapter): Http.ClientLayer = { import settings._ // the initial header parser we initially use for every connection, @@ -59,7 +57,7 @@ private[http] object OutgoingConnectionBlueprint { val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest] .map(RequestRenderingContext(_, hostHeader)) - .via(Flow[RequestRenderingContext].transform(() ⇒ requestRendererFactory.newRenderer).named("renderer")) + .via(Flow[RequestRenderingContext].map(requestRendererFactory.renderToSource).named("renderer")) .flatten(FlattenStrategy.concat) val methodBypass = Flow[HttpRequest].map(_.method) @@ -76,7 +74,7 @@ private[http] object OutgoingConnectionBlueprint { case (MessageStartError(_, info), _) ⇒ throw IllegalResponseException(info) } - FlowGraph.partial() { implicit b ⇒ + BidiFlow() { implicit b ⇒ import FlowGraph.Implicits._ val methodBypassFanout = b.add(Broadcast[HttpRequest](2, eagerCancel = true)) val responseParsingMerge = b.add(new ResponseParsingMerge(rootParser)) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpRequestRendererFactory.scala index 7458cfde6e..812a5e6383 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpRequestRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpRequestRendererFactory.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.rendering +import akka.http.ClientConnectionSettings + import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.util.ByteString @@ -20,110 +22,133 @@ import headers._ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`User-Agent`], requestHeaderSizeHint: Int, log: LoggingAdapter) { + import HttpRequestRendererFactory.RequestRenderingOutput - def newRenderer: HttpRequestRenderer = new HttpRequestRenderer + def renderToSource(ctx: RequestRenderingContext): Source[ByteString, Any] = render(ctx).byteStream - final class HttpRequestRenderer extends PushStage[RequestRenderingContext, Source[ByteString, Any]] { + def render(ctx: RequestRenderingContext): RequestRenderingOutput = { + val r = new ByteStringRendering(requestHeaderSizeHint) + import ctx.request._ - override def onPush(ctx: RequestRenderingContext, opCtx: Context[Source[ByteString, Any]]): SyncDirective = { - val r = new ByteStringRendering(requestHeaderSizeHint) - import ctx.request._ + def renderRequestLine(): Unit = { + r ~~ method ~~ ' ' + val rawRequestUriRendered = headers.exists { + case `Raw-Request-URI`(rawUri) ⇒ + r ~~ rawUri; true + case _ ⇒ false + } + if (!rawRequestUriRendered) UriRendering.renderUriWithoutFragment(r, uri, UTF8) + r ~~ ' ' ~~ protocol ~~ CrLf + } - def renderRequestLine(): Unit = { - r ~~ method ~~ ' ' - val rawRequestUriRendered = headers.exists { - case `Raw-Request-URI`(rawUri) ⇒ - r ~~ rawUri; true - case _ ⇒ false + def render(h: HttpHeader) = r ~~ h ~~ CrLf + + @tailrec def renderHeaders(remaining: List[HttpHeader], hostHeaderSeen: Boolean = false, + userAgentSeen: Boolean = false, transferEncodingSeen: Boolean = false): Unit = + remaining match { + case head :: tail ⇒ head match { + case x: `Content-Length` ⇒ + suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.") + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + + case x: `Content-Type` ⇒ + suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.") + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + + case x: `Transfer-Encoding` ⇒ + x.withChunkedPeeled match { + case None ⇒ + suppressionWarning(log, head) + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + case Some(te) ⇒ + // if the user applied some custom transfer-encoding we need to keep the header + render(if (entity.isChunked && !entity.isKnownEmpty) te.withChunked else te) + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen = true) + } + + case x: `Host` ⇒ + render(x) + renderHeaders(tail, hostHeaderSeen = true, userAgentSeen, transferEncodingSeen) + + case x: `User-Agent` ⇒ + render(x) + renderHeaders(tail, hostHeaderSeen, userAgentSeen = true, transferEncodingSeen) + + case x: `Raw-Request-URI` ⇒ // we never render this header + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + + case x: CustomHeader ⇒ + if (!x.suppressRendering) render(x) + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + + case x: RawHeader if (x is "content-type") || (x is "content-length") || (x is "transfer-encoding") || + (x is "host") || (x is "user-agent") ⇒ + suppressionWarning(log, x, "illegal RawHeader") + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + + case x ⇒ + render(x) + renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) } - if (!rawRequestUriRendered) UriRendering.renderUriWithoutFragment(r, uri, UTF8) - r ~~ ' ' ~~ protocol ~~ CrLf + + case Nil ⇒ + if (!hostHeaderSeen) r ~~ ctx.hostHeader ~~ CrLf + if (!userAgentSeen && userAgentHeader.isDefined) r ~~ userAgentHeader.get ~~ CrLf + if (entity.isChunked && !entity.isKnownEmpty && !transferEncodingSeen) + r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf } - def render(h: HttpHeader) = r ~~ h ~~ CrLf + def renderContentLength(contentLength: Long) = + if (method.isEntityAccepted) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r - @tailrec def renderHeaders(remaining: List[HttpHeader], hostHeaderSeen: Boolean = false, - userAgentSeen: Boolean = false, transferEncodingSeen: Boolean = false): Unit = - remaining match { - case head :: tail ⇒ head match { - case x: `Content-Length` ⇒ - suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.") - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + def renderStreamed(body: Source[ByteString, Any]): RequestRenderingOutput = + RequestRenderingOutput.Streamed(renderByteStrings(r, body)) - case x: `Content-Type` ⇒ - suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.") - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + def completeRequestRendering(): RequestRenderingOutput = + entity match { + case x if x.isKnownEmpty ⇒ + renderContentLength(0) ~~ CrLf + RequestRenderingOutput.Strict(r.get) - case x: `Transfer-Encoding` ⇒ - x.withChunkedPeeled match { - case None ⇒ - suppressionWarning(log, head) - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) - case Some(te) ⇒ - // if the user applied some custom transfer-encoding we need to keep the header - render(if (entity.isChunked && !entity.isKnownEmpty) te.withChunked else te) - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen = true) - } + case HttpEntity.Strict(_, data) ⇒ + renderContentLength(data.length) ~~ CrLf + RequestRenderingOutput.Strict(r.get ++ data) - case x: `Host` ⇒ - render(x) - renderHeaders(tail, hostHeaderSeen = true, userAgentSeen, transferEncodingSeen) + case HttpEntity.Default(_, contentLength, data) ⇒ + renderContentLength(contentLength) ~~ CrLf + renderStreamed(data.via(CheckContentLengthTransformer.flow(contentLength))) - case x: `User-Agent` ⇒ - render(x) - renderHeaders(tail, hostHeaderSeen, userAgentSeen = true, transferEncodingSeen) + case HttpEntity.Chunked(_, chunks) ⇒ + r ~~ CrLf + renderStreamed(chunks.via(ChunkTransformer.flow)) + } - case x: `Raw-Request-URI` ⇒ // we never render this header - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) + renderRequestLine() + renderHeaders(headers.toList) + renderEntityContentType(r, entity) + completeRequestRendering() + } - case x: CustomHeader ⇒ - if (!x.suppressRendering) render(x) - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) - - case x: RawHeader if (x is "content-type") || (x is "content-length") || (x is "transfer-encoding") || - (x is "host") || (x is "user-agent") ⇒ - suppressionWarning(log, x, "illegal RawHeader") - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) - - case x ⇒ - render(x) - renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen) - } - - case Nil ⇒ - if (!hostHeaderSeen) r ~~ ctx.hostHeader ~~ CrLf - if (!userAgentSeen && userAgentHeader.isDefined) r ~~ userAgentHeader.get ~~ CrLf - if (entity.isChunked && !entity.isKnownEmpty && !transferEncodingSeen) - r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf - } - - def renderContentLength(contentLength: Long) = - if (method.isEntityAccepted) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r - - def completeRequestRendering(): Source[ByteString, Any] = - entity match { - case x if x.isKnownEmpty ⇒ - renderContentLength(0) ~~ CrLf - Source.single(r.get) - - case HttpEntity.Strict(_, data) ⇒ - renderContentLength(data.length) ~~ CrLf - Source.single(r.get ++ data) - - case HttpEntity.Default(_, contentLength, data) ⇒ - renderContentLength(contentLength) ~~ CrLf - renderByteStrings(r, data.via(CheckContentLengthTransformer.flow(contentLength))) - - case HttpEntity.Chunked(_, chunks) ⇒ - r ~~ CrLf - renderByteStrings(r, chunks.via(ChunkTransformer.flow)) - } - - renderRequestLine() - renderHeaders(headers.toList) - renderEntityContentType(r, entity) - opCtx.push(completeRequestRendering()) + def renderStrict(ctx: RequestRenderingContext): ByteString = + render(ctx) match { + case RequestRenderingOutput.Strict(bytes) ⇒ bytes + case _: RequestRenderingOutput.Streamed ⇒ + throw new IllegalArgumentException(s"Request entity was not Strict but ${ctx.request.entity.getClass.getSimpleName}") } +} + +private[http] object HttpRequestRendererFactory { + def renderStrict(ctx: RequestRenderingContext, settings: ClientConnectionSettings, log: LoggingAdapter): ByteString = + new HttpRequestRendererFactory(settings.userAgentHeader, settings.requestHeaderSizeHint, log).renderStrict(ctx) + + sealed trait RequestRenderingOutput { + def byteStream: Source[ByteString, Any] + } + object RequestRenderingOutput { + case class Strict(bytes: ByteString) extends RequestRenderingOutput { + def byteStream: Source[ByteString, Any] = Source.single(bytes) + } + case class Streamed(byteStream: Source[ByteString, Any]) extends RequestRenderingOutput } } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index b66fa9ff8f..8479b457bd 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -210,7 +210,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E def clientLayer(hostHeader: Host, settings: ClientConnectionSettings, log: LoggingAdapter = system.log): ClientLayer = - BidiFlow.wrap(OutgoingConnectionBlueprint(hostHeader, settings, log)) + OutgoingConnectionBlueprint(hostHeader, settings, log) /** * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches @@ -497,6 +497,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { //#client-layer /** + * The type of the client-side HTTP layer as a stand-alone BidiFlow * that can be put atop the TCP layer to form an HTTP client. * * {{{ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala index f020f78b4d..81c5e4cfc2 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala @@ -311,10 +311,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll def renderTo(expected: String): Matcher[HttpRequest] = equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒ - val renderer = newRenderer - val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, Host(serverAddress))) - .transform(() ⇒ renderer).named("renderer") - .runWith(Sink.head), 1.second) + val byteStringSource = renderToSource(RequestRenderingContext(request, Host(serverAddress))) val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) } From 6a2cd34139c4ffe3279073fa4c74f66c1ecaf611 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 7 Aug 2015 12:50:51 +0200 Subject: [PATCH 4/6] =htc restructuring of Websocket stack as stacked BidiFlows --- .../engine/server/HttpServerBluePrint.scala | 6 +- .../http/impl/engine/ws/FrameHandler.scala | 11 +- .../http/impl/engine/ws/FrameOutHandler.scala | 11 +- .../akka/http/impl/engine/ws/Masking.scala | 5 +- .../ws/UpgradeToWebsocketLowLevel.scala | 2 +- .../akka/http/impl/engine/ws/Websocket.scala | 110 +++++++++++++----- .../http/impl/engine/ws/MessageSpec.scala | 2 +- 7 files changed, 101 insertions(+), 46 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 2cd754eae6..0901234df2 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -298,11 +298,7 @@ private[http] object HttpServerBluePrint { val sink = StreamUtils.oneTimePublisherSink[FrameEvent](sinkCell, "frameHandler.in") val source = StreamUtils.oneTimeSubscriberSource[FrameEvent](sourceCell, "frameHandler.out") - val flow = - Flow[ByteString] - .transform[FrameEvent](() ⇒ new FrameEventParser) - .via(Flow.wrap(sink, source)(Keep.none)) - .transform(() ⇒ new FrameEventRenderer) + val flow = Websocket.framing.join(Flow.wrap(sink, source)(Keep.none)) new WebsocketSetup { def websocketFlow: Flow[ByteString, ByteString, Any] = flow diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala index 062c070330..dd4dac5714 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala @@ -18,11 +18,12 @@ import scala.util.control.NonFatal * INTERNAL API */ private[http] object FrameHandler { - def create(server: Boolean): Flow[FrameEvent, Either[BypassEvent, MessagePart], Unit] = + type Output = Either[BypassEvent, MessagePart] + def create(server: Boolean): Flow[FrameEvent, Output, Unit] = Flow[FrameEvent].transform(() ⇒ new HandlerStage(server)) - class HandlerStage(server: Boolean) extends StatefulStage[FrameEvent, Either[BypassEvent, MessagePart]] { - type Ctx = Context[Either[BypassEvent, MessagePart]] + class HandlerStage(server: Boolean) extends StatefulStage[FrameEvent, Output] { + type Ctx = Context[Output] def initial: State = Idle object Idle extends StateWithControlFrameHandling { @@ -129,7 +130,7 @@ private[http] object FrameHandler { Right(ActivelyCloseWithCode(Some(closeCode), reason))), ctx, CloseAfterPeerClosed) object CloseAfterPeerClosed extends State { - def onPush(elem: FrameEvent, ctx: Context[Either[BypassEvent, MessagePart]]): SyncDirective = + def onPush(elem: FrameEvent, ctx: Context[Output]): SyncDirective = elem match { case FrameStart(FrameHeader(Opcode.Close, _, length, _, _, _, _), data) ⇒ become(WaitForPeerTcpClose) @@ -139,7 +140,7 @@ private[http] object FrameHandler { } } object WaitForPeerTcpClose extends State { - def onPush(elem: FrameEvent, ctx: Context[Either[BypassEvent, MessagePart]]): SyncDirective = + def onPush(elem: FrameEvent, ctx: Context[Output]): SyncDirective = ctx.pull() // ignore } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala index 5145875afb..ed29f99736 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameOutHandler.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.ws +import akka.stream.scaladsl.Flow + import scala.concurrent.duration.FiniteDuration import akka.stream.stage._ @@ -16,7 +18,7 @@ import Websocket.Tick * * INTERNAL API */ -private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration) extends StatefulStage[AnyRef, FrameStart] { +private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration) extends StatefulStage[FrameOutHandler.Input, FrameStart] { def initial: StageState[AnyRef, FrameStart] = Idle def closeTimeout: Timestamp = Timestamp.now + _closeTimeout @@ -129,4 +131,11 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu ctx.absorbTermination() case _ ⇒ super.onUpstreamFailure(cause, ctx) } +} + +private[http] object FrameOutHandler { + type Input = AnyRef + + def create(serverSide: Boolean, closeTimeout: FiniteDuration): Flow[Input, FrameStart, Unit] = + Flow[Input].transform(() ⇒ new FrameOutHandler(serverSide, closeTimeout)) } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala index 0784456f98..bb6b53494f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala @@ -4,7 +4,7 @@ package akka.http.impl.engine.ws -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.{ Keep, BidiFlow, Flow } import akka.stream.stage.{ SyncDirective, Context, StageState, StatefulStage } import scala.util.Random @@ -15,6 +15,9 @@ import scala.util.Random * INTERNAL API */ private[http] object Masking { + def apply(serverSide: Boolean, maskRandom: () ⇒ Random): BidiFlow[ /* net in */ FrameEvent, /* app out */ FrameEvent, /* app in */ FrameEvent, /* net out */ FrameEvent, Unit] = + BidiFlow.wrap(unmaskIf(serverSide), maskIf(!serverSide, maskRandom))(Keep.none) + def maskIf(condition: Boolean, maskRandom: () ⇒ Random): Flow[FrameEvent, FrameEvent, Unit] = if (condition) Flow[FrameEvent].transform(() ⇒ new Masking(maskRandom())) // new random per materialization else Flow[FrameEvent] diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala index 77abd43b17..3f9179f859 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/UpgradeToWebsocketLowLevel.scala @@ -28,5 +28,5 @@ private[http] abstract class UpgradeToWebsocketLowLevel extends InternalCustomHe private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None): HttpResponse override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None): HttpResponse = - handleFrames(Websocket.handleMessages(handlerFlow), subprotocol) + handleFrames(Websocket.stack(serverSide = true).join(handlerFlow), subprotocol) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index ea45530393..6efa2adae0 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -6,9 +6,11 @@ package akka.http.impl.engine.ws import java.security.SecureRandom +import akka.util.ByteString + import scala.concurrent.duration._ -import akka.stream.{ Attributes, FanOutShape2, FanInShape3, Inlet } +import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage._ import FlexiRoute.{ DemandFrom, DemandFromAny, RouteLogic } @@ -19,13 +21,40 @@ import akka.http.scaladsl.model.ws._ /** * INTERNAL API + * + * Defines components of the websocket stack. */ private[http] object Websocket { import FrameHandler._ - def handleMessages[T](messageHandler: Flow[Message, Message, T], - serverSide: Boolean = true, - closeTimeout: FiniteDuration = 3.seconds): Flow[FrameEvent, FrameEvent, Unit] = { + /** The lowest layer that implements the binary protocol */ + def framing: BidiFlow[ByteString, FrameEvent, FrameEvent, ByteString, Unit] = + BidiFlow.wrap( + Flow[ByteString].transform(() ⇒ new FrameEventParser), + Flow[FrameEvent].transform(() ⇒ new FrameEventRenderer))(Keep.none) + .named("ws-framing") + + /** The layer that handles masking using the rules defined in the specification */ + def masking(serverSide: Boolean): BidiFlow[FrameEvent, FrameEvent, FrameEvent, FrameEvent, Unit] = + Masking(serverSide, () ⇒ new SecureRandom()) + .named("ws-masking") + + /** + * The layer that implements all low-level frame handling, like handling control frames, collecting messages + * from frames, decoding text messages, close handling, etc. + */ + def frameHandling(serverSide: Boolean = true, + closeTimeout: FiniteDuration): BidiFlow[FrameEvent, FrameHandler.Output, FrameOutHandler.Input, FrameStart, Unit] = + BidiFlow.wrap( + FrameHandler.create(server = serverSide), + FrameOutHandler.create(serverSide, closeTimeout))(Keep.none) + .named("ws-frame-handling") + + /** + * The layer that provides the high-level user facing API on top of frame handling. + */ + def messageAPI(serverSide: Boolean, + closeTimeout: FiniteDuration): BidiFlow[FrameHandler.Output, Message, Message, FrameOutHandler.Input, Unit] = { /** Completes this branch of the flow if no more messages are expected and converts close codes into errors */ class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] { var inMessage = false @@ -91,18 +120,6 @@ private[http] object Websocket { } } - lazy val userFlow = - Flow[MessagePart] - .transform(() ⇒ new PrepareForUserHandler) - .splitWhen(_.isMessageEnd) // FIXME using splitAfter from #16885 would simplify protocol a lot - .map(_.collect { - case m: MessageDataPart ⇒ m - }) - .via(collectMessage) - .via(messageHandler) - .via(MessageToFrameRenderer.create(serverSide)) - .transform(() ⇒ new LiftCompletions) - /** * Distributes output from the FrameHandler into bypass and userFlow. */ @@ -158,27 +175,56 @@ private[http] object Websocket { } } - lazy val bypassAndUserHandler: Flow[Either[BypassEvent, MessagePart], AnyRef, Unit] = - Flow(BypassRouter, Source(closeTimeout, closeTimeout, Tick), BypassMerge)((_, _, _) ⇒ ()) { implicit b ⇒ - (split, tick, merge) ⇒ - import FlowGraph.Implicits._ + def prepareMessages: Flow[MessagePart, Message, Unit] = + Flow[MessagePart] + .transform(() ⇒ new PrepareForUserHandler) + .splitWhen(_.isMessageEnd) // FIXME using splitAfter from #16885 would simplify protocol a lot + .map(_.collect { + case m: MessageDataPart ⇒ m + }) + .via(collectMessage) + .named("ws-prepare-messages") - split.out0 ~> merge.in0 - split.out1 ~> userFlow ~> merge.in1 - tick.outlet ~> merge.in2 + def renderMessages: Flow[Message, FrameStart, Unit] = + MessageToFrameRenderer.create(serverSide) + .named("ws-render-messages") - (split.in, merge.out) - } + BidiFlow() { implicit b ⇒ + import FlowGraph.Implicits._ - Flow[FrameEvent] - .via(Masking.unmaskIf(serverSide)) - .via(FrameHandler.create(server = serverSide)) - .mapConcat(x ⇒ x :: x :: Nil) // FIXME: #17004 - .via(bypassAndUserHandler) - .transform(() ⇒ new FrameOutHandler(serverSide, closeTimeout)) - .via(Masking.maskIf(!serverSide, () ⇒ new SecureRandom())) + val routePreparation = b.add(Flow[FrameHandler.Output].mapConcat(x ⇒ x :: x :: Nil)) + val split = b.add(BypassRouter) + val tick = Source(closeTimeout, closeTimeout, Tick) + val merge = b.add(BypassMerge) + val messagePreparation = b.add(prepareMessages) + val messageRendering = b.add(renderMessages.transform(() ⇒ new LiftCompletions)) + + routePreparation.outlet ~> split.in + + // user handler + split.out1 ~> messagePreparation + messageRendering.outlet ~> merge.in1 + + // bypass + split.out0 ~> merge.in0 + + // timeout support + tick ~> merge.in2 + + BidiShape( + routePreparation.inlet, + messagePreparation.outlet, + messageRendering.inlet, + merge.out) + }.named("ws-message-api") } + def stack(serverSide: Boolean = true, + closeTimeout: FiniteDuration = 3.seconds): BidiFlow[FrameEvent, Message, Message, FrameEvent, Unit] = + masking(serverSide) atop + frameHandling(serverSide, closeTimeout) atop + messageAPI(serverSide, closeTimeout) + object Tick case object SwitchToWebsocketToken } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index f50bc3b4d2..7423c6b575 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -782,7 +782,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { Source(netIn) .via(printEvent("netIn")) .transform(() ⇒ new FrameEventParser) - .via(Websocket.handleMessages(messageHandler, serverSide, closeTimeout = closeTimeout)) + .via(Websocket.stack(serverSide, closeTimeout = closeTimeout).join(messageHandler)) .via(printEvent("frameRendererIn")) .transform(() ⇒ new FrameEventRenderer) .via(printEvent("frameRendererOut")) From 3c39f71cbaeae1f0f8142ef1abcdd863bf8999e7 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 10 Aug 2015 15:33:46 +0200 Subject: [PATCH 5/6] =htc fix scaladoc of Http.bindAndHandle* --- .../main/scala/akka/http/javadsl/Http.scala | 30 ++++++++----------- .../main/scala/akka/http/scaladsl/Http.scala | 15 ++++------ 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 6c7deabcc0..632c1fddca 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -113,9 +113,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], interface: String, port: Int, @@ -128,9 +127,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], interface: String, port: Int, @@ -146,9 +144,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], interface: String, port: Int, @@ -160,9 +157,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], interface: String, port: Int, @@ -178,9 +174,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]], interface: String, port: Int, @@ -192,9 +187,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]], interface: String, port: Int, diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 8479b457bd..b93cc8ace2 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -74,9 +74,8 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any], interface: String, port: Int = -1, @@ -113,9 +112,8 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandleSync(handler: HttpRequest ⇒ HttpResponse, interface: String, port: Int = -1, @@ -128,9 +126,8 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` * [[Flow]] for processing all incoming connections. * - * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all - * connections are being accepted at maximum rate, which, depending on the applications, might - * present a DoS risk! + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. */ def bindAndHandleAsync(handler: HttpRequest ⇒ Future[HttpResponse], interface: String, port: Int = -1, From 67c3b35542d6bd175d262d8b6004a5e05acf9854 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 10 Aug 2015 16:10:11 +0200 Subject: [PATCH 6/6] =htp #18070 add comment to scaladoc of formField/parameter how to work around compilation error --- .../server/directives/FormFieldDirectives.scala | 10 ++++++++++ .../server/directives/ParameterDirectives.scala | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FormFieldDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FormFieldDirectives.scala index fb03b647c3..e856ebbeca 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FormFieldDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FormFieldDirectives.scala @@ -18,12 +18,22 @@ trait FormFieldDirectives extends ToNameReceptacleEnhancements { /** * Extracts an HTTP form field from the request. * Rejects the request if the defined form field matcher(s) don't match. + * + * Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an + * "too many arguments for method formField" or "type mismatch" error. + * + * As a workaround add an `import FormFieldDirectives.FieldMagnet` or use Scala 2.11.x. */ def formField(pdm: FieldMagnet): pdm.Out = pdm() /** * Extracts a number of HTTP form field from the request. * Rejects the request if the defined form field matcher(s) don't match. + * + * Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an + * "too many arguments for method formFields" or "type mismatch" error. + * + * As a workaround add an `import FormFieldDirectives.FieldMagnet` or use Scala 2.11.x. */ def formFields(pdm: FieldMagnet): pdm.Out = pdm() diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/ParameterDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/ParameterDirectives.scala index c4a13b0197..5fa526d06a 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/ParameterDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/ParameterDirectives.scala @@ -32,12 +32,22 @@ trait ParameterDirectives extends ToNameReceptacleEnhancements { /** * Extracts a query parameter value from the request. * Rejects the request if the defined query parameter matcher(s) don't match. + * + * Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an + * "too many arguments for method parameter" or "type mismatch" error. + * + * As a workaround add an `import ParameterDirectives.ParamMagnet` or use Scala 2.11.x. */ def parameter(pdm: ParamMagnet): pdm.Out = pdm() /** * Extracts a number of query parameter values from the request. * Rejects the request if the defined query parameter matcher(s) don't match. + * + * Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an + * "too many arguments for method parameters" or "type mismatch" error. + * + * As a workaround add an `import ParameterDirectives.ParamMagnet` or use Scala 2.11.x. */ def parameters(pdm: ParamMagnet): pdm.Out = pdm()