#20933: Make RecoverWith non-detached (also eliminates race in test)

This commit is contained in:
Endre Sándor Varga 2016-08-23 13:13:11 +02:00
parent 0d40f61dc4
commit 32455d3990

View file

@ -1404,30 +1404,21 @@ final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[T
def switchTo(source: Graph[SourceShape[T], M]): Unit = {
val sinkIn = new SubSinkInlet[T]("RecoverWithSink")
sinkIn.setHandler(new InHandler {
override def onPush(): Unit =
if (isAvailable(out)) {
push(out, sinkIn.grab())
sinkIn.pull()
}
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) completeStage()
override def onPush(): Unit = push(out, sinkIn.grab())
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
})
def pushOut(): Unit = {
push(out, sinkIn.grab())
if (!sinkIn.isClosed) sinkIn.pull()
else completeStage()
}
val outHandler = new OutHandler {
override def onPull(): Unit = if (sinkIn.isAvailable) pushOut()
override def onPull(): Unit = sinkIn.pull()
override def onDownstreamFinish(): Unit = sinkIn.cancel()
}
Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
setHandler(out, outHandler)
sinkIn.pull()
if (isAvailable(out)) sinkIn.pull()
}
}