=htc make server accept early incoming request stream close
This is the fix for #16510 on the server-side.
This commit is contained in:
parent
166be32ce6
commit
35d1855e08
1 changed files with 18 additions and 9 deletions
|
|
@ -107,18 +107,25 @@ private[http] object HttpServer {
|
|||
val applicationInput = createInputPort[HttpResponse]()
|
||||
|
||||
def createMergeLogic() = new MergeLogic[ResponseRenderingContext] {
|
||||
var requestStart: RequestStart = _
|
||||
|
||||
override def inputHandles(inputCount: Int) = {
|
||||
require(inputCount == 3, s"BypassMerge must have 3 connected inputs, was $inputCount")
|
||||
Vector(bypassInput, oneHundredContinueInput, applicationInput)
|
||||
}
|
||||
|
||||
override val initialState = State[Any](Read(bypassInput)) {
|
||||
case (ctx, _, requestStart: RequestStart) ⇒ waitingForApplicationResponse(requestStart)
|
||||
override val initialState: State[Any] = State[Any](Read(bypassInput)) {
|
||||
case (ctx, _, requestStart: RequestStart) ⇒
|
||||
this.requestStart = requestStart
|
||||
ctx.changeCompletionHandling(waitingForApplicationResponseCompletionHandling)
|
||||
waitingForApplicationResponse
|
||||
case (ctx, _, MessageStartError(status, info)) ⇒ finishWithError(ctx, "request", status, info)
|
||||
case _ ⇒ throw new IllegalStateException
|
||||
}
|
||||
|
||||
def waitingForApplicationResponse(requestStart: RequestStart): State[Any] =
|
||||
override val initialCompletionHandling = eagerClose
|
||||
|
||||
val waitingForApplicationResponse =
|
||||
State[Any](ReadAny(oneHundredContinueInput, applicationInput)) {
|
||||
case (ctx, _, response: HttpResponse) ⇒
|
||||
// see the comment on [[OneHundredContinue]] for an explanation of the closing logic here (and more)
|
||||
|
|
@ -129,18 +136,20 @@ private[http] object HttpServer {
|
|||
case (ctx, _, OneHundredContinue) ⇒
|
||||
assert(requestStart.expect100ContinueResponsePending)
|
||||
ctx.emit(ResponseRenderingContext(HttpResponse(StatusCodes.Continue)))
|
||||
waitingForApplicationResponse(requestStart.copy(expect100ContinueResponsePending = false))
|
||||
requestStart = requestStart.copy(expect100ContinueResponsePending = false)
|
||||
SameState
|
||||
}
|
||||
|
||||
override def initialCompletionHandling = CompletionHandling(
|
||||
onComplete = (ctx, _) ⇒ { ctx.complete(); SameState },
|
||||
val waitingForApplicationResponseCompletionHandling = CompletionHandling(
|
||||
onComplete = {
|
||||
case (ctx, `bypassInput`) ⇒ { requestStart = requestStart.copy(closeAfterResponseCompletion = true); SameState }
|
||||
case (ctx, _) ⇒ { ctx.complete(); SameState }
|
||||
},
|
||||
onError = {
|
||||
case (ctx, _, EntityStreamException(errorInfo)) ⇒
|
||||
// the application has forwarded a request entity stream error to the response stream
|
||||
finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo)
|
||||
case (ctx, _, error) ⇒
|
||||
ctx.error(error)
|
||||
SameState
|
||||
case (ctx, _, error) ⇒ { ctx.error(error); SameState }
|
||||
})
|
||||
|
||||
def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue