fix HttpResponseRenderer termination

- upstream termination arriving after a pull() would lead to deadlock
- downstream pull that goes to SubSink would lead to deadlock if followed
  by SubSink completion
This commit is contained in:
Roland Kuhn 2016-01-18 20:44:12 +01:00
parent 5523e1d70f
commit dd388d838b

View file

@ -62,13 +62,13 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { new GraphStageLogic(shape) {
private[this] var closeMode: CloseMode = DontClose // signals what to do after the current response var closeMode: CloseMode = DontClose // signals what to do after the current response
private[this] def close: Boolean = closeMode != DontClose def close: Boolean = closeMode != DontClose
private[this] def closeIf(cond: Boolean): Unit = def closeIf(cond: Boolean): Unit = if (cond) closeMode = CloseConnection
if (cond) closeMode = CloseConnection var transferring = false
setHandler(in, new InHandler { setHandler(in, new InHandler {
def onPush(): Unit = override def onPush(): Unit =
render(grab(in)) match { render(grab(in)) match {
case Strict(outElement) case Strict(outElement)
push(out, outElement) push(out, outElement)
@ -76,23 +76,36 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
case Streamed(outStream) transfer(outStream) case Streamed(outStream) transfer(outStream)
} }
override def onUpstreamFinish(): Unit = closeMode = CloseConnection override def onUpstreamFinish(): Unit =
if (transferring) closeMode = CloseConnection
else completeStage()
}) })
val waitForDemandHandler = new OutHandler { val waitForDemandHandler = new OutHandler {
def onPull(): Unit = if (close) completeStage() else pull(in) def onPull(): Unit = pull(in)
} }
setHandler(out, waitForDemandHandler) setHandler(out, waitForDemandHandler)
def transfer(outStream: Source[ResponseRenderingOutput, Any]): Unit = { def transfer(outStream: Source[ResponseRenderingOutput, Any]): Unit = {
transferring = true
val sinkIn = new SubSinkInlet[ResponseRenderingOutput]("RenderingSink") val sinkIn = new SubSinkInlet[ResponseRenderingOutput]("RenderingSink")
sinkIn.setHandler(new InHandler { sinkIn.setHandler(new InHandler {
def onPush(): Unit = push(out, sinkIn.grab()) override def onPush(): Unit = push(out, sinkIn.grab())
override def onUpstreamFinish(): Unit = if (close) completeStage() else setHandler(out, waitForDemandHandler) override def onUpstreamFinish(): Unit =
if (close) completeStage()
else {
transferring = false
setHandler(out, waitForDemandHandler)
if (isAvailable(out)) pull(in)
}
}) })
setHandler(out, new OutHandler { setHandler(out, new OutHandler {
def onPull(): Unit = sinkIn.pull() override def onPull(): Unit = sinkIn.pull()
override def onDownstreamFinish(): Unit = {
completeStage()
sinkIn.cancel()
}
}) })
sinkIn.pull() sinkIn.pull()
Source.fromGraph(outStream).runWith(sinkIn.sink)(interpreter.subFusingMaterializer) outStream.runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
} }
def render(ctx: ResponseRenderingContext): StrictOrStreamed = { def render(ctx: ResponseRenderingContext): StrictOrStreamed = {