diff --git a/akka-docs/src/main/paradox/stream/stream-error.md b/akka-docs/src/main/paradox/stream/stream-error.md index 26864fd69c..13f4d06f71 100644 --- a/akka-docs/src/main/paradox/stream/stream-error.md +++ b/akka-docs/src/main/paradox/stream/stream-error.md @@ -67,7 +67,7 @@ Java Just as Akka provides the @ref:[backoff supervision pattern for actors](../general/supervision.md#backoff-supervisor), Akka streams also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff -supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts. +supervision strategy*, starting a stage again when it fails or completes, each time with a growing time delay between restarts. This pattern is useful when the stage fails or completes because some external resource is not available and we need to give it some time to start-up again. One of the prime examples when this is useful is diff --git a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java index bd909620c2..5459a5a19f 100644 --- a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java @@ -56,7 +56,6 @@ public class RestartDocTest { Duration.apply(3, TimeUnit.SECONDS), // min backoff Duration.apply(30, TimeUnit.SECONDS), // max backoff 0.2, // adds 20% "noise" to vary the intervals slightly - () -> // Create a source from a future of a source Source.fromSourceCompletionStage( diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index 3793a8cf1f..91b7dbf771 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -175,6 +175,54 @@ class RestartSpec extends StreamSpec with DefaultTimeout { Thread.sleep((minBackoff + 100.millis).toMillis) created.get() should ===(1) } + + "stop on completion if it should only be restarted in failures" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ + created.incrementAndGet() + Source(List("a", "b", "c")) + .map { + case "c" ⇒ if (created.get() == 1) throw TE("failed") else "c" + case other ⇒ other + } + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("b") + // will fail, and will restart + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("c") + probe.expectComplete() + + created.get() should ===(2) + + probe.cancel() + } + + "restart on failure when only due to failures should be restarted" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { () ⇒ + created.incrementAndGet() + Source(List("a", "b", "c")) + .map { + case "c" ⇒ throw TE("failed") + case other ⇒ other + } + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + + created.get() should ===(3) + + probe.cancel() + + } + } "A restart with backoff sink" should { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala index 42d36bcf53..bc0e988b57 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala @@ -43,6 +43,33 @@ object RestartSource { sourceFactory.create().asScala }.asJava } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by + * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + sourceFactory.create().asScala + }.asJava + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala index 329e3eb583..8df82e6c92 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala @@ -40,23 +40,46 @@ object RestartSource { * @param sourceFactory A factory for producing the [[Source]] to wrap. */ def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { - Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor)) + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false)) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by + * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true)) } } private final class RestartWithBackoffSource[T]( - sourceFactory: () ⇒ Source[T, _], - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double -) extends GraphStage[SourceShape[T]] { self ⇒ + sourceFactory: () ⇒ Source[T, _], + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + onlyOnFailures: Boolean) extends GraphStage[SourceShape[T]] { self ⇒ val out = Outlet[T]("RestartWithBackoffSource.out") override def shape = SourceShape(out) override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Source", shape, minBackoff, maxBackoff, randomFactor - ) { + "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures) { override protected def logSource = self.getClass @@ -120,15 +143,13 @@ private final class RestartWithBackoffSink[T]( sinkFactory: () ⇒ Sink[T, _], minBackoff: FiniteDuration, maxBackoff: FiniteDuration, - randomFactor: Double -) extends GraphStage[SinkShape[T]] { self ⇒ + randomFactor: Double) extends GraphStage[SinkShape[T]] { self ⇒ val in = Inlet[T]("RestartWithBackoffSink.in") override def shape = SinkShape(in) override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Sink", shape, minBackoff, maxBackoff, randomFactor - ) { + "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) { override protected def logSource = self.getClass override protected def startGraph() = { @@ -187,16 +208,14 @@ private final class RestartWithBackoffFlow[In, Out]( flowFactory: () ⇒ Flow[In, Out, _], minBackoff: FiniteDuration, maxBackoff: FiniteDuration, - randomFactor: Double -) extends GraphStage[FlowShape[In, Out]] { self ⇒ + randomFactor: Double) extends GraphStage[FlowShape[In, Out]] { self ⇒ val in = Inlet[In]("RestartWithBackoffFlow.in") val out = Outlet[Out]("RestartWithBackoffFlow.out") override def shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Flow", shape, minBackoff, maxBackoff, randomFactor - ) { + "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) { var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None @@ -242,12 +261,12 @@ private final class RestartWithBackoffFlow[In, Out]( * Shared logic for all restart with backoff logics. */ private abstract class RestartWithBackoffLogic[S <: Shape]( - name: String, - shape: S, - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double -) extends TimerGraphStageLogicWithLogging(shape) { + name: String, + shape: S, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + onlyOnFailures: Boolean) extends TimerGraphStageLogicWithLogging(shape) { var restartCount = 0 var resetDeadline = minBackoff.fromNow // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we @@ -263,11 +282,11 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( sinkIn.setHandler(new InHandler { override def onPush() = push(out, sinkIn.grab()) override def onUpstreamFinish() = { - if (finishing) { + if (finishing || onlyOnFailures) { complete(out) } else { - log.debug("Graph out finished") - onCompleteOrFailure() + log.debug("Restarting graph due to finished upstream") + scheduleRestartTimer() } } override def onUpstreamFailure(ex: Throwable) = { @@ -275,7 +294,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( fail(out, ex) } else { log.error(ex, "Restarting graph due to failure") - onCompleteOrFailure() + scheduleRestartTimer() } } }) @@ -307,7 +326,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( cancel(in) } else { log.debug("Graph in finished") - onCompleteOrFailure() + scheduleRestartTimer() } } }) @@ -330,7 +349,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( } // Set a timer to restart after the calculated delay - protected final def onCompleteOrFailure() = { + protected final def scheduleRestartTimer() = { // Check if the last start attempt was more than the minimum backoff if (resetDeadline.isOverdue()) { log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)