=htc #16574 fix large requests not being consumable on the server-side

This commit is contained in:
Mathias 2014-12-19 14:48:00 +01:00
parent 0f0ce513ff
commit 166be32ce6

View file

@ -67,6 +67,14 @@ private[http] object HttpServer {
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
// we need to make sure that only one element per incoming request is queueing up in front of
// the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout
// would eventually prevent us from reading the remaining request chunks from the transportIn
val bypass = Flow[RequestOutput].filter {
case (_: RequestStart | _: MessageStartError) true
case _ false
}
val rendererPipeline =
Flow[ResponseRenderingContext]
.section(name("recover"))(_.transform(() new ErrorsTo500ResponseRecovery(log))) // FIXME: simplify after #16394 is closed
@ -82,7 +90,7 @@ private[http] object HttpServer {
Flow() { implicit b
//FIXME: the graph is unnecessary after fixing #15957
transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut
bypassFanout ~> bypassMerge.bypassInput
bypassFanout ~> bypass ~> bypassMerge.bypassInput
oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput
b.allowCycles()
@ -107,7 +115,7 @@ private[http] object HttpServer {
override val initialState = State[Any](Read(bypassInput)) {
case (ctx, _, requestStart: RequestStart) waitingForApplicationResponse(requestStart)
case (ctx, _, MessageStartError(status, info)) finishWithError(ctx, "request", status, info)
case _ SameState // drop other parser output
case _ throw new IllegalStateException
}
def waitingForApplicationResponse(requestStart: RequestStart): State[Any] =