diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 6f1a0d6288..0b70c0a281 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -107,18 +107,25 @@ private[http] object HttpServer { val applicationInput = createInputPort[HttpResponse]() def createMergeLogic() = new MergeLogic[ResponseRenderingContext] { + var requestStart: RequestStart = _ + override def inputHandles(inputCount: Int) = { require(inputCount == 3, s"BypassMerge must have 3 connected inputs, was $inputCount") Vector(bypassInput, oneHundredContinueInput, applicationInput) } - override val initialState = State[Any](Read(bypassInput)) { - case (ctx, _, requestStart: RequestStart) ⇒ waitingForApplicationResponse(requestStart) + override val initialState: State[Any] = State[Any](Read(bypassInput)) { + case (ctx, _, requestStart: RequestStart) ⇒ + this.requestStart = requestStart + ctx.changeCompletionHandling(waitingForApplicationResponseCompletionHandling) + waitingForApplicationResponse case (ctx, _, MessageStartError(status, info)) ⇒ finishWithError(ctx, "request", status, info) case _ ⇒ throw new IllegalStateException } - def waitingForApplicationResponse(requestStart: RequestStart): State[Any] = + override val initialCompletionHandling = eagerClose + + val waitingForApplicationResponse = State[Any](ReadAny(oneHundredContinueInput, applicationInput)) { case (ctx, _, response: HttpResponse) ⇒ // see the comment on [[OneHundredContinue]] for an explanation of the closing logic here (and more) @@ -129,18 +136,20 @@ private[http] object HttpServer { case (ctx, _, OneHundredContinue) ⇒ assert(requestStart.expect100ContinueResponsePending) ctx.emit(ResponseRenderingContext(HttpResponse(StatusCodes.Continue))) - waitingForApplicationResponse(requestStart.copy(expect100ContinueResponsePending = false)) + requestStart = requestStart.copy(expect100ContinueResponsePending = false) + SameState } - override def initialCompletionHandling = CompletionHandling( - onComplete = (ctx, _) ⇒ { ctx.complete(); SameState }, + val waitingForApplicationResponseCompletionHandling = CompletionHandling( + onComplete = { + case (ctx, `bypassInput`) ⇒ { requestStart = requestStart.copy(closeAfterResponseCompletion = true); SameState } + case (ctx, _) ⇒ { ctx.complete(); SameState } + }, onError = { case (ctx, _, EntityStreamException(errorInfo)) ⇒ // the application has forwarded a request entity stream error to the response stream finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo) - case (ctx, _, error) ⇒ - ctx.error(error) - SameState + case (ctx, _, error) ⇒ { ctx.error(error); SameState } }) def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = {