add maxRestarts to RestartWithBackoff #24129

Patrik Nordwall 2018-01-16 18:28:10 +01:00 committed by Johan Andrén
parent 32987c8704
commit e4dd3c24fc
6 changed files with 393 additions and 22 deletions

View file

@@ -56,6 +56,7 @@ public class RestartDocTest {
Duration.apply(3, TimeUnit.SECONDS), // min backoff
Duration.apply(30, TimeUnit.SECONDS), // max backoff
0.2, // adds 20% "noise" to vary the intervals slightly
20, // limits the amount of restarts to 20
() ->
// Create a source from a future of a source
Source.fromSourceCompletionStage(

View file

@@ -37,7 +37,8 @@ class RestartDocSpec extends AkkaSpec with CompileOnlySpec {
val restartSource = RestartSource.withBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
- randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
+ randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
maxRestarts = 20 // limits the amount of restarts to 20
) { () ⇒
// Create a source from a future of a source
Source.fromFutureSource {
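For reference, a self-contained sketch of the documented pattern with the new parameter. The futureSource() helper is a placeholder for whatever asynchronously produces the wrapped Source (an HTTP request in the full docs), and an implicit Materializer is assumed to be in scope; neither is part of this commit.

import akka.NotUsed
import akka.stream.scaladsl.{ RestartSource, Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

// Placeholder for an asynchronous call that yields the Source to wrap
def futureSource(): Future[Source[String, NotUsed]] =
  Future.successful(Source(List("a", "b", "c")))

val restartSource: Source[String, NotUsed] = RestartSource.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
  maxRestarts = 20 // limits the amount of restarts to 20
) { () ⇒
  // Re-created on every restart; completion or failure triggers a backoff and
  // another restart until maxRestarts is reached
  Source.fromFutureSource(futureSource())
}

// restartSource.runWith(Sink.foreach(println)) // requires an implicit Materializer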

View file

@@ -223,6 +223,46 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
}
"not restart the source when maxRestarts is reached" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () ⇒
created.incrementAndGet()
Source.single("a")
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("a")
probe.expectComplete()
created.get() should ===(2)
probe.cancel()
}
"reset maxRestarts when source runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () ⇒
created.incrementAndGet()
Source(List("a"))
}.runWith(TestSink.probe)
probe.requestNext("a")
// There should be minBackoff delay
probe.requestNext("a")
// The probe should now be backing off again with increased backoff
// Now wait for the delay to pass, then it will start the new source, we also want to wait for the
// subsequent backoff to pass
Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis)
probe.requestNext("a")
// We now are able to trigger the third restart, since enough time has elapsed to reset the counter
probe.requestNext("a")
created.get() should ===(4)
probe.cancel()
}
}
"A restart with backoff sink" should { "A restart with backoff sink" should {
@@ -354,11 +394,66 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
sinkProbe.cancel()
}
"not restart the sink when maxRestarts is reached" in assertAllStagesStopped {
val created = new AtomicInteger()
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () ⇒
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true)
.to(Sink.foreach(queue.sendNext))
})(Keep.left).run()
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
probe.expectCancellation()
created.get() should ===(2)
sinkProbe.cancel()
probe.sendComplete()
}
"reset maxRestarts when sink runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger()
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () ⇒
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true)
.to(Sink.foreach(queue.sendNext))
})(Keep.left).run()
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// There should be a minBackoff delay
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// The probe should now be backing off for 2 * minBackoff
// Now wait for the 2 * minBackoff delay to pass, then it will start the new sink, we also want to wait for the
// subsequent minBackoff to pass, so it resets the restart count
Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis)
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// We now are able to trigger the third restart, since enough time has elapsed to reset the counter
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
created.get() should ===(4)
sinkProbe.cancel()
probe.sendComplete()
}
}
"A restart with backoff flow" should { "A restart with backoff flow" should {
def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration) = { def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1) = {
val created = new AtomicInteger() val created = new AtomicInteger()
val (flowInSource, flowInProbe) = TestSource.probe[String] val (flowInSource, flowInProbe) = TestSource.probe[String]
.buffer(4, OverflowStrategy.backpressure) .buffer(4, OverflowStrategy.backpressure)
@@ -367,7 +462,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
// We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we
// simply use the probes as a message bus for feeding and capturing events.
- val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
+ val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0, maxRestarts) { () ⇒
created.incrementAndGet()
Flow.fromSinkAndSource(
Flow[String]
@@ -550,6 +645,28 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
created.get() should ===(1)
}
"not restart on completion when maxRestarts is reached" in {
val (created, _, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff, maxRestarts = 1)
sink.request(1)
flowOutProbe.sendNext("complete")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete")
// and it should restart
sink.request(1)
flowOutProbe.sendNext("complete")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
flowInProbe.expectNext("out complete")
flowInProbe.expectNoMessage(shortMinBackoff * 3)
sink.expectComplete()
created.get() should ===(2)
}
}
}

View file

@@ -0,0 +1,5 @@
# #24129 Add maxRestarts to RestartSource, Sink and Flow
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSink.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this")

View file

@@ -38,12 +38,43 @@ object RestartSource {
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
- def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
+ def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
+   sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
sourceFactory.create().asScala
}.asJava
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
sourceFactory.create().asScala
}.asJava
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
@@ -70,6 +101,35 @@ object RestartSource {
sourceFactory.create().asScala
}.asJava
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a failure as long as maxRestarts is not reached, since a failure
* of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
sourceFactory.create().asScala
}.asJava
}
}
/**
@@ -105,11 +165,45 @@ object RestartSink {
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
- def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
+ def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
+   sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
sinkFactory.create().asScala
}.asJava
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
* before this [[Sink]] in the graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
sinkFactory.create().asScala
}.asJava
}
}
/**
@@ -144,9 +238,42 @@ object RestartFlow {
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
- def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
+ def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
+   flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
flowFactory.create().asScala
}.asJava
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
flowFactory.create().asScala
}.asJava
}
}

View file

@@ -40,7 +40,34 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
- Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false))
+ Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, Int.MaxValue))
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts))
}
/**
@@ -64,7 +91,34 @@ object RestartSource {
*
*/
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
- Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true))
+ Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, Int.MaxValue))
}
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a failure as long as maxRestarts is not reached, since a failure
* of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts))
}
}
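As an aside, a hedged illustration of the documented maxRestarts semantics for the new overloads (the flakySource helper is made up for this sketch, not part of the commit): a positive value caps the restarts within the backoff window, 0 disables restarting, and a negative value leaves restarts uncapped.

import akka.NotUsed
import akka.stream.scaladsl.{ RestartSource, Source }
import scala.concurrent.duration._

// Hypothetical helper: a source that always fails, to exercise onFailuresWithBackoff
def flakySource: Source[Int, NotUsed] = Source.failed[Int](new RuntimeException("boom"))

// At most 3 restarts (the count resets if the source survives for at least minBackoff),
// after which the failure is emitted downstream
val capped = RestartSource.onFailuresWithBackoff(1.second, 10.seconds, 0.2, maxRestarts = 3) { () ⇒ flakySource }

// 0 means the source is never restarted: the first failure is emitted as-is
val never = RestartSource.onFailuresWithBackoff(1.second, 10.seconds, 0.2, maxRestarts = 0) { () ⇒ flakySource }

// A negative value does not cap the amount of restarts, matching the overloads without maxRestarts
val unbounded = RestartSource.onFailuresWithBackoff(1.second, 10.seconds, 0.2, maxRestarts = -1) { () ⇒ flakySource }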
@@ -73,13 +127,14 @@ private final class RestartWithBackoffSource[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
- onlyOnFailures: Boolean) extends GraphStage[SourceShape[T]] { self ⇒
+ onlyOnFailures: Boolean,
+ maxRestarts: Int) extends GraphStage[SourceShape[T]] { self ⇒
val out = Outlet[T]("RestartWithBackoffSource.out")
override def shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
- "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures) {
+ "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
override protected def logSource = self.getClass
@@ -135,7 +190,37 @@ object RestartSink {
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
- Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor))
+ Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue))
}
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
* before this [[Sink]] in the graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts))
}
}
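For completeness, a minimal usage sketch of the new RestartSink overload; the sendToExternalSystem function is a placeholder effect and an implicit Materializer is assumed, neither comes from this commit.

import akka.NotUsed
import akka.stream.scaladsl.{ RestartSink, Sink }
import scala.concurrent.duration._

// Placeholder side effect performed by the wrapped sink
def sendToExternalSystem(msg: String): Unit = ()

val restartingSink: Sink[String, NotUsed] = RestartSink.withBackoff(
  minBackoff = 1.second,
  maxBackoff = 30.seconds,
  randomFactor = 0.2,
  maxRestarts = 5 // stop restarting after 5 attempts within the backoff window
) { () ⇒
  // The factory is invoked again on every restart; elements in flight while the
  // wrapped sink is restarting may be lost, as noted in the docs above
  Sink.foreach[String](sendToExternalSystem)
}

// Source(List("a", "b", "c")).runWith(restartingSink) // requires an implicit Materializer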
@@ -143,13 +228,14 @@ private final class RestartWithBackoffSink[T](
sinkFactory: () ⇒ Sink[T, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
- randomFactor: Double) extends GraphStage[SinkShape[T]] { self ⇒
+ randomFactor: Double,
+ maxRestarts: Int) extends GraphStage[SinkShape[T]] { self ⇒
val in = Inlet[T]("RestartWithBackoffSink.in")
override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
- "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
+ "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) {
override protected def logSource = self.getClass
override protected def startGraph() = {
@@ -200,7 +286,36 @@ object RestartFlow {
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
- Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor))
+ Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue))
}
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, maxRestarts))
}
}
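And a corresponding sketch for RestartFlow; connectionFlow stands in for something that can fail mid-stream (for example a network connection) and is not part of this commit.

import akka.NotUsed
import akka.stream.scaladsl.{ Flow, RestartFlow }
import scala.concurrent.duration._

// Placeholder for a flow that may fail while elements are flowing through it
def connectionFlow: Flow[String, String, NotUsed] = Flow[String].map(_.toUpperCase)

val resilientFlow: Flow[String, String, NotUsed] = RestartFlow.withBackoff(
  minBackoff = 1.second,
  maxBackoff = 10.seconds,
  randomFactor = 0.2,
  maxRestarts = 10 // after 10 restarts within the backoff window the termination propagates
) { () ⇒
  // Re-created on every restart; in-flight elements may be dropped, and the flow
  // backpressures while the backoff timer is running
  connectionFlow
}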
@@ -208,14 +323,15 @@ private final class RestartWithBackoffFlow[In, Out](
flowFactory: () ⇒ Flow[In, Out, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
- randomFactor: Double) extends GraphStage[FlowShape[In, Out]] { self ⇒
+ randomFactor: Double,
+ maxRestarts: Int) extends GraphStage[FlowShape[In, Out]] { self ⇒
val in = Inlet[In]("RestartWithBackoffFlow.in")
val out = Outlet[Out]("RestartWithBackoffFlow.out")
override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
- "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
+ "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) {
var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
@@ -266,7 +382,8 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
- onlyOnFailures: Boolean) extends TimerGraphStageLogicWithLogging(shape) {
+ onlyOnFailures: Boolean,
+ maxRestarts: Int) extends TimerGraphStageLogicWithLogging(shape) {
var restartCount = 0
var resetDeadline = minBackoff.fromNow
// This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
@@ -282,7 +399,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sinkIn.setHandler(new InHandler {
override def onPush() = push(out, sinkIn.grab())
override def onUpstreamFinish() = {
- if (finishing || onlyOnFailures) {
+ if (finishing || maxRestartsReached() || onlyOnFailures) {
complete(out)
} else {
log.debug("Restarting graph due to finished upstream")
@@ -290,7 +407,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
}
}
override def onUpstreamFailure(ex: Throwable) = {
- if (finishing) {
+ if (finishing || maxRestartsReached()) {
fail(out, ex)
} else {
log.error(ex, "Restarting graph due to failure")
@@ -322,7 +439,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
}
}
override def onDownstreamFinish() = {
- if (finishing) {
+ if (finishing || maxRestartsReached()) {
cancel(in)
} else {
log.debug("Graph in finished")
@@ -348,14 +465,17 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sourceOut
}
- // Set a timer to restart after the calculated delay
- protected final def scheduleRestartTimer() = {
+ protected final def maxRestartsReached() = {
// Check if the last start attempt was more than the minimum backoff
if (resetDeadline.isOverdue()) {
log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)
restartCount = 0
}
+ restartCount == maxRestarts
+ }
+
+ // Set a timer to restart after the calculated delay
+ protected final def scheduleRestartTimer() = {
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
log.debug("Restarting graph in {}", restartDelay)
scheduleOnce("RestartTimer", restartDelay)
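To make the accounting above easier to follow, here is an illustrative, standalone model of the counter logic (not the actual stage code); in the real stage the restart count is incremented and the reset deadline re-armed around scheduling and starting the wrapped graph, which is outside this hunk.

import scala.concurrent.duration._

// Standalone illustration of the restart budget: the count resets once the wrapped
// stage has survived for at least minBackoff, and a restart is refused when the
// count reaches maxRestarts (a negative maxRestarts is therefore never reached).
final class RestartBudget(minBackoff: FiniteDuration, maxRestarts: Int) {
  private var restartCount = 0
  private var resetDeadline = minBackoff.fromNow

  // Mirrors maxRestartsReached() above
  def maxRestartsReached(): Boolean = {
    if (resetDeadline.isOverdue()) restartCount = 0
    restartCount == maxRestarts
  }

  // Stand-in for the bookkeeping done when a restart is actually scheduled
  def recordRestart(): Unit = {
    restartCount += 1
    resetDeadline = minBackoff.fromNow
  }
}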