Allow RestartSource.withBackoff to restart only on failures #23881 (#23911)

* Allow RestartSource.withBackoff to restart only on failures #23881
This commit is contained in:
nachinius 2017-11-13 09:47:27 -03:00 committed by Patrik Nordwall
parent bb9d3927c8
commit 53bbd5ab5f
5 changed files with 123 additions and 30 deletions

View file

@ -67,7 +67,7 @@ Java
Just as Akka provides the @ref:[backoff supervision pattern for actors](../general/supervision.md#backoff-supervisor), Akka streams Just as Akka provides the @ref:[backoff supervision pattern for actors](../general/supervision.md#backoff-supervisor), Akka streams
also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff
supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts. supervision strategy*, starting a stage again when it fails or completes, each time with a growing time delay between restarts.
This pattern is useful when the stage fails or completes because some external resource is not available This pattern is useful when the stage fails or completes because some external resource is not available
and we need to give it some time to start-up again. One of the prime examples when this is useful is and we need to give it some time to start-up again. One of the prime examples when this is useful is

View file

@ -56,7 +56,6 @@ public class RestartDocTest {
Duration.apply(3, TimeUnit.SECONDS), // min backoff Duration.apply(3, TimeUnit.SECONDS), // min backoff
Duration.apply(30, TimeUnit.SECONDS), // max backoff Duration.apply(30, TimeUnit.SECONDS), // max backoff
0.2, // adds 20% "noise" to vary the intervals slightly 0.2, // adds 20% "noise" to vary the intervals slightly
() -> () ->
// Create a source from a future of a source // Create a source from a future of a source
Source.fromSourceCompletionStage( Source.fromSourceCompletionStage(

View file

@ -175,6 +175,54 @@ class RestartSpec extends StreamSpec with DefaultTimeout {
Thread.sleep((minBackoff + 100.millis).toMillis) Thread.sleep((minBackoff + 100.millis).toMillis)
created.get() should ===(1) created.get() should ===(1)
} }
"stop on completion if it should only be restarted in failures" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { ()
created.incrementAndGet()
Source(List("a", "b", "c"))
.map {
case "c" if (created.get() == 1) throw TE("failed") else "c"
case other other
}
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
// will fail, and will restart
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("c")
probe.expectComplete()
created.get() should ===(2)
probe.cancel()
}
"restart on failure when only due to failures should be restarted" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { ()
created.incrementAndGet()
Source(List("a", "b", "c"))
.map {
case "c" throw TE("failed")
case other other
}
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
created.get() should ===(3)
probe.cancel()
}
} }
"A restart with backoff sink" should { "A restart with backoff sink" should {

View file

@ -43,6 +43,33 @@ object RestartSource {
sourceFactory.create().asScala sourceFactory.create().asScala
}.asJava }.asJava
} }
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { ()
sourceFactory.create().asScala
}.asJava
}
} }
/** /**

View file

@ -40,7 +40,31 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
*/ */
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () Source[T, _]): Source[T, NotUsed] = { def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor)) Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false))
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true))
} }
} }
@ -48,15 +72,14 @@ private final class RestartWithBackoffSource[T](
sourceFactory: () Source[T, _], sourceFactory: () Source[T, _],
minBackoff: FiniteDuration, minBackoff: FiniteDuration,
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double randomFactor: Double,
) extends GraphStage[SourceShape[T]] { self onlyOnFailures: Boolean) extends GraphStage[SourceShape[T]] { self
val out = Outlet[T]("RestartWithBackoffSource.out") val out = Outlet[T]("RestartWithBackoffSource.out")
override def shape = SourceShape(out) override def shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Source", shape, minBackoff, maxBackoff, randomFactor "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures) {
) {
override protected def logSource = self.getClass override protected def logSource = self.getClass
@ -120,15 +143,13 @@ private final class RestartWithBackoffSink[T](
sinkFactory: () Sink[T, _], sinkFactory: () Sink[T, _],
minBackoff: FiniteDuration, minBackoff: FiniteDuration,
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double randomFactor: Double) extends GraphStage[SinkShape[T]] { self
) extends GraphStage[SinkShape[T]] { self
val in = Inlet[T]("RestartWithBackoffSink.in") val in = Inlet[T]("RestartWithBackoffSink.in")
override def shape = SinkShape(in) override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Sink", shape, minBackoff, maxBackoff, randomFactor "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
) {
override protected def logSource = self.getClass override protected def logSource = self.getClass
override protected def startGraph() = { override protected def startGraph() = {
@ -187,16 +208,14 @@ private final class RestartWithBackoffFlow[In, Out](
flowFactory: () Flow[In, Out, _], flowFactory: () Flow[In, Out, _],
minBackoff: FiniteDuration, minBackoff: FiniteDuration,
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double randomFactor: Double) extends GraphStage[FlowShape[In, Out]] { self
) extends GraphStage[FlowShape[In, Out]] { self
val in = Inlet[In]("RestartWithBackoffFlow.in") val in = Inlet[In]("RestartWithBackoffFlow.in")
val out = Outlet[Out]("RestartWithBackoffFlow.out") val out = Outlet[Out]("RestartWithBackoffFlow.out")
override def shape = FlowShape(in, out) override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Flow", shape, minBackoff, maxBackoff, randomFactor "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
) {
var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
@ -246,8 +265,8 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
shape: S, shape: S,
minBackoff: FiniteDuration, minBackoff: FiniteDuration,
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double randomFactor: Double,
) extends TimerGraphStageLogicWithLogging(shape) { onlyOnFailures: Boolean) extends TimerGraphStageLogicWithLogging(shape) {
var restartCount = 0 var restartCount = 0
var resetDeadline = minBackoff.fromNow var resetDeadline = minBackoff.fromNow
// This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
@ -263,11 +282,11 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sinkIn.setHandler(new InHandler { sinkIn.setHandler(new InHandler {
override def onPush() = push(out, sinkIn.grab()) override def onPush() = push(out, sinkIn.grab())
override def onUpstreamFinish() = { override def onUpstreamFinish() = {
if (finishing) { if (finishing || onlyOnFailures) {
complete(out) complete(out)
} else { } else {
log.debug("Graph out finished") log.debug("Restarting graph due to finished upstream")
onCompleteOrFailure() scheduleRestartTimer()
} }
} }
override def onUpstreamFailure(ex: Throwable) = { override def onUpstreamFailure(ex: Throwable) = {
@ -275,7 +294,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
fail(out, ex) fail(out, ex)
} else { } else {
log.error(ex, "Restarting graph due to failure") log.error(ex, "Restarting graph due to failure")
onCompleteOrFailure() scheduleRestartTimer()
} }
} }
}) })
@ -307,7 +326,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
cancel(in) cancel(in)
} else { } else {
log.debug("Graph in finished") log.debug("Graph in finished")
onCompleteOrFailure() scheduleRestartTimer()
} }
} }
}) })
@ -330,7 +349,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
} }
// Set a timer to restart after the calculated delay // Set a timer to restart after the calculated delay
protected final def onCompleteOrFailure() = { protected final def scheduleRestartTimer() = {
// Check if the last start attempt was more than the minimum backoff // Check if the last start attempt was more than the minimum backoff
if (resetDeadline.isOverdue()) { if (resetDeadline.isOverdue()) {
log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff) log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)