From e4dd3c24fccfe2b0b8cfae6941f000c2feca435c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 16 Jan 2018 18:28:10 +0100 Subject: [PATCH] add maxRestarts to RestartWithBackoff #24129 --- .../java/jdocs/stream/RestartDocTest.java | 1 + .../scala/docs/stream/RestartDocSpec.scala | 3 +- .../akka/stream/scaladsl/RestartSpec.scala | 121 +++++++++++++- .../mima-filters/2.5.9.backwards.excludes | 5 + .../scala/akka/stream/javadsl/Restart.scala | 133 ++++++++++++++- .../scala/akka/stream/scaladsl/Restart.scala | 152 ++++++++++++++++-- 6 files changed, 393 insertions(+), 22 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.5.9.backwards.excludes diff --git a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java index 615a1a5d79..7eacf8ab06 100644 --- a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java @@ -56,6 +56,7 @@ public class RestartDocTest { Duration.apply(3, TimeUnit.SECONDS), // min backoff Duration.apply(30, TimeUnit.SECONDS), // max backoff 0.2, // adds 20% "noise" to vary the intervals slightly + 20, // limits the amount of restarts to 20 () -> // Create a source from a future of a source Source.fromSourceCompletionStage( diff --git a/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala b/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala index 2e9dd531f3..681089d3cb 100644 --- a/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala @@ -37,7 +37,8 @@ class RestartDocSpec extends AkkaSpec with CompileOnlySpec { val restartSource = RestartSource.withBackoff( minBackoff = 3.seconds, maxBackoff = 30.seconds, - randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly + maxRestarts = 20 // limits the amount of restarts to 20 ) { () ⇒ // Create a source from a future of a source Source.fromFutureSource { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index 6b790676c3..658c327c24 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -223,6 +223,46 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 } + "not restart the source when maxRestarts is reached" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () ⇒ + created.incrementAndGet() + Source.single("a") + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("a") + probe.expectComplete() + + created.get() should ===(2) + + probe.cancel() + } + + "reset maxRestarts when source runs for at least minimum backoff without completing" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () ⇒ + created.incrementAndGet() + Source(List("a")) + }.runWith(TestSink.probe) + + probe.requestNext("a") + // There should be minBackoff delay + probe.requestNext("a") + // The probe should now be backing off again with with increased backoff + + // Now wait for the delay to pass, then it will start the new source, we also want to wait for the + // subsequent backoff to pass + Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis) + + probe.requestNext("a") + // We now are able to trigger the third restart, since enough time has elapsed to reset the counter + probe.requestNext("a") + + created.get() should ===(4) + + probe.cancel() + } } "A restart with backoff sink" should { @@ -354,11 +394,66 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 sinkProbe.cancel() } + + "not restart the sink when maxRestarts is reached" in assertAllStagesStopped { + val created = new AtomicInteger() + val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () ⇒ + created.incrementAndGet() + Flow[String].takeWhile(_ != "cancel", inclusive = true) + .to(Sink.foreach(queue.sendNext)) + })(Keep.left).run() + + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + + probe.expectCancellation() + + created.get() should ===(2) + + sinkProbe.cancel() + probe.sendComplete() + } + + "reset maxRestarts when sink runs for at least minimum backoff without completing" in assertAllStagesStopped { + val created = new AtomicInteger() + val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () ⇒ + created.incrementAndGet() + Flow[String].takeWhile(_ != "cancel", inclusive = true) + .to(Sink.foreach(queue.sendNext)) + })(Keep.left).run() + + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + // There should be a minBackoff delay + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + // The probe should now be backing off for 2 * minBackoff + + // Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the + // subsequent minBackoff min backoff to pass, so it resets the restart count + Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis) + + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + + // We now are able to trigger the third restart, since enough time has elapsed to reset the counter + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + + created.get() should ===(4) + + sinkProbe.cancel() + probe.sendComplete() + } } "A restart with backoff flow" should { - def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration) = { + def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1) = { val created = new AtomicInteger() val (flowInSource, flowInProbe) = TestSource.probe[String] .buffer(4, OverflowStrategy.backpressure) @@ -367,7 +462,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 // We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we // simply use the probes as a message bus for feeding and capturing events. - val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ + val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0, maxRestarts) { () ⇒ created.incrementAndGet() Flow.fromSinkAndSource( Flow[String] @@ -550,6 +645,28 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 created.get() should ===(1) } + "not restart on completion when maxRestarts is reached" in { + val (created, _, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff, maxRestarts = 1) + + sink.request(1) + flowOutProbe.sendNext("complete") + + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.request(2) + Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete") + + // and it should restart + sink.request(1) + flowOutProbe.sendNext("complete") + + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.request(2) + flowInProbe.expectNext("out complete") + flowInProbe.expectNoMessage(shortMinBackoff * 3) + sink.expectComplete() + + created.get() should ===(2) + } } } diff --git a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes new file mode 100644 index 0000000000..6cc828a75b --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes @@ -0,0 +1,5 @@ +# #24129 Add maxRestarts to RestartSource, Sink and Flow +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSink.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala index 0a3dd88c14..01723e9c28 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala @@ -38,12 +38,43 @@ object RestartSource { * In order to skip this additional delay pass in `0`. * @param sourceFactory A factory for producing the [[Source]] to wrap. */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ sourceFactory.create().asScala }.asJava } + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sourceFactory.create().asScala + }.asJava + } + /** * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. * @@ -70,6 +101,35 @@ object RestartSource { sourceFactory.create().asScala }.asJava } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by + * introducing a [[KillSwitch]] right after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sourceFactory.create().asScala + }.asJava + } } /** @@ -105,11 +165,45 @@ object RestartSink { * In order to skip this additional delay pass in `0`. * @param sinkFactory A factory for producing the [[Sink]] to wrap. */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ sinkFactory.create().asScala }.asJava } + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] + * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into + * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. + * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right + * before this [[Sink]] in the graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sinkFactory.create().asScala + }.asJava + } } /** @@ -144,9 +238,42 @@ object RestartFlow { * In order to skip this additional delay pass in `0`. * @param flowFactory A factory for producing the [[Flow]] to wrap. */ - def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ flowFactory.create().asScala }.asJava } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts + * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's + * running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + flowFactory.create().asScala + }.asJava + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala index d991d6bc01..6727de91a1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala @@ -40,7 +40,34 @@ object RestartSource { * @param sourceFactory A factory for producing the [[Source]] to wrap. */ def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { - Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false)) + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, Int.MaxValue)) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts)) } /** @@ -64,7 +91,34 @@ object RestartSource { * */ def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { - Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true)) + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, Int.MaxValue)) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts)) } } @@ -73,13 +127,14 @@ private final class RestartWithBackoffSource[T]( minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - onlyOnFailures: Boolean) extends GraphStage[SourceShape[T]] { self ⇒ + onlyOnFailures: Boolean, + maxRestarts: Int) extends GraphStage[SourceShape[T]] { self ⇒ val out = Outlet[T]("RestartWithBackoffSource.out") override def shape = SourceShape(out) override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures) { + "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) { override protected def logSource = self.getClass @@ -135,7 +190,37 @@ object RestartSink { * @param sinkFactory A factory for producing the [[Sink]] to wrap. */ def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = { - Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor)) + Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue)) + } + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] + * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into + * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. + * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right + * before this [[Sink]] in the graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = { + Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts)) } } @@ -143,13 +228,14 @@ private final class RestartWithBackoffSink[T]( sinkFactory: () ⇒ Sink[T, _], minBackoff: FiniteDuration, maxBackoff: FiniteDuration, - randomFactor: Double) extends GraphStage[SinkShape[T]] { self ⇒ + randomFactor: Double, + maxRestarts: Int) extends GraphStage[SinkShape[T]] { self ⇒ val in = Inlet[T]("RestartWithBackoffSink.in") override def shape = SinkShape(in) override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) { + "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) { override protected def logSource = self.getClass override protected def startGraph() = { @@ -200,7 +286,36 @@ object RestartFlow { * @param flowFactory A factory for producing the [[Flow]] to wrap. */ def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = { - Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor)) + Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue)) + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts + * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's + * running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = { + Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, maxRestarts)) } } @@ -208,14 +323,15 @@ private final class RestartWithBackoffFlow[In, Out]( flowFactory: () ⇒ Flow[In, Out, _], minBackoff: FiniteDuration, maxBackoff: FiniteDuration, - randomFactor: Double) extends GraphStage[FlowShape[In, Out]] { self ⇒ + randomFactor: Double, + maxRestarts: Int) extends GraphStage[FlowShape[In, Out]] { self ⇒ val in = Inlet[In]("RestartWithBackoffFlow.in") val out = Outlet[Out]("RestartWithBackoffFlow.out") override def shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) { + "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) { var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None @@ -266,7 +382,8 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - onlyOnFailures: Boolean) extends TimerGraphStageLogicWithLogging(shape) { + onlyOnFailures: Boolean, + maxRestarts: Int) extends TimerGraphStageLogicWithLogging(shape) { var restartCount = 0 var resetDeadline = minBackoff.fromNow // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we @@ -282,7 +399,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( sinkIn.setHandler(new InHandler { override def onPush() = push(out, sinkIn.grab()) override def onUpstreamFinish() = { - if (finishing || onlyOnFailures) { + if (finishing || maxRestartsReached() || onlyOnFailures) { complete(out) } else { log.debug("Restarting graph due to finished upstream") @@ -290,7 +407,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( } } override def onUpstreamFailure(ex: Throwable) = { - if (finishing) { + if (finishing || maxRestartsReached()) { fail(out, ex) } else { log.error(ex, "Restarting graph due to failure") @@ -322,7 +439,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( } } override def onDownstreamFinish() = { - if (finishing) { + if (finishing || maxRestartsReached()) { cancel(in) } else { log.debug("Graph in finished") @@ -348,14 +465,17 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( sourceOut } - // Set a timer to restart after the calculated delay - protected final def scheduleRestartTimer() = { + protected final def maxRestartsReached() = { // Check if the last start attempt was more than the minimum backoff if (resetDeadline.isOverdue()) { log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff) restartCount = 0 } + restartCount == maxRestarts + } + // Set a timer to restart after the calculated delay + protected final def scheduleRestartTimer() = { val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) log.debug("Restarting graph in {}", restartDelay) scheduleOnce("RestartTimer", restartDelay)