Merge pull request #21259 from drewhk/wip-20933-recoverWith-nondetached-drewhk
#20933: Make RecoverWith non-detached (also eliminates race in test)
This commit is contained in:
commit
f062687a0e
1 changed files with 5 additions and 14 deletions
|
|
@ -1493,30 +1493,21 @@ final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[T
|
|||
|
||||
def switchTo(source: Graph[SourceShape[T], M]): Unit = {
|
||||
val sinkIn = new SubSinkInlet[T]("RecoverWithSink")
|
||||
|
||||
sinkIn.setHandler(new InHandler {
|
||||
override def onPush(): Unit =
|
||||
if (isAvailable(out)) {
|
||||
push(out, sinkIn.grab())
|
||||
sinkIn.pull()
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) completeStage()
|
||||
override def onPush(): Unit = push(out, sinkIn.grab())
|
||||
override def onUpstreamFinish(): Unit = completeStage()
|
||||
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
|
||||
})
|
||||
|
||||
def pushOut(): Unit = {
|
||||
push(out, sinkIn.grab())
|
||||
if (!sinkIn.isClosed) sinkIn.pull()
|
||||
else completeStage()
|
||||
}
|
||||
|
||||
val outHandler = new OutHandler {
|
||||
override def onPull(): Unit = if (sinkIn.isAvailable) pushOut()
|
||||
override def onPull(): Unit = sinkIn.pull()
|
||||
override def onDownstreamFinish(): Unit = sinkIn.cancel()
|
||||
}
|
||||
|
||||
Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
|
||||
setHandler(out, outHandler)
|
||||
sinkIn.pull()
|
||||
if (isAvailable(out)) sinkIn.pull()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue