diff --git a/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md index e30e8368c2..72e33853d3 100644 --- a/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md +++ b/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md @@ -14,11 +14,50 @@ Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when ## Description - This @apidoc[Source] will never emit a failure, since the failure of the wrapped @apidoc[Source] is always handled by - restarting. The wrapped @apidoc[Source] can be cancelled by cancelling this @apidoc[Source]. - When that happens, the wrapped @apidoc[Source], if currently running will be cancelled, and it will not be restarted. - This can be triggered simply by the downstream cancelling, or externally by introducing a @apidoc[KillSwitch] right - after this @apidoc[Source] in the graph. +Wraps the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. +The backoff resets back to `minBackoff` if there hasn't been a failure within `minBackoff`. + +This @apidoc[Source] will never emit a failure, since the failure of the wrapped @apidoc[Source] is always handled by +restarting. The wrapped @apidoc[Source] can be completed by completing this @apidoc[Source]. +When that happens, the wrapped @apidoc[Source], if currently running will be cancelled, and it will not be restarted. +This can be triggered by the downstream cancelling, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right +after this @apidoc[Source] in the graph. + +See also: + +* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md) +* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md) +* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md) + +## Examples + +This shows that a Source is not restarted if it completes, only if it fails. Tick is only printed +three times as the `take(3)` means the inner source completes successfully after emitting the first 3 elements. + +Scala +: @@snip [Restart.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala) { #restart-failure-inner-complete } + +Java +: @@snip [Restart.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java) { #restart-failure-inner-complete } + +If the inner source instead fails, it will be restarted with an increasing backoff. The source emits 1, 2, 3, and then throws an exception. +The first time the exception is thrown the source is restarted after 1s, then 2s etc, until the `maxBackoff` of 10s. + +Scala +: @@snip [Restart.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala) { #restart-failure-inner-failure } + +Java +: @@snip [Restart.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java) { #restart-failure-inner-failure } + +Finally, to be able to stop the restarting, a kill switch can be used. The kill switch is inserted right after the restart +source. The inner source is the same as above so emits 3 elements and then fails. A killswitch is used to be able to stop the source +being restarted: + +Scala +: @@snip [Restart.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala) { #restart-failure-inner-complete-kill-switch } + +Java +: @@snip [Restart.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java) { #restart-failure-inner-complete-kill-switch } ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java b/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java new file mode 100644 index 0000000000..9e62ce9dd5 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.NotUsed; +import akka.actor.Cancellable; +import akka.japi.Creator; +import akka.stream.KillSwitches; +import akka.stream.UniqueKillSwitch; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.RestartSource; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.time.Duration; +import java.util.Arrays; + +public class Restart { + static akka.actor.ActorSystem system = akka.actor.ActorSystem.create(); + + public static void onRestartWithBackoffInnerFailure() { + // #restart-failure-inner-failure + // could throw if for example it used a database connection to get rows + Source, NotUsed> flakySource = + Source.from( + Arrays.>asList( + () -> 1, + () -> 2, + () -> 3, + () -> { + throw new RuntimeException("darn"); + })); + Source, NotUsed> forever = + RestartSource.onFailuresWithBackoff( + Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource); + forever.runWith( + Sink.foreach((Creator nr) -> system.log().info("{}", nr.create())), system); + // logs + // [INFO] [12/10/2019 13:51:58.300] [default-akka.test.stream-dispatcher-7] + // [akka.actor.ActorSystemImpl(default)] 1 + // [INFO] [12/10/2019 13:51:58.301] [default-akka.test.stream-dispatcher-7] + // [akka.actor.ActorSystemImpl(default)] 2 + // [INFO] [12/10/2019 13:51:58.302] [default-akka.test.stream-dispatcher-7] + // [akka.actor.ActorSystemImpl(default)] 3 + // [WARN] [12/10/2019 13:51:58.310] [default-akka.test.stream-dispatcher-7] + // [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace: + // (RuntimeException: darn) + // --> 1 second gap + // [INFO] [12/10/2019 13:51:59.379] [default-akka.test.stream-dispatcher-8] + // [akka.actor.ActorSystemImpl(default)] 1 + // [INFO] [12/10/2019 13:51:59.382] [default-akka.test.stream-dispatcher-8] + // [akka.actor.ActorSystemImpl(default)] 2 + // [INFO] [12/10/2019 13:51:59.383] [default-akka.test.stream-dispatcher-8] + // [akka.actor.ActorSystemImpl(default)] 3 + // [WARN] [12/10/2019 13:51:59.386] [default-akka.test.stream-dispatcher-8] + // [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace: + // (RuntimeException: darn) + // --> 2 second gap + // [INFO] [12/10/2019 13:52:01.594] [default-akka.test.stream-dispatcher-8] + // [akka.actor.ActorSystemImpl(default)] 1 + // [INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8] + // [akka.actor.ActorSystemImpl(default)] 2 + // [INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8] + // [akka.actor.ActorSystemImpl(default)] 3 + // [WARN] [12/10/2019 13:52:01.596] [default-akka.test.stream-dispatcher-8] + // [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace: + // (RuntimeException: darn) + // #restart-failure-inner-failure + + } + + public static void onRestartWithBackoffInnerComplete() { + // #restart-failure-inner-complete + Source finiteSource = + Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick").take(3); + Source forever = + RestartSource.onFailuresWithBackoff( + Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> finiteSource); + forever.runWith(Sink.foreach(System.out::println), system); + // prints + // tick + // tick + // tick + // #restart-failure-inner-complete + } + + public static void onRestartWitFailureKillSwitch() { + // #restart-failure-inner-complete-kill-switch + Source, NotUsed> flakySource = + Source.from( + Arrays.>asList( + () -> 1, + () -> 2, + () -> 3, + () -> { + throw new RuntimeException("darn"); + })); + UniqueKillSwitch stopRestarting = + RestartSource.onFailuresWithBackoff( + Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource) + .viaMat(KillSwitches.single(), Keep.right()) + .toMat(Sink.foreach(nr -> System.out.println("nr " + nr.create())), Keep.left()) + .run(system); + // ... from some where else + // stop the source from restarting + stopRestarting.shutdown(); + // #restart-failure-inner-complete-kill-switch + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala b/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala new file mode 100644 index 0000000000..a6b6c77372 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.KillSwitches +import akka.stream.UniqueKillSwitch +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.RestartSource +import akka.stream.scaladsl.Sink + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +// #imports +import akka.stream.scaladsl.Source +// #imports + +object Restart extends App { + implicit val system: ActorSystem = ActorSystem() + + onRestartWitFailureKillSwitch() + + case class CantConnectToDatabase(msg: String) extends RuntimeException(msg) with NoStackTrace + + def onRestartWithBackoffInnerFailure(): Unit = { + //#restart-failure-inner-failure + // could throw if for example it used a database connection to get rows + val flakySource: Source[() => Int, NotUsed] = + Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn"))) + val forever = + RestartSource.onFailuresWithBackoff(minBackoff = 1.second, maxBackoff = 10.seconds, 0.1)(() => flakySource) + forever.runWith(Sink.foreach(nr => system.log.info("{}", nr()))) + // logs + //[INFO] [12/10/2019 13:51:58.300] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 1 + //[INFO] [12/10/2019 13:51:58.301] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 2 + //[INFO] [12/10/2019 13:51:58.302] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 3 + //[WARN] [12/10/2019 13:51:58.310] [default-akka.test.stream-dispatcher-7] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace: (docs.stream.operators.source.Restart$CantConnectToDatabase: darn) + // --> 1 second gap + //[INFO] [12/10/2019 13:51:59.379] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 1 + //[INFO] [12/10/2019 13:51:59.382] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 2 + //[INFO] [12/10/2019 13:51:59.383] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 3 + //[WARN] [12/10/2019 13:51:59.386] [default-akka.test.stream-dispatcher-8] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace: (docs.stream.operators.source.Restart$CantConnectToDatabase: darn) + //--> 2 second gap + //[INFO] [12/10/2019 13:52:01.594] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 1 + //[INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 2 + //[INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 3 + //[WARN] [12/10/2019 13:52:01.596] [default-akka.test.stream-dispatcher-8] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace: (docs.stream.operators.source.Restart$CantConnectToDatabase: darn) + //#restart-failure-inner-failure + + } + + def onRestartWithBackoffInnerComplete() { + + //#restart-failure-inner-complete + val finiteSource = Source.tick(1.second, 1.second, "tick").take(3) + val forever = RestartSource.onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => finiteSource) + forever.runWith(Sink.foreach(println)) + // prints + // tick + // tick + // tick + //#restart-failure-inner-complete + } + + def onRestartWitFailureKillSwitch(): Unit = { + //#restart-failure-inner-complete-kill-switch + val flakySource: Source[() => Int, NotUsed] = + Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn"))) + val stopRestarting: UniqueKillSwitch = + RestartSource + .onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => flakySource) + .viaMat(KillSwitches.single)(Keep.right) + .toMat(Sink.foreach(nr => println(s"Nr ${nr()}")))(Keep.left) + .run() + //... from some where else + // stop the source from restarting + stopRestarting.shutdown() + //#restart-failure-inner-complete-kill-switch + } +}