#20123 Added recoverWithRetries which adds a parameter to limit the (#20137)

number of attempts. Existing recoverWith method behavior remains unchanged.
This commit is contained in:
David Knapp 2016-04-20 06:24:12 -07:00 committed by Patrik Nordwall
parent e0af113f17
commit d7fe98e1c4
8 changed files with 157 additions and 7 deletions

View file

@ -1239,10 +1239,16 @@ private[stream] final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraph
/**
* INTERNAL API
*/
private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
private[stream] object RecoverWith {
val InfiniteRetries = -1
}
private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.recoverWith
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
var attempt = 0
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
@ -1252,7 +1258,13 @@ private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Gra
override def onPull(): Unit = pull(in)
})
def onFailure(ex: Throwable) = if (pf.isDefinedAt(ex)) switchTo(pf(ex)) else failStage(ex)
def onFailure(ex: Throwable) =
if ((maximumRetries == RecoverWith.InfiniteRetries || attempt < maximumRetries) && pf.isDefinedAt(ex)) {
switchTo(pf(ex))
attempt += 1
}
else
failStage(ex)
def switchTo(source: Graph[SourceShape[T], M]): Unit = {
val sinkIn = new SubSinkInlet[T]("RecoverWithSink")