From 2d9a49d010f56dcbb26960a912c2c636d730a16d Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 14 Nov 2014 12:04:35 +0100 Subject: [PATCH] =htc replace `merge` + `zip` in server pipeline with new `FlexiMerge` --- .../engine/server/HttpServerPipeline.scala | 88 +++++++++---------- 1 file changed, 41 insertions(+), 47 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala index 5ed43394ef..7b2a930f26 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala @@ -9,10 +9,9 @@ import akka.stream.io.StreamTcp import akka.stream.FlattenStrategy import akka.stream.FlowMaterializer import akka.stream.scaladsl._ -import akka.stream.stage._ import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } -import akka.http.model.{ StatusCode, ErrorInfo, HttpRequest, HttpResponse, HttpMethods } +import akka.http.model.{ HttpRequest, HttpResponse, HttpMethods } import akka.http.engine.parsing.ParserOutput._ import akka.http.Http import akka.http.util._ @@ -51,7 +50,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda val pipeline = FlowGraph { implicit b ⇒ val bypassFanout = Broadcast[(RequestOutput, Source[RequestOutput])]("bypassFanout") - val bypassFanin = Merge[Any]("merge") + val bypassMerge = new BypassMerge val rootParsePipeline = Flow[ByteString] @@ -60,8 +59,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda .headAndTail val rendererPipeline = - Flow[Any] - .transform("applyApplicationBypass", () ⇒ applyApplicationBypass) + Flow[ResponseRenderingContext] .transform("renderer", () ⇒ responseRendererFactory.newRenderer) .flatten(FlattenStrategy.concat) .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) @@ -79,58 +77,54 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda //FIXME: the graph is unnecessary after fixing #15957 networkIn ~> rootParsePipeline ~> bypassFanout ~> requestTweaking ~> userIn - bypassFanout ~> bypass ~> bypassFanin - userOut ~> bypassFanin ~> rendererPipeline ~> networkOut + bypassFanout ~> bypass ~> bypassMerge.bypassInput + userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> networkOut }.run() Http.IncomingConnection(tcpConn.remoteAddress, pipeline.get(userIn), pipeline.get(userOut)) } - /** - * Combines the HttpResponse coming in from the application with the ParserOutput.RequestStart - * produced by the request parser into a ResponseRenderingContext. - * If the parser produced a ParserOutput.ParseError the error response is immediately dispatched to downstream. - */ - def applyApplicationBypass = - new PushStage[Any, ResponseRenderingContext] { - var applicationResponse: HttpResponse = _ + // A special merge that works similarly to a combined `zip` + `map` + // with the exception that certain elements on the bypass input of the `zip` (ParseErrors) cause + // an immediate emitting of an element to downstream, without waiting for the applicationInput + class BypassMerge extends FlexiMerge[ResponseRenderingContext]("BypassMerge") { + import FlexiMerge._ + val bypassInput = createInputPort[MessageStart with RequestOutput]() + val applicationInput = createInputPort[HttpResponse]() + + def createMergeLogic() = new MergeLogic[ResponseRenderingContext] { var requestStart: RequestStart = _ - override def onPush(elem: Any, ctx: Context[ResponseRenderingContext]): Directive = elem match { - case response: HttpResponse ⇒ - requestStart match { - case null ⇒ - applicationResponse = response - ctx.pull() - case x: RequestStart ⇒ - requestStart = null - ctx.push(dispatch(x, response)) - } - - case requestStart: RequestStart ⇒ - applicationResponse match { - case null ⇒ - this.requestStart = requestStart - ctx.pull() - case response ⇒ - applicationResponse = null - ctx.push(dispatch(requestStart, response)) - } - - case ParseError(status, info) ⇒ - ctx.push(errorResponse(status, info)) + override def inputHandles(inputCount: Int) = { + require(inputCount == 2, s"BypassMerge must have two connected inputs, was $inputCount") + Vector(bypassInput, applicationInput) } - def dispatch(requestStart: RequestStart, response: HttpResponse): ResponseRenderingContext = { - import requestStart._ - ResponseRenderingContext(response, method, protocol, closeAfterResponseCompletion) + override def initialState = readBypass + + val readBypass = State[MessageStart with RequestOutput](Read(bypassInput)) { + case (ctx, _, rs: RequestStart) ⇒ + requestStart = rs + readApplicationInput + + case (ctx, _, ParseError(status, info)) ⇒ + log.warning("Illegal request, responding with status '{}': {}", status, info.formatPretty) + val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary + ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true) + ctx.complete() // shouldn't this return a `State` rather than `Unit`? + SameState // it seems weird to stay in the same state after completion } - def errorResponse(status: StatusCode, info: ErrorInfo): ResponseRenderingContext = { - log.warning("Illegal request, responding with status '{}': {}", status, info.formatPretty) - val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary - ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true) - } + val readApplicationInput: State[HttpResponse] = + State[HttpResponse](Read(applicationInput)) { (ctx, _, response) ⇒ + ctx.emit(ResponseRenderingContext(response, requestStart.method, requestStart.protocol, + requestStart.closeAfterResponseCompletion)) + requestStart = null + readBypass + } + + override def initialCompletionHandling = eagerClose } -} + } +} \ No newline at end of file