Merge pull request #18143 from spray/w/http-stack-cleanups
A few HTTP/WS stack cleanups
This commit is contained in:
commit
560785eaaa
16 changed files with 271 additions and 187 deletions
|
|
@ -13,6 +13,7 @@ import akka.event.LoggingAdapter
|
|||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.http.ClientConnectionSettings
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.model.headers.Host
|
||||
import akka.http.scaladsl.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse }
|
||||
import akka.http.impl.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
|
||||
|
|
@ -23,9 +24,6 @@ import akka.http.impl.util._
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[http] object OutgoingConnectionBlueprint {
|
||||
|
||||
type ClientShape = BidiShape[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse]
|
||||
|
||||
/*
|
||||
Stream Setup
|
||||
============
|
||||
|
|
@ -45,7 +43,7 @@ private[http] object OutgoingConnectionBlueprint {
|
|||
*/
|
||||
def apply(hostHeader: Host,
|
||||
settings: ClientConnectionSettings,
|
||||
log: LoggingAdapter): Graph[ClientShape, Unit] = {
|
||||
log: LoggingAdapter): Http.ClientLayer = {
|
||||
import settings._
|
||||
|
||||
// the initial header parser we initially use for every connection,
|
||||
|
|
@ -59,7 +57,7 @@ private[http] object OutgoingConnectionBlueprint {
|
|||
|
||||
val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest]
|
||||
.map(RequestRenderingContext(_, hostHeader))
|
||||
.via(Flow[RequestRenderingContext].transform(() ⇒ requestRendererFactory.newRenderer).named("renderer"))
|
||||
.via(Flow[RequestRenderingContext].map(requestRendererFactory.renderToSource).named("renderer"))
|
||||
.flatten(FlattenStrategy.concat)
|
||||
|
||||
val methodBypass = Flow[HttpRequest].map(_.method)
|
||||
|
|
@ -76,7 +74,7 @@ private[http] object OutgoingConnectionBlueprint {
|
|||
case (MessageStartError(_, info), _) ⇒ throw IllegalResponseException(info)
|
||||
}
|
||||
|
||||
FlowGraph.partial() { implicit b ⇒
|
||||
BidiFlow() { implicit b ⇒
|
||||
import FlowGraph.Implicits._
|
||||
val methodBypassFanout = b.add(Broadcast[HttpRequest](2, eagerCancel = true))
|
||||
val responseParsingMerge = b.add(new ResponseParsingMerge(rootParser))
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.http.impl.engine.rendering
|
||||
|
||||
import akka.http.ClientConnectionSettings
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.util.ByteString
|
||||
|
|
@ -20,110 +22,133 @@ import headers._
|
|||
private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`User-Agent`],
|
||||
requestHeaderSizeHint: Int,
|
||||
log: LoggingAdapter) {
|
||||
import HttpRequestRendererFactory.RequestRenderingOutput
|
||||
|
||||
def newRenderer: HttpRequestRenderer = new HttpRequestRenderer
|
||||
def renderToSource(ctx: RequestRenderingContext): Source[ByteString, Any] = render(ctx).byteStream
|
||||
|
||||
final class HttpRequestRenderer extends PushStage[RequestRenderingContext, Source[ByteString, Any]] {
|
||||
def render(ctx: RequestRenderingContext): RequestRenderingOutput = {
|
||||
val r = new ByteStringRendering(requestHeaderSizeHint)
|
||||
import ctx.request._
|
||||
|
||||
override def onPush(ctx: RequestRenderingContext, opCtx: Context[Source[ByteString, Any]]): SyncDirective = {
|
||||
val r = new ByteStringRendering(requestHeaderSizeHint)
|
||||
import ctx.request._
|
||||
def renderRequestLine(): Unit = {
|
||||
r ~~ method ~~ ' '
|
||||
val rawRequestUriRendered = headers.exists {
|
||||
case `Raw-Request-URI`(rawUri) ⇒
|
||||
r ~~ rawUri; true
|
||||
case _ ⇒ false
|
||||
}
|
||||
if (!rawRequestUriRendered) UriRendering.renderUriWithoutFragment(r, uri, UTF8)
|
||||
r ~~ ' ' ~~ protocol ~~ CrLf
|
||||
}
|
||||
|
||||
def renderRequestLine(): Unit = {
|
||||
r ~~ method ~~ ' '
|
||||
val rawRequestUriRendered = headers.exists {
|
||||
case `Raw-Request-URI`(rawUri) ⇒
|
||||
r ~~ rawUri; true
|
||||
case _ ⇒ false
|
||||
def render(h: HttpHeader) = r ~~ h ~~ CrLf
|
||||
|
||||
@tailrec def renderHeaders(remaining: List[HttpHeader], hostHeaderSeen: Boolean = false,
|
||||
userAgentSeen: Boolean = false, transferEncodingSeen: Boolean = false): Unit =
|
||||
remaining match {
|
||||
case head :: tail ⇒ head match {
|
||||
case x: `Content-Length` ⇒
|
||||
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x: `Content-Type` ⇒
|
||||
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.")
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x: `Transfer-Encoding` ⇒
|
||||
x.withChunkedPeeled match {
|
||||
case None ⇒
|
||||
suppressionWarning(log, head)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
case Some(te) ⇒
|
||||
// if the user applied some custom transfer-encoding we need to keep the header
|
||||
render(if (entity.isChunked && !entity.isKnownEmpty) te.withChunked else te)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen = true)
|
||||
}
|
||||
|
||||
case x: `Host` ⇒
|
||||
render(x)
|
||||
renderHeaders(tail, hostHeaderSeen = true, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x: `User-Agent` ⇒
|
||||
render(x)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen = true, transferEncodingSeen)
|
||||
|
||||
case x: `Raw-Request-URI` ⇒ // we never render this header
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x: CustomHeader ⇒
|
||||
if (!x.suppressRendering) render(x)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x: RawHeader if (x is "content-type") || (x is "content-length") || (x is "transfer-encoding") ||
|
||||
(x is "host") || (x is "user-agent") ⇒
|
||||
suppressionWarning(log, x, "illegal RawHeader")
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x ⇒
|
||||
render(x)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
}
|
||||
if (!rawRequestUriRendered) UriRendering.renderUriWithoutFragment(r, uri, UTF8)
|
||||
r ~~ ' ' ~~ protocol ~~ CrLf
|
||||
|
||||
case Nil ⇒
|
||||
if (!hostHeaderSeen) r ~~ ctx.hostHeader ~~ CrLf
|
||||
if (!userAgentSeen && userAgentHeader.isDefined) r ~~ userAgentHeader.get ~~ CrLf
|
||||
if (entity.isChunked && !entity.isKnownEmpty && !transferEncodingSeen)
|
||||
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf
|
||||
}
|
||||
|
||||
def render(h: HttpHeader) = r ~~ h ~~ CrLf
|
||||
def renderContentLength(contentLength: Long) =
|
||||
if (method.isEntityAccepted) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r
|
||||
|
||||
@tailrec def renderHeaders(remaining: List[HttpHeader], hostHeaderSeen: Boolean = false,
|
||||
userAgentSeen: Boolean = false, transferEncodingSeen: Boolean = false): Unit =
|
||||
remaining match {
|
||||
case head :: tail ⇒ head match {
|
||||
case x: `Content-Length` ⇒
|
||||
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
def renderStreamed(body: Source[ByteString, Any]): RequestRenderingOutput =
|
||||
RequestRenderingOutput.Streamed(renderByteStrings(r, body))
|
||||
|
||||
case x: `Content-Type` ⇒
|
||||
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.")
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
def completeRequestRendering(): RequestRenderingOutput =
|
||||
entity match {
|
||||
case x if x.isKnownEmpty ⇒
|
||||
renderContentLength(0) ~~ CrLf
|
||||
RequestRenderingOutput.Strict(r.get)
|
||||
|
||||
case x: `Transfer-Encoding` ⇒
|
||||
x.withChunkedPeeled match {
|
||||
case None ⇒
|
||||
suppressionWarning(log, head)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
case Some(te) ⇒
|
||||
// if the user applied some custom transfer-encoding we need to keep the header
|
||||
render(if (entity.isChunked && !entity.isKnownEmpty) te.withChunked else te)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen = true)
|
||||
}
|
||||
case HttpEntity.Strict(_, data) ⇒
|
||||
renderContentLength(data.length) ~~ CrLf
|
||||
RequestRenderingOutput.Strict(r.get ++ data)
|
||||
|
||||
case x: `Host` ⇒
|
||||
render(x)
|
||||
renderHeaders(tail, hostHeaderSeen = true, userAgentSeen, transferEncodingSeen)
|
||||
case HttpEntity.Default(_, contentLength, data) ⇒
|
||||
renderContentLength(contentLength) ~~ CrLf
|
||||
renderStreamed(data.via(CheckContentLengthTransformer.flow(contentLength)))
|
||||
|
||||
case x: `User-Agent` ⇒
|
||||
render(x)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen = true, transferEncodingSeen)
|
||||
case HttpEntity.Chunked(_, chunks) ⇒
|
||||
r ~~ CrLf
|
||||
renderStreamed(chunks.via(ChunkTransformer.flow))
|
||||
}
|
||||
|
||||
case x: `Raw-Request-URI` ⇒ // we never render this header
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
renderRequestLine()
|
||||
renderHeaders(headers.toList)
|
||||
renderEntityContentType(r, entity)
|
||||
completeRequestRendering()
|
||||
}
|
||||
|
||||
case x: CustomHeader ⇒
|
||||
if (!x.suppressRendering) render(x)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x: RawHeader if (x is "content-type") || (x is "content-length") || (x is "transfer-encoding") ||
|
||||
(x is "host") || (x is "user-agent") ⇒
|
||||
suppressionWarning(log, x, "illegal RawHeader")
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
|
||||
case x ⇒
|
||||
render(x)
|
||||
renderHeaders(tail, hostHeaderSeen, userAgentSeen, transferEncodingSeen)
|
||||
}
|
||||
|
||||
case Nil ⇒
|
||||
if (!hostHeaderSeen) r ~~ ctx.hostHeader ~~ CrLf
|
||||
if (!userAgentSeen && userAgentHeader.isDefined) r ~~ userAgentHeader.get ~~ CrLf
|
||||
if (entity.isChunked && !entity.isKnownEmpty && !transferEncodingSeen)
|
||||
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf
|
||||
}
|
||||
|
||||
def renderContentLength(contentLength: Long) =
|
||||
if (method.isEntityAccepted) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r
|
||||
|
||||
def completeRequestRendering(): Source[ByteString, Any] =
|
||||
entity match {
|
||||
case x if x.isKnownEmpty ⇒
|
||||
renderContentLength(0) ~~ CrLf
|
||||
Source.single(r.get)
|
||||
|
||||
case HttpEntity.Strict(_, data) ⇒
|
||||
renderContentLength(data.length) ~~ CrLf
|
||||
Source.single(r.get ++ data)
|
||||
|
||||
case HttpEntity.Default(_, contentLength, data) ⇒
|
||||
renderContentLength(contentLength) ~~ CrLf
|
||||
renderByteStrings(r, data.via(CheckContentLengthTransformer.flow(contentLength)))
|
||||
|
||||
case HttpEntity.Chunked(_, chunks) ⇒
|
||||
r ~~ CrLf
|
||||
renderByteStrings(r, chunks.via(ChunkTransformer.flow))
|
||||
}
|
||||
|
||||
renderRequestLine()
|
||||
renderHeaders(headers.toList)
|
||||
renderEntityContentType(r, entity)
|
||||
opCtx.push(completeRequestRendering())
|
||||
def renderStrict(ctx: RequestRenderingContext): ByteString =
|
||||
render(ctx) match {
|
||||
case RequestRenderingOutput.Strict(bytes) ⇒ bytes
|
||||
case _: RequestRenderingOutput.Streamed ⇒
|
||||
throw new IllegalArgumentException(s"Request entity was not Strict but ${ctx.request.entity.getClass.getSimpleName}")
|
||||
}
|
||||
}
|
||||
|
||||
private[http] object HttpRequestRendererFactory {
|
||||
def renderStrict(ctx: RequestRenderingContext, settings: ClientConnectionSettings, log: LoggingAdapter): ByteString =
|
||||
new HttpRequestRendererFactory(settings.userAgentHeader, settings.requestHeaderSizeHint, log).renderStrict(ctx)
|
||||
|
||||
sealed trait RequestRenderingOutput {
|
||||
def byteStream: Source[ByteString, Any]
|
||||
}
|
||||
object RequestRenderingOutput {
|
||||
case class Strict(bytes: ByteString) extends RequestRenderingOutput {
|
||||
def byteStream: Source[ByteString, Any] = Source.single(bytes)
|
||||
}
|
||||
case class Streamed(byteStream: Source[ByteString, Any]) extends RequestRenderingOutput
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import akka.stream.scaladsl.FlexiRoute.{ DemandFrom, RouteLogic }
|
|||
import akka.http.impl.engine.parsing._
|
||||
import akka.http.impl.engine.rendering.{ ResponseRenderingOutput, ResponseRenderingContext, HttpResponseRendererFactory }
|
||||
import akka.http.impl.engine.TokenSourceActor
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.impl.util._
|
||||
import akka.http.impl.engine.ws._
|
||||
|
|
@ -31,10 +32,7 @@ import ParserOutput._
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[http] object HttpServerBluePrint {
|
||||
|
||||
type ServerShape = BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest]
|
||||
|
||||
def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: Materializer): Graph[ServerShape, Unit] = {
|
||||
def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: Materializer): Http.ServerLayer = {
|
||||
import settings._
|
||||
|
||||
// the initial header parser we initially use for every connection,
|
||||
|
|
@ -98,7 +96,7 @@ private[http] object HttpServerBluePrint {
|
|||
.flatten(FlattenStrategy.concat)
|
||||
.via(Flow[ResponseRenderingOutput].transform(() ⇒ errorLogger(log, "Outgoing response stream error")).named("errorLogger"))
|
||||
|
||||
FlowGraph.partial(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒
|
||||
BidiFlow(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒
|
||||
(requestParsing, renderer, oneHundreds) ⇒
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
|
|
@ -300,11 +298,7 @@ private[http] object HttpServerBluePrint {
|
|||
val sink = StreamUtils.oneTimePublisherSink[FrameEvent](sinkCell, "frameHandler.in")
|
||||
val source = StreamUtils.oneTimeSubscriberSource[FrameEvent](sourceCell, "frameHandler.out")
|
||||
|
||||
val flow =
|
||||
Flow[ByteString]
|
||||
.transform[FrameEvent](() ⇒ new FrameEventParser)
|
||||
.via(Flow.wrap(sink, source)(Keep.none))
|
||||
.transform(() ⇒ new FrameEventRenderer)
|
||||
val flow = Websocket.framing.join(Flow.wrap(sink, source)(Keep.none))
|
||||
|
||||
new WebsocketSetup {
|
||||
def websocketFlow: Flow[ByteString, ByteString, Any] = flow
|
||||
|
|
|
|||
|
|
@ -18,11 +18,12 @@ import scala.util.control.NonFatal
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[http] object FrameHandler {
|
||||
def create(server: Boolean): Flow[FrameEvent, Either[BypassEvent, MessagePart], Unit] =
|
||||
type Output = Either[BypassEvent, MessagePart]
|
||||
def create(server: Boolean): Flow[FrameEvent, Output, Unit] =
|
||||
Flow[FrameEvent].transform(() ⇒ new HandlerStage(server))
|
||||
|
||||
class HandlerStage(server: Boolean) extends StatefulStage[FrameEvent, Either[BypassEvent, MessagePart]] {
|
||||
type Ctx = Context[Either[BypassEvent, MessagePart]]
|
||||
class HandlerStage(server: Boolean) extends StatefulStage[FrameEvent, Output] {
|
||||
type Ctx = Context[Output]
|
||||
def initial: State = Idle
|
||||
|
||||
object Idle extends StateWithControlFrameHandling {
|
||||
|
|
@ -129,7 +130,7 @@ private[http] object FrameHandler {
|
|||
Right(ActivelyCloseWithCode(Some(closeCode), reason))), ctx, CloseAfterPeerClosed)
|
||||
|
||||
object CloseAfterPeerClosed extends State {
|
||||
def onPush(elem: FrameEvent, ctx: Context[Either[BypassEvent, MessagePart]]): SyncDirective =
|
||||
def onPush(elem: FrameEvent, ctx: Context[Output]): SyncDirective =
|
||||
elem match {
|
||||
case FrameStart(FrameHeader(Opcode.Close, _, length, _, _, _, _), data) ⇒
|
||||
become(WaitForPeerTcpClose)
|
||||
|
|
@ -139,7 +140,7 @@ private[http] object FrameHandler {
|
|||
}
|
||||
}
|
||||
object WaitForPeerTcpClose extends State {
|
||||
def onPush(elem: FrameEvent, ctx: Context[Either[BypassEvent, MessagePart]]): SyncDirective =
|
||||
def onPush(elem: FrameEvent, ctx: Context[Output]): SyncDirective =
|
||||
ctx.pull() // ignore
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.http.impl.engine.ws
|
||||
|
||||
import akka.stream.scaladsl.Flow
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
import akka.stream.stage._
|
||||
|
|
@ -16,7 +18,7 @@ import Websocket.Tick
|
|||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration) extends StatefulStage[AnyRef, FrameStart] {
|
||||
private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDuration) extends StatefulStage[FrameOutHandler.Input, FrameStart] {
|
||||
def initial: StageState[AnyRef, FrameStart] = Idle
|
||||
def closeTimeout: Timestamp = Timestamp.now + _closeTimeout
|
||||
|
||||
|
|
@ -129,4 +131,11 @@ private[http] class FrameOutHandler(serverSide: Boolean, _closeTimeout: FiniteDu
|
|||
ctx.absorbTermination()
|
||||
case _ ⇒ super.onUpstreamFailure(cause, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
private[http] object FrameOutHandler {
|
||||
type Input = AnyRef
|
||||
|
||||
def create(serverSide: Boolean, closeTimeout: FiniteDuration): Flow[Input, FrameStart, Unit] =
|
||||
Flow[Input].transform(() ⇒ new FrameOutHandler(serverSide, closeTimeout))
|
||||
}
|
||||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.http.impl.engine.ws
|
||||
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.{ Keep, BidiFlow, Flow }
|
||||
import akka.stream.stage.{ SyncDirective, Context, StageState, StatefulStage }
|
||||
|
||||
import scala.util.Random
|
||||
|
|
@ -15,6 +15,9 @@ import scala.util.Random
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[http] object Masking {
|
||||
def apply(serverSide: Boolean, maskRandom: () ⇒ Random): BidiFlow[ /* net in */ FrameEvent, /* app out */ FrameEvent, /* app in */ FrameEvent, /* net out */ FrameEvent, Unit] =
|
||||
BidiFlow.wrap(unmaskIf(serverSide), maskIf(!serverSide, maskRandom))(Keep.none)
|
||||
|
||||
def maskIf(condition: Boolean, maskRandom: () ⇒ Random): Flow[FrameEvent, FrameEvent, Unit] =
|
||||
if (condition) Flow[FrameEvent].transform(() ⇒ new Masking(maskRandom())) // new random per materialization
|
||||
else Flow[FrameEvent]
|
||||
|
|
|
|||
|
|
@ -28,5 +28,5 @@ private[http] abstract class UpgradeToWebsocketLowLevel extends InternalCustomHe
|
|||
private[http] def handleFrames(handlerFlow: Flow[FrameEvent, FrameEvent, Any], subprotocol: Option[String] = None): HttpResponse
|
||||
|
||||
override def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None): HttpResponse =
|
||||
handleFrames(Websocket.handleMessages(handlerFlow), subprotocol)
|
||||
handleFrames(Websocket.stack(serverSide = true).join(handlerFlow), subprotocol)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,9 +6,11 @@ package akka.http.impl.engine.ws
|
|||
|
||||
import java.security.SecureRandom
|
||||
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.stream.{ Attributes, FanOutShape2, FanInShape3, Inlet }
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.stage._
|
||||
import FlexiRoute.{ DemandFrom, DemandFromAny, RouteLogic }
|
||||
|
|
@ -19,13 +21,40 @@ import akka.http.scaladsl.model.ws._
|
|||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Defines components of the websocket stack.
|
||||
*/
|
||||
private[http] object Websocket {
|
||||
import FrameHandler._
|
||||
|
||||
def handleMessages[T](messageHandler: Flow[Message, Message, T],
|
||||
serverSide: Boolean = true,
|
||||
closeTimeout: FiniteDuration = 3.seconds): Flow[FrameEvent, FrameEvent, Unit] = {
|
||||
/** The lowest layer that implements the binary protocol */
|
||||
def framing: BidiFlow[ByteString, FrameEvent, FrameEvent, ByteString, Unit] =
|
||||
BidiFlow.wrap(
|
||||
Flow[ByteString].transform(() ⇒ new FrameEventParser),
|
||||
Flow[FrameEvent].transform(() ⇒ new FrameEventRenderer))(Keep.none)
|
||||
.named("ws-framing")
|
||||
|
||||
/** The layer that handles masking using the rules defined in the specification */
|
||||
def masking(serverSide: Boolean): BidiFlow[FrameEvent, FrameEvent, FrameEvent, FrameEvent, Unit] =
|
||||
Masking(serverSide, () ⇒ new SecureRandom())
|
||||
.named("ws-masking")
|
||||
|
||||
/**
|
||||
* The layer that implements all low-level frame handling, like handling control frames, collecting messages
|
||||
* from frames, decoding text messages, close handling, etc.
|
||||
*/
|
||||
def frameHandling(serverSide: Boolean = true,
|
||||
closeTimeout: FiniteDuration): BidiFlow[FrameEvent, FrameHandler.Output, FrameOutHandler.Input, FrameStart, Unit] =
|
||||
BidiFlow.wrap(
|
||||
FrameHandler.create(server = serverSide),
|
||||
FrameOutHandler.create(serverSide, closeTimeout))(Keep.none)
|
||||
.named("ws-frame-handling")
|
||||
|
||||
/**
|
||||
* The layer that provides the high-level user facing API on top of frame handling.
|
||||
*/
|
||||
def messageAPI(serverSide: Boolean,
|
||||
closeTimeout: FiniteDuration): BidiFlow[FrameHandler.Output, Message, Message, FrameOutHandler.Input, Unit] = {
|
||||
/** Completes this branch of the flow if no more messages are expected and converts close codes into errors */
|
||||
class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] {
|
||||
var inMessage = false
|
||||
|
|
@ -91,18 +120,6 @@ private[http] object Websocket {
|
|||
}
|
||||
}
|
||||
|
||||
lazy val userFlow =
|
||||
Flow[MessagePart]
|
||||
.transform(() ⇒ new PrepareForUserHandler)
|
||||
.splitWhen(_.isMessageEnd) // FIXME using splitAfter from #16885 would simplify protocol a lot
|
||||
.map(_.collect {
|
||||
case m: MessageDataPart ⇒ m
|
||||
})
|
||||
.via(collectMessage)
|
||||
.via(messageHandler)
|
||||
.via(MessageToFrameRenderer.create(serverSide))
|
||||
.transform(() ⇒ new LiftCompletions)
|
||||
|
||||
/**
|
||||
* Distributes output from the FrameHandler into bypass and userFlow.
|
||||
*/
|
||||
|
|
@ -158,27 +175,56 @@ private[http] object Websocket {
|
|||
}
|
||||
}
|
||||
|
||||
lazy val bypassAndUserHandler: Flow[Either[BypassEvent, MessagePart], AnyRef, Unit] =
|
||||
Flow(BypassRouter, Source(closeTimeout, closeTimeout, Tick), BypassMerge)((_, _, _) ⇒ ()) { implicit b ⇒
|
||||
(split, tick, merge) ⇒
|
||||
import FlowGraph.Implicits._
|
||||
def prepareMessages: Flow[MessagePart, Message, Unit] =
|
||||
Flow[MessagePart]
|
||||
.transform(() ⇒ new PrepareForUserHandler)
|
||||
.splitWhen(_.isMessageEnd) // FIXME using splitAfter from #16885 would simplify protocol a lot
|
||||
.map(_.collect {
|
||||
case m: MessageDataPart ⇒ m
|
||||
})
|
||||
.via(collectMessage)
|
||||
.named("ws-prepare-messages")
|
||||
|
||||
split.out0 ~> merge.in0
|
||||
split.out1 ~> userFlow ~> merge.in1
|
||||
tick.outlet ~> merge.in2
|
||||
def renderMessages: Flow[Message, FrameStart, Unit] =
|
||||
MessageToFrameRenderer.create(serverSide)
|
||||
.named("ws-render-messages")
|
||||
|
||||
(split.in, merge.out)
|
||||
}
|
||||
BidiFlow() { implicit b ⇒
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
Flow[FrameEvent]
|
||||
.via(Masking.unmaskIf(serverSide))
|
||||
.via(FrameHandler.create(server = serverSide))
|
||||
.mapConcat(x ⇒ x :: x :: Nil) // FIXME: #17004
|
||||
.via(bypassAndUserHandler)
|
||||
.transform(() ⇒ new FrameOutHandler(serverSide, closeTimeout))
|
||||
.via(Masking.maskIf(!serverSide, () ⇒ new SecureRandom()))
|
||||
val routePreparation = b.add(Flow[FrameHandler.Output].mapConcat(x ⇒ x :: x :: Nil))
|
||||
val split = b.add(BypassRouter)
|
||||
val tick = Source(closeTimeout, closeTimeout, Tick)
|
||||
val merge = b.add(BypassMerge)
|
||||
val messagePreparation = b.add(prepareMessages)
|
||||
val messageRendering = b.add(renderMessages.transform(() ⇒ new LiftCompletions))
|
||||
|
||||
routePreparation.outlet ~> split.in
|
||||
|
||||
// user handler
|
||||
split.out1 ~> messagePreparation
|
||||
messageRendering.outlet ~> merge.in1
|
||||
|
||||
// bypass
|
||||
split.out0 ~> merge.in0
|
||||
|
||||
// timeout support
|
||||
tick ~> merge.in2
|
||||
|
||||
BidiShape(
|
||||
routePreparation.inlet,
|
||||
messagePreparation.outlet,
|
||||
messageRendering.inlet,
|
||||
merge.out)
|
||||
}.named("ws-message-api")
|
||||
}
|
||||
|
||||
def stack(serverSide: Boolean = true,
|
||||
closeTimeout: FiniteDuration = 3.seconds): BidiFlow[FrameEvent, Message, Message, FrameEvent, Unit] =
|
||||
masking(serverSide) atop
|
||||
frameHandling(serverSide, closeTimeout) atop
|
||||
messageAPI(serverSide, closeTimeout)
|
||||
|
||||
object Tick
|
||||
case object SwitchToWebsocketToken
|
||||
}
|
||||
|
|
|
|||
|
|
@ -113,9 +113,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
|
||||
interface: String, port: Int,
|
||||
|
|
@ -128,9 +127,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
|
||||
interface: String, port: Int,
|
||||
|
|
@ -146,9 +144,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
|
||||
interface: String, port: Int,
|
||||
|
|
@ -160,9 +157,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
|
||||
interface: String, port: Int,
|
||||
|
|
@ -178,9 +174,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]],
|
||||
interface: String, port: Int,
|
||||
|
|
@ -192,9 +187,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]],
|
||||
interface: String, port: Int,
|
||||
|
|
|
|||
|
|
@ -74,9 +74,8 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any],
|
||||
interface: String, port: Int = -1,
|
||||
|
|
@ -113,9 +112,8 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandleSync(handler: HttpRequest ⇒ HttpResponse,
|
||||
interface: String, port: Int = -1,
|
||||
|
|
@ -128,9 +126,8 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
|
||||
* [[Flow]] for processing all incoming connections.
|
||||
*
|
||||
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
|
||||
* connections are being accepted at maximum rate, which, depending on the applications, might
|
||||
* present a DoS risk!
|
||||
* The number of concurrently accepted connections can be configured by overriding
|
||||
* the `akka.http.server.max-connections` setting.
|
||||
*/
|
||||
def bindAndHandleAsync(handler: HttpRequest ⇒ Future[HttpResponse],
|
||||
interface: String, port: Int = -1,
|
||||
|
|
@ -156,7 +153,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
def serverLayer(settings: ServerSettings,
|
||||
remoteAddress: Option[InetSocketAddress] = None,
|
||||
log: LoggingAdapter = system.log)(implicit mat: Materializer): ServerLayer =
|
||||
BidiFlow.wrap(HttpServerBluePrint(settings, remoteAddress, log))
|
||||
HttpServerBluePrint(settings, remoteAddress, log)
|
||||
|
||||
/**
|
||||
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
|
||||
|
|
@ -210,7 +207,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
def clientLayer(hostHeader: Host,
|
||||
settings: ClientConnectionSettings,
|
||||
log: LoggingAdapter = system.log): ClientLayer =
|
||||
BidiFlow.wrap(OutgoingConnectionBlueprint(hostHeader, settings, log))
|
||||
OutgoingConnectionBlueprint(hostHeader, settings, log)
|
||||
|
||||
/**
|
||||
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
|
||||
|
|
@ -493,7 +490,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
|
|||
|
||||
//#server-layer
|
||||
/**
|
||||
* The type of the server-side HTTP layer as a stand-alone BidiStage
|
||||
* The type of the server-side HTTP layer as a stand-alone BidiFlow
|
||||
* that can be put atop the TCP layer to form an HTTP server.
|
||||
*
|
||||
* {{{
|
||||
|
|
@ -509,7 +506,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
|
|||
|
||||
//#client-layer
|
||||
/**
|
||||
* The type of the client-side HTTP layer as a stand-alone BidiStage
|
||||
* The type of the client-side HTTP layer as a stand-alone BidiFlow
|
||||
* that can be put atop the TCP layer to form an HTTP client.
|
||||
*
|
||||
* {{{
|
||||
|
|
|
|||
|
|
@ -6,12 +6,15 @@ package akka.http.impl.engine.client
|
|||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import akka.http.scaladsl.{ Http, TestUtils }
|
||||
import akka.http.scaladsl.model._
|
||||
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
|
||||
import akka.http.scaladsl.{ Http, TestUtils }
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.impl.util._
|
||||
|
||||
class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") {
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
package akka.http.impl.engine.client
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import akka.http.ClientConnectionSettings
|
||||
import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes }
|
||||
import org.scalatest.Inside
|
||||
|
|
@ -349,7 +348,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
|
|||
|
||||
class TestSetup {
|
||||
val requests = TestPublisher.manualProbe[HttpRequest]()
|
||||
val responses = TestSubscriber.manualProbe[HttpResponse]
|
||||
val responses = TestSubscriber.manualProbe[HttpResponse]()
|
||||
|
||||
def settings = ClientConnectionSettings(system)
|
||||
.copy(userAgentHeader = Some(`User-Agent`(List(ProductVersion("akka-http", "test")))))
|
||||
|
|
|
|||
|
|
@ -311,10 +311,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
|
|||
|
||||
def renderTo(expected: String): Matcher[HttpRequest] =
|
||||
equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒
|
||||
val renderer = newRenderer
|
||||
val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, Host(serverAddress)))
|
||||
.transform(() ⇒ renderer).named("renderer")
|
||||
.runWith(Sink.head), 1.second)
|
||||
val byteStringSource = renderToSource(RequestRenderingContext(request, Host(serverAddress)))
|
||||
val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
|
||||
Await.result(future, 250.millis)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -307,8 +307,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
|||
// split up the code point
|
||||
val half1 = gclef.take(1)
|
||||
val half2 = gclef.drop(1)
|
||||
println(half1(0).toInt.toHexString)
|
||||
println(half2(0).toInt.toHexString)
|
||||
|
||||
val pub = TestPublisher.manualProbe[String]()
|
||||
val msg = TextMessage(Source(pub))
|
||||
|
|
@ -784,7 +782,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
|||
Source(netIn)
|
||||
.via(printEvent("netIn"))
|
||||
.transform(() ⇒ new FrameEventParser)
|
||||
.via(Websocket.handleMessages(messageHandler, serverSide, closeTimeout = closeTimeout))
|
||||
.via(Websocket.stack(serverSide, closeTimeout = closeTimeout).join(messageHandler))
|
||||
.via(printEvent("frameRendererIn"))
|
||||
.transform(() ⇒ new FrameEventRenderer)
|
||||
.via(printEvent("frameRendererOut"))
|
||||
|
|
|
|||
|
|
@ -18,12 +18,22 @@ trait FormFieldDirectives extends ToNameReceptacleEnhancements {
|
|||
/**
|
||||
* Extracts an HTTP form field from the request.
|
||||
* Rejects the request if the defined form field matcher(s) don't match.
|
||||
*
|
||||
* Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an
|
||||
* "too many arguments for method formField" or "type mismatch" error.
|
||||
*
|
||||
* As a workaround add an `import FormFieldDirectives.FieldMagnet` or use Scala 2.11.x.
|
||||
*/
|
||||
def formField(pdm: FieldMagnet): pdm.Out = pdm()
|
||||
|
||||
/**
|
||||
* Extracts a number of HTTP form field from the request.
|
||||
* Rejects the request if the defined form field matcher(s) don't match.
|
||||
*
|
||||
* Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an
|
||||
* "too many arguments for method formFields" or "type mismatch" error.
|
||||
*
|
||||
* As a workaround add an `import FormFieldDirectives.FieldMagnet` or use Scala 2.11.x.
|
||||
*/
|
||||
def formFields(pdm: FieldMagnet): pdm.Out = pdm()
|
||||
|
||||
|
|
|
|||
|
|
@ -32,12 +32,22 @@ trait ParameterDirectives extends ToNameReceptacleEnhancements {
|
|||
/**
|
||||
* Extracts a query parameter value from the request.
|
||||
* Rejects the request if the defined query parameter matcher(s) don't match.
|
||||
*
|
||||
* Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an
|
||||
* "too many arguments for method parameter" or "type mismatch" error.
|
||||
*
|
||||
* As a workaround add an `import ParameterDirectives.ParamMagnet` or use Scala 2.11.x.
|
||||
*/
|
||||
def parameter(pdm: ParamMagnet): pdm.Out = pdm()
|
||||
|
||||
/**
|
||||
* Extracts a number of query parameter values from the request.
|
||||
* Rejects the request if the defined query parameter matcher(s) don't match.
|
||||
*
|
||||
* Due to a bug in Scala 2.10, invocations of this method sometimes fail to compile with an
|
||||
* "too many arguments for method parameters" or "type mismatch" error.
|
||||
*
|
||||
* As a workaround add an `import ParameterDirectives.ParamMagnet` or use Scala 2.11.x.
|
||||
*/
|
||||
def parameters(pdm: ParamMagnet): pdm.Out = pdm()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue