diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index da46440efd..e2422aaf39 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1404,30 +1404,21 @@ final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[T def switchTo(source: Graph[SourceShape[T], M]): Unit = { val sinkIn = new SubSinkInlet[T]("RecoverWithSink") + sinkIn.setHandler(new InHandler { - override def onPush(): Unit = - if (isAvailable(out)) { - push(out, sinkIn.grab()) - sinkIn.pull() - } - override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) completeStage() + override def onPush(): Unit = push(out, sinkIn.grab()) + override def onUpstreamFinish(): Unit = completeStage() override def onUpstreamFailure(ex: Throwable) = onFailure(ex) }) - def pushOut(): Unit = { - push(out, sinkIn.grab()) - if (!sinkIn.isClosed) sinkIn.pull() - else completeStage() - } - val outHandler = new OutHandler { - override def onPull(): Unit = if (sinkIn.isAvailable) pushOut() + override def onPull(): Unit = sinkIn.pull() override def onDownstreamFinish(): Unit = sinkIn.cancel() } Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer) setHandler(out, outHandler) - sinkIn.pull() + if (isAvailable(out)) sinkIn.pull() } }