=str,htc apply review feedback: smaller improvement across the board

This commit is contained in:
Mathias 2015-11-25 16:33:13 +01:00
parent aeb05bd108
commit 87cf576b8d
5 changed files with 86 additions and 71 deletions

View file

@ -31,22 +31,22 @@ import akka.util.ByteString
* INTERNAL API * INTERNAL API
* *
* *
* HTTP pipeline setup: * HTTP pipeline setup (without the underlying SSL/TLS (un)wrapping and the websocket switch):
* *
* +-------------+ +-------------+ +-----------+ * +----------+ +-------------+ +-------------+ +-----------+
* HttpRequest | request- | Request- | | Request- | request- | ByteString * HttpRequest | | Http- | request- | Request- | | Request- | request- | ByteString
* | <-----------------+ Preparation <-----------------+ <-------------------+ Parsing <--------------- * | <------------+ <----------+ Preparation <----------+ <-------------+ Parsing <-----------
* | | | Output | | Output | | * | | | Request | | Output | | Output | |
* | +-------------+ | | +-----------+ * | | | +-------------+ | | +-----------+
* | | | * | | | | |
* | Application- | controller- | * | Application- | One2One- | | controller- |
* | Flow | Stage | * | Flow | Bidi | | Stage |
* | | | * | | | | |
* | | | +-----------+ * | | | | | +-----------+
* | HttpResponse | | Response- | renderer- | ByteString * | HttpResponse | | HttpResponse | | Response- | renderer- | ByteString
* v --------------------------------------------------> +-------------------> Pipeline +--------------> * v -------------> +-----------------------------------> +-------------> Pipeline +---------->
* | | Rendering- | | * | | | | Rendering- | |
* +-------------+ Context +-----------+ * +----------+ +-------------+ Context +-----------+
*/ */
private[http] object HttpServerBluePrint { private[http] object HttpServerBluePrint {
def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: Materializer): Http.ServerLayer = { def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: Materializer): Http.ServerLayer = {
@ -81,7 +81,7 @@ private[http] object HttpServerBluePrint {
rootParser.createShallowCopy().stage).named("rootParser") rootParser.createShallowCopy().stage).named("rootParser")
.map(establishAbsoluteUri) .map(establishAbsoluteUri)
val requestPreparation = val requestPreparationFlow =
Flow[RequestOutput] Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd) .splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.via(headAndTailFlow) .via(headAndTailFlow)
@ -105,34 +105,35 @@ private[http] object HttpServerBluePrint {
// `buffer` will ensure demand and therefore make sure that completion is reported eagerly. // `buffer` will ensure demand and therefore make sure that completion is reported eagerly.
.buffer(1, OverflowStrategy.backpressure) .buffer(1, OverflowStrategy.backpressure)
val rendererPipeline = val responseRenderingFlow =
Flow[ResponseRenderingContext] Flow[ResponseRenderingContext]
.via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer")) .via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer"))
.flatMapConcat(ConstantFun.scalaIdentityFunction) .flatMapConcat(ConstantFun.scalaIdentityFunction)
.via(Flow[ResponseRenderingOutput].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger")) .via(Flow[ResponseRenderingOutput].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger"))
BidiFlow.fromGraph(FlowGraph.create(requestParsingFlow, rendererPipeline)(Keep.none) { implicit b BidiFlow.fromGraph(FlowGraph.create() { implicit b
(requestParsing, renderer)
import FlowGraph.Implicits._ import FlowGraph.Implicits._
// HTTP // HTTP
val requestPrep = b.add(requestPreparation) val requestParsing = b.add(requestParsingFlow)
val requestPreparation = b.add(requestPreparationFlow)
val responseRendering = b.add(responseRenderingFlow)
val controllerStage = b.add(new ControllerStage(settings, log)) val controllerStage = b.add(new ControllerStage(settings, log))
val csRequestParsingIn = controllerStage.in1 val csRequestParsingIn = controllerStage.in1
val csRequestPrepOut = controllerStage.out1 val csRequestPrepOut = controllerStage.out1
val csHttpResponseIn = controllerStage.in2 val csHttpResponseIn = controllerStage.in2
val csResponseCtxOut = controllerStage.out2 val csResponseCtxOut = controllerStage.out2
requestParsing.outlet ~> csRequestParsingIn requestParsing.outlet ~> csRequestParsingIn
csResponseCtxOut ~> renderer.inlet csResponseCtxOut ~> responseRendering.inlet
csRequestPrepOut ~> requestPrep csRequestPrepOut ~> requestPreparation
// One2OneBidi // One2OneBidi
val one2one = b.add(new One2OneBidi[HttpRequest, HttpResponse](settings.pipeliningLimit)) val one2one = b.add(new One2OneBidi[HttpRequest, HttpResponse](settings.pipeliningLimit))
requestPrep.outlet ~> one2one.in1 requestPreparation.outlet ~> one2one.in1
one2one.out2 ~> csHttpResponseIn one2one.out2 ~> csHttpResponseIn
// Websocket // Websocket
val http = FlowShape(requestParsing.inlet, renderer.outlet) val http = FlowShape(requestParsing.inlet, responseRendering.outlet)
val switchTokenBroadcast = b.add(Broadcast[ResponseRenderingOutput](2)) val switchTokenBroadcast = b.add(Broadcast[ResponseRenderingOutput](2))
val switchToWebsocket = b.add(Flow[ResponseRenderingOutput] val switchToWebsocket = b.add(Flow[ResponseRenderingOutput]
.collect { case _: ResponseRenderingOutput.SwitchToWebsocket SwitchToWebsocketToken }) .collect { case _: ResponseRenderingOutput.SwitchToWebsocket SwitchToWebsocketToken })
@ -140,10 +141,9 @@ private[http] object HttpServerBluePrint {
val protocolRouter = b.add(WebsocketSwitchRouter) val protocolRouter = b.add(WebsocketSwitchRouter)
val protocolMerge = b.add(new WebsocketMerge(ws.installHandler, settings.websocketRandomFactory, log)) val protocolMerge = b.add(new WebsocketMerge(ws.installHandler, settings.websocketRandomFactory, log))
val wsSwitchTokenMerge = b.add(WsSwitchTokenMerge) val wsSwitchTokenMerge = b.add(WsSwitchTokenMerge)
switchTokenBroadcast ~> switchToWebsocket switchTokenBroadcast ~> switchToWebsocket ~> wsSwitchTokenMerge.in1
protocolRouter.out0 ~> http ~> switchTokenBroadcast ~> protocolMerge.in0 protocolRouter.out0 ~> http ~> switchTokenBroadcast ~> protocolMerge.in0
protocolRouter.out1 ~> websocket ~> protocolMerge.in1 protocolRouter.out1 ~> websocket ~> protocolMerge.in1
switchToWebsocket ~> wsSwitchTokenMerge.in1
wsSwitchTokenMerge.out ~> protocolRouter.in wsSwitchTokenMerge.out ~> protocolRouter.in
// SSL/TLS // SSL/TLS
@ -169,6 +169,7 @@ private[http] object HttpServerBluePrint {
val shape = new BidiShape(requestParsingIn, requestPrepOut, httpResponseIn, responseCtxOut) val shape = new BidiShape(requestParsingIn, requestPrepOut, httpResponseIn, responseCtxOut)
def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
val pullHttpResponseIn = () pull(httpResponseIn)
var openRequests = immutable.Queue[RequestStart]() var openRequests = immutable.Queue[RequestStart]()
var oneHundredContinueResponsePending = false var oneHundredContinueResponsePending = false
var pullSuppressed = false var pullSuppressed = false
@ -218,7 +219,8 @@ private[http] object HttpServerBluePrint {
requestStart.expect100Continue && oneHundredContinueResponsePending || requestStart.expect100Continue && oneHundredContinueResponsePending ||
isClosed(requestParsingIn) && openRequests.isEmpty || isClosed(requestParsingIn) && openRequests.isEmpty ||
isEarlyResponse isEarlyResponse
push(responseCtxOut, ResponseRenderingContext(response, requestStart.method, requestStart.protocol, close)) emit(responseCtxOut, ResponseRenderingContext(response, requestStart.method, requestStart.protocol, close),
pullHttpResponseIn)
if (close) complete(responseCtxOut) if (close) complete(responseCtxOut)
} }
override def onUpstreamFinish() = override def onUpstreamFinish() =
@ -244,9 +246,17 @@ private[http] object HttpServerBluePrint {
} }
}) })
setHandler(responseCtxOut, new OutHandler { class ResponseCtxOutHandler extends OutHandler {
def onPull(): Unit = if (!hasBeenPulled(httpResponseIn)) pull(httpResponseIn) override def onPull() = {}
override def onDownstreamFinish() = cancel(httpResponseIn) override def onDownstreamFinish() =
cancel(httpResponseIn) // we cannot fully completeState() here as the websocket pipeline would not complete properly
}
setHandler(responseCtxOut, new ResponseCtxOutHandler {
override def onPull() = {
pull(httpResponseIn)
// after the initial pull here we only ever pull after having emitted in `onPush` of `httpResponseIn`
setHandler(responseCtxOut, new ResponseCtxOutHandler)
}
}) })
def finishWithIllegalRequestError(status: StatusCode, info: ErrorInfo): Unit = { def finishWithIllegalRequestError(status: StatusCode, info: ErrorInfo): Unit = {

View file

@ -46,7 +46,7 @@ class One2OneBidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
outOut.expectError(new One2OneBidiFlow.UnexpectedOutputException(3)) outOut.expectError(new One2OneBidiFlow.UnexpectedOutputException(3))
} }
"drop surplus output elements" in new Test() { "fully propagate cancellation" in new Test() {
inIn.sendNext(1) inIn.sendNext(1)
inOut.requestNext() should ===(1) inOut.requestNext() should ===(1)
@ -55,6 +55,9 @@ class One2OneBidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
outOut.cancel() outOut.cancel()
outIn.expectCancellation() outIn.expectCancellation()
inOut.cancel()
inIn.expectCancellation()
} }
"backpressure the input side if the maximum number of pending output elements has been reached" in { "backpressure the input side if the maximum number of pending output elements has been reached" in {

View file

@ -22,8 +22,6 @@ object One2OneBidiFlow {
* for every input element. * for every input element.
* 3. Backpressures the input side if the maximum number of pending output elements has been reached, * 3. Backpressures the input side if the maximum number of pending output elements has been reached,
* which is given via the ``maxPending`` parameter. You can use -1 to disable this feature. * which is given via the ``maxPending`` parameter. You can use -1 to disable this feature.
* 4. Drops surplus output elements, i.e. ones that the inner flow tries to produce after the input stream
* has signalled completion. Note that no error is triggered in this case!
*/ */
def apply[I, O](maxPending: Int): BidiFlow[I, I, O, O, Unit] = def apply[I, O](maxPending: Int): BidiFlow[I, I, O, O, Unit] =
BidiFlow.fromGraph(new One2OneBidi[I, O](maxPending)) BidiFlow.fromGraph(new One2OneBidi[I, O](maxPending))
@ -39,7 +37,7 @@ object One2OneBidiFlow {
override def createLogic(effectiveAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(effectiveAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var pending = 0 private var pending = 0
private var pullsSuppressed = 0 private var pullSuppressed = false
setHandler(inIn, new InHandler { setHandler(inIn, new InHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
@ -52,7 +50,7 @@ object One2OneBidiFlow {
setHandler(inOut, new OutHandler { setHandler(inOut, new OutHandler {
override def onPull(): Unit = override def onPull(): Unit =
if (pending < maxPending || maxPending == -1) pull(inIn) if (pending < maxPending || maxPending == -1) pull(inIn)
else pullsSuppressed += 1 else pullSuppressed = true
override def onDownstreamFinish(): Unit = cancel(inIn) override def onDownstreamFinish(): Unit = cancel(inIn)
}) })
@ -62,8 +60,8 @@ object One2OneBidiFlow {
if (pending > 0) { if (pending > 0) {
pending -= 1 pending -= 1
push(outOut, element) push(outOut, element)
if (pullsSuppressed > 0) { if (pullSuppressed) {
pullsSuppressed -= 1 pullSuppressed = false
pull(inIn) pull(inIn)
} }
} else throw new UnexpectedOutputException(element) } else throw new UnexpectedOutputException(element)

View file

@ -373,7 +373,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/** /**
* Signals that there will be no more elements emitted on the given port. * Signals that there will be no more elements emitted on the given port.
*/ */
final protected def complete[T](out: Outlet[T]): Unit = interpreter.complete(conn(out)) final protected def complete[T](out: Outlet[T]): Unit =
getHandler(out) match {
case e: Emitting[_] e.addFollowUp(new EmittingCompletion(e.out, e.previous))
case _ interpreter.complete(conn(out))
}
/** /**
* Signals failure through the given port. * Signals failure through the given port.