=htc replace merge + zip in server pipeline with new FlexiMerge

This commit is contained in:
Mathias 2014-11-14 12:04:35 +01:00
parent eaa9f52891
commit 2d9a49d010

View file

@ -9,10 +9,9 @@ import akka.stream.io.StreamTcp
import akka.stream.FlattenStrategy import akka.stream.FlattenStrategy
import akka.stream.FlowMaterializer import akka.stream.FlowMaterializer
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage._
import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser }
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.model.{ StatusCode, ErrorInfo, HttpRequest, HttpResponse, HttpMethods } import akka.http.model.{ HttpRequest, HttpResponse, HttpMethods }
import akka.http.engine.parsing.ParserOutput._ import akka.http.engine.parsing.ParserOutput._
import akka.http.Http import akka.http.Http
import akka.http.util._ import akka.http.util._
@ -51,7 +50,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda
val pipeline = FlowGraph { implicit b val pipeline = FlowGraph { implicit b
val bypassFanout = Broadcast[(RequestOutput, Source[RequestOutput])]("bypassFanout") val bypassFanout = Broadcast[(RequestOutput, Source[RequestOutput])]("bypassFanout")
val bypassFanin = Merge[Any]("merge") val bypassMerge = new BypassMerge
val rootParsePipeline = val rootParsePipeline =
Flow[ByteString] Flow[ByteString]
@ -60,8 +59,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda
.headAndTail .headAndTail
val rendererPipeline = val rendererPipeline =
Flow[Any] Flow[ResponseRenderingContext]
.transform("applyApplicationBypass", () applyApplicationBypass)
.transform("renderer", () responseRendererFactory.newRenderer) .transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat) .flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error")) .transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
@ -79,58 +77,54 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda
//FIXME: the graph is unnecessary after fixing #15957 //FIXME: the graph is unnecessary after fixing #15957
networkIn ~> rootParsePipeline ~> bypassFanout ~> requestTweaking ~> userIn networkIn ~> rootParsePipeline ~> bypassFanout ~> requestTweaking ~> userIn
bypassFanout ~> bypass ~> bypassFanin bypassFanout ~> bypass ~> bypassMerge.bypassInput
userOut ~> bypassFanin ~> rendererPipeline ~> networkOut userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> networkOut
}.run() }.run()
Http.IncomingConnection(tcpConn.remoteAddress, pipeline.get(userIn), pipeline.get(userOut)) Http.IncomingConnection(tcpConn.remoteAddress, pipeline.get(userIn), pipeline.get(userOut))
} }
/** // A special merge that works similarly to a combined `zip` + `map`
* Combines the HttpResponse coming in from the application with the ParserOutput.RequestStart // with the exception that certain elements on the bypass input of the `zip` (ParseErrors) cause
* produced by the request parser into a ResponseRenderingContext. // an immediate emitting of an element to downstream, without waiting for the applicationInput
* If the parser produced a ParserOutput.ParseError the error response is immediately dispatched to downstream. class BypassMerge extends FlexiMerge[ResponseRenderingContext]("BypassMerge") {
*/ import FlexiMerge._
def applyApplicationBypass = val bypassInput = createInputPort[MessageStart with RequestOutput]()
new PushStage[Any, ResponseRenderingContext] { val applicationInput = createInputPort[HttpResponse]()
var applicationResponse: HttpResponse = _
def createMergeLogic() = new MergeLogic[ResponseRenderingContext] {
var requestStart: RequestStart = _ var requestStart: RequestStart = _
override def onPush(elem: Any, ctx: Context[ResponseRenderingContext]): Directive = elem match { override def inputHandles(inputCount: Int) = {
case response: HttpResponse require(inputCount == 2, s"BypassMerge must have two connected inputs, was $inputCount")
requestStart match { Vector(bypassInput, applicationInput)
case null
applicationResponse = response
ctx.pull()
case x: RequestStart
requestStart = null
ctx.push(dispatch(x, response))
}
case requestStart: RequestStart
applicationResponse match {
case null
this.requestStart = requestStart
ctx.pull()
case response
applicationResponse = null
ctx.push(dispatch(requestStart, response))
}
case ParseError(status, info)
ctx.push(errorResponse(status, info))
} }
def dispatch(requestStart: RequestStart, response: HttpResponse): ResponseRenderingContext = { override def initialState = readBypass
import requestStart._
ResponseRenderingContext(response, method, protocol, closeAfterResponseCompletion) val readBypass = State[MessageStart with RequestOutput](Read(bypassInput)) {
case (ctx, _, rs: RequestStart)
requestStart = rs
readApplicationInput
case (ctx, _, ParseError(status, info))
log.warning("Illegal request, responding with status '{}': {}", status, info.formatPretty)
val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary
ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true)
ctx.complete() // shouldn't this return a `State` rather than `Unit`?
SameState // it seems weird to stay in the same state after completion
} }
def errorResponse(status: StatusCode, info: ErrorInfo): ResponseRenderingContext = { val readApplicationInput: State[HttpResponse] =
log.warning("Illegal request, responding with status '{}': {}", status, info.formatPretty) State[HttpResponse](Read(applicationInput)) { (ctx, _, response)
val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary ctx.emit(ResponseRenderingContext(response, requestStart.method, requestStart.protocol,
ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true) requestStart.closeAfterResponseCompletion))
} requestStart = null
readBypass
}
override def initialCompletionHandling = eagerClose
} }
} }
}