diff --git a/akka-docs/src/main/paradox/stream/operators/RestartFlow/onFailuresWithBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartFlow/onFailuresWithBackoff.md
index 534591b1bb..d33566b282 100644
--- a/akka-docs/src/main/paradox/stream/operators/RestartFlow/onFailuresWithBackoff.md
+++ b/akka-docs/src/main/paradox/stream/operators/RestartFlow/onFailuresWithBackoff.md
@@ -6,22 +6,31 @@ Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it f
## Signature
-@apidoc[RestartFlow.onFailuresWithBackoff](RestartFlow$) { scala="#onFailuresWithBackoff[In,Out](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double,maxRestarts:Int)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#onFailuresWithBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" }
+@apidoc[RestartFlow.onFailuresWithBackoff](RestartFlow$) { scala="#onFailuresWithBackoff[In,Out](settings:akka.stream.RestartSettings)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#onFailuresWithBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description
-This @apidoc[Flow] will not emit any failure
-The failures by the wrapped @apidoc[Flow] will be handled by
-restarting the wrapping @apidoc[Flow] as long as maxRestarts is not reached.
-Any termination signals sent to this @apidoc[Flow] however will terminate the wrapped @apidoc[Flow], if it's
+Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using exponential backoff.
+The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff` if max restarts).
+
+This @apidoc[Flow] will not emit any failure as long as maxRestarts is not reached.
+The failure of the wrapped @apidoc[Flow] will be handled by restarting it.
+However, any termination signals sent to this @apidoc[Flow] will terminate the wrapped @apidoc[Flow], if it's
running, and then the @apidoc[Flow] will be allowed to terminate without being restarted.
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. A termination signal from either end of the wrapped @apidoc[Flow] will cause the other end to be terminated,
and any in transit messages will be lost. During backoff, this @apidoc[Flow] will backpressure.
-This uses the same exponential backoff algorithm as @apidoc[Backoff$].
+This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
+
+See also:
+
+* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
+* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
+* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
+* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
## Reactive Streams semantics
@@ -31,4 +40,6 @@ This uses the same exponential backoff algorithm as @apidoc[Backoff$].
**backpressures** during backoff and when the wrapped flow backpressures
+**completes** when the wrapped flow completes or `maxRestarts` are reached within the given time limit
+
@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/RestartFlow/withBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartFlow/withBackoff.md
index a0802fd180..e25e4a51a8 100644
--- a/akka-docs/src/main/paradox/stream/operators/RestartFlow/withBackoff.md
+++ b/akka-docs/src/main/paradox/stream/operators/RestartFlow/withBackoff.md
@@ -6,20 +6,30 @@ Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it f
## Signature
-@apidoc[RestartFlow.withBackoff](RestartFlow$) { scala="#withBackoff[In,Out](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#withBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" }
+@apidoc[RestartFlow.withBackoff](RestartFlow$) { scala="#withBackoff[In,Out](settings:akka.stream.RestartSettings)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#withBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description
-The resulting @apidoc[Flow] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
-completed. Any termination by the @apidoc[Flow] before that time will be handled by restarting it. Any termination
-signals sent to this @apidoc[Flow] however will terminate the wrapped @apidoc[Flow], if it's running, and then the @apidoc[Flow]
-will be allowed to terminate without being restarted.
+Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it completes or fails using exponential backoff.
+The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
+
+This @apidoc[Flow] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
+completed. Any termination by the @apidoc[Flow] before that time will be handled by restarting it as long as maxRestarts
+is not reached. Any termination signals sent to this @apidoc[Flow] however will terminate the wrapped @apidoc[Flow], if it's
+running, and then the @apidoc[Flow] will be allowed to terminate without being restarted.
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. A termination signal from either end of the wrapped @apidoc[Flow] will cause the other end to be terminated,
and any in transit messages will be lost. During backoff, this @apidoc[Flow] will backpressure.
-This uses the same exponential backoff algorithm as @apidoc[Backoff$].
+This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
+
+See also:
+
+* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
+* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
+* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
+* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
## Reactive Streams semantics
@@ -29,6 +39,6 @@ This uses the same exponential backoff algorithm as @apidoc[Backoff$].
**backpressures** during backoff and when the wrapped flow backpressures
-**completes** when the wrapped flow completes
+**completes** when `maxRestarts` are reached within the given time limit
@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/RestartSink/withBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartSink/withBackoff.md
index 4857764c23..5de0ce1f49 100644
--- a/akka-docs/src/main/paradox/stream/operators/RestartSink/withBackoff.md
+++ b/akka-docs/src/main/paradox/stream/operators/RestartSink/withBackoff.md
@@ -6,19 +6,38 @@ Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it f
## Signature
-@apidoc[RestartSink.withBackoff](RestartSink$) { scala="#withBackoff[T](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double,maxRestarts:Int)(sinkFactory:()=>akka.stream.scaladsl.Sink[T,_]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#withBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" }
-
+@apidoc[RestartSink.withBackoff](RestartSink$) { scala="#withBackoff[T](settings:akka.stream.RestartSettings)(sinkFactory:()=>akka.stream.scaladsl.Sink[T,_]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#withBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description
-This @apidoc[Sink] will never cancel, since cancellation by the wrapped @apidoc[Sink] is always handled by restarting it.
-The wrapped @apidoc[Sink] can however be completed by feeding a completion or error into this @apidoc[Sink]. When that
-happens, the @apidoc[Sink], if currently running, will terminate and will not be restarted. This can be triggered
-simply by the upstream completing, or externally by introducing a @apidoc[KillSwitch] right before this @apidoc[Sink] in the
-graph.
+Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it completes or fails using exponential backoff.
+The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
+
+This @apidoc[Sink] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped @apidoc[Sink]
+is handled by restarting it. The wrapped @apidoc[Sink] can however be completed by feeding a completion or error into
+this @apidoc[Sink]. When that happens, the @apidoc[Sink], if currently running, will terminate and will not be restarted.
+This can be triggered simply by the upstream completing, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right
+before this @apidoc[Sink] in the graph.
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. When the wrapped @apidoc[Sink] does cancel, this @apidoc[Sink] will backpressure, however any elements already
sent may have been lost.
-This uses the same exponential backoff algorithm as @apidoc[Backoff$].
+This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
+
+See also:
+
+* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
+* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
+* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
+* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**backpressures** during backoff and when the wrapped sink backpressures
+
+**completes** when upstream completes or when `maxRestarts` are reached within the given time limit
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md
index 48423c6e19..eea0f23158 100644
--- a/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md
+++ b/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md
@@ -1,26 +1,29 @@
# RestartSource.onFailuresWithBackoff
-Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff.
+Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.
@ref[Error handling](../index.md#error-handling)
## Signature
-@apidoc[RestartSource.onFailuresWithBackoff](RestartSource$) { scala="#onFailuresWithBackoff[T](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#onFailuresWithBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" }
+@apidoc[RestartSource.onFailuresWithBackoff](RestartSource$) { scala="#onFailuresWithBackoff[T](settings:akka.stream.RestartSettings)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#onFailuresWithBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description
-Wraps the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff.
-The backoff resets back to `minBackoff` if there hasn't been a failure within `minBackoff`.
+Wraps the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using exponential backoff.
+The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
-This @apidoc[Source] will never emit a failure, since the failure of the wrapped @apidoc[Source] is always handled by
-restarting. The wrapped @apidoc[Source] can be completed by completing this @apidoc[Source].
-When that happens, the wrapped @apidoc[Source], if currently running will be cancelled, and it will not be restarted.
-This can be triggered by the downstream cancelling, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right
-after this @apidoc[Source] in the graph.
+This @apidoc[Source] will not emit a failure as long as maxRestarts is not reached.
+The failure of the wrapped @apidoc[Source] is handled by restarting it.
+However, the wrapped @apidoc[Source] can be cancelled by cancelling this @apidoc[Source].
+When that happens, the wrapped @apidoc[Source], if currently running will, be cancelled and not restarted.
+This can be triggered by the downstream cancelling, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right after this @apidoc[Source] in the graph.
+
+This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
See also:
+* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
@@ -61,4 +64,10 @@ Java
**emits** when the wrapped source emits
+**backpressures** during backoff and when downstream backpressures
+
+**completes** when the wrapped source completes or `maxRestarts` are reached within the given time limit
+
+**cancels** when downstream cancels
+
@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/RestartSource/withBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartSource/withBackoff.md
index c114b024d2..de038f6a0d 100644
--- a/akka-docs/src/main/paradox/stream/operators/RestartSource/withBackoff.md
+++ b/akka-docs/src/main/paradox/stream/operators/RestartSource/withBackoff.md
@@ -1,22 +1,33 @@
# RestartSource.withBackoff
-Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or complete using an exponential backoff.
+Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or completes using an exponential backoff.
@ref[Error handling](../index.md#error-handling)
## Signature
-@apidoc[RestartSource.withBackoff](RestartSource$) { scala="#withBackoff[T](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double,maxRestarts:Int)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#withBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" }
+@apidoc[RestartSource.withBackoff](RestartSource$) { scala="#withBackoff[T](settings:akka.stream.RestartSettings)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#withBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description
-This @apidoc[Flow] will never emit a complete or failure, since the completion or failure of the wrapped @apidoc[Source]
-is always handled by restarting it. The wrapped @apidoc[Source] can however be cancelled by cancelling this @apidoc[Source].
-When that happens, the wrapped @apidoc[Source], if currently running will be cancelled, and it will not be restarted.
-This can be triggered simply by the downstream cancelling, or externally by introducing a @apidoc[KillSwitch] right
+Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it completes or fails using exponential backoff.
+The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
+
+This @apidoc[Source] will not emit a complete or fail as long as maxRestarts is not reached, since the completion
+or failure of the wrapped @apidoc[Source] is handled by restarting it. The wrapped @apidoc[Source] can however be cancelled
+by cancelling this @apidoc[Source]. When that happens, the wrapped @apidoc[Source], if currently running, will be cancelled,
+and it will not be restarted.
+This can be triggered simply by the downstream cancelling, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right
after this @apidoc[Source] in the graph.
-This uses the same exponential backoff algorithm as @apidoc[Backoff$].
+This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
+
+See also:
+
+* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
+* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
+* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
+* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
## Reactive Streams semantics
@@ -24,6 +35,10 @@ This uses the same exponential backoff algorithm as @apidoc[Backoff$].
**emits** when the wrapped source emits
-**completes** when the wrapped source completes
+**backpressures** during backoff and when downstream backpressures
+
+**completes** when `maxRestarts` are reached within the given time limit
+
+**cancels** when downstream cancels
@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index 903ca76d47..1470e0f089 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -344,9 +344,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
| |Operator|Description|
|--|--|--|
-|RestartSource|@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff.|
+|RestartSource|@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.|
|RestartFlow|@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.|
-|RestartSource|@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or complete using an exponential backoff.|
+|RestartSource|@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or completes using an exponential backoff.|
|RestartFlow|@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails or complete using an exponential backoff.|
|RestartSink|@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it fails or complete using an exponential backoff.|
|RetryFlow|@ref[withBackoff](RetryFlow/withBackoff.md)|Wrap the given @apidoc[Flow] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.|
diff --git a/akka-docs/src/main/paradox/stream/stream-error.md b/akka-docs/src/main/paradox/stream/stream-error.md
index e6c8edb93a..146afae47d 100644
--- a/akka-docs/src/main/paradox/stream/stream-error.md
+++ b/akka-docs/src/main/paradox/stream/stream-error.md
@@ -114,6 +114,15 @@ when a WebSocket connection fails due to the HTTP server it's running on going d
By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time
to recover, and it avoids using needless resources on the client side.
+The various restart shapes mentioned all expect an `akka.stream.RestartSettings` which configures the restart behaviour.
+Configurable parameters are:
+
+* `minBackoff` is the initial duration until the underlying stream is restarted
+* `maxBackoff` caps the exponential backoff
+* `randomFactor` allows addition of a random delay following backoff calculation
+* `maxRestarts` caps the total number of restarts
+* `maxRestartsWithin` sets a timeframe during which restarts are counted towards the same total for `maxRestarts`
+
The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`]
@java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a
stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will
diff --git a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java
index aef2888b43..9825489864 100644
--- a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java
+++ b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java
@@ -9,6 +9,7 @@ import akka.actor.ActorSystem;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
+import akka.stream.RestartSettings;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
@@ -61,12 +62,18 @@ public class RestartDocTest {
public void recoverWithBackoffSource() {
// #restart-with-backoff-source
+ RestartSettings settings =
+ RestartSettings.create(
+ Duration.ofSeconds(3), // min backoff
+ Duration.ofSeconds(30), // max backoff
+ 0.2 // adds 20% "noise" to vary the intervals slightly
+ )
+ .withMaxRestarts(
+ 20, Duration.ofMinutes(5)); // limits the amount of restarts to 20 within 5 minutes
+
Source eventStream =
RestartSource.withBackoff(
- Duration.ofSeconds(3), // min backoff
- Duration.ofSeconds(30), // max backoff
- 0.2, // adds 20% "noise" to vary the intervals slightly
- 20, // limits the amount of restarts to 20
+ settings,
() ->
// Create a source from a future of a source
Source.completionStageSource(
diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java b/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java
index 9e62ce9dd5..93e69ecdc2 100644
--- a/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java
+++ b/akka-docs/src/test/java/jdocs/stream/operators/source/Restart.java
@@ -8,6 +8,7 @@ import akka.NotUsed;
import akka.actor.Cancellable;
import akka.japi.Creator;
import akka.stream.KillSwitches;
+import akka.stream.RestartSettings;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSource;
@@ -34,7 +35,8 @@ public class Restart {
}));
Source, NotUsed> forever =
RestartSource.onFailuresWithBackoff(
- Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource);
+ RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1),
+ () -> flakySource);
forever.runWith(
Sink.foreach((Creator nr) -> system.log().info("{}", nr.create())), system);
// logs
@@ -99,7 +101,8 @@ public class Restart {
}));
UniqueKillSwitch stopRestarting =
RestartSource.onFailuresWithBackoff(
- Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource)
+ RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1),
+ () -> flakySource)
.viaMat(KillSwitches.single(), Keep.right())
.toMat(Sink.foreach(nr -> System.out.println("nr " + nr.create())), Keep.left())
.run(system);
diff --git a/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala b/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala
index d89bc803f6..5f1dc0703b 100644
--- a/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala
+++ b/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala
@@ -5,7 +5,7 @@
package docs.stream
import akka.NotUsed
-import akka.stream.KillSwitches
+import akka.stream.{ KillSwitches, RestartSettings }
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
@@ -34,12 +34,13 @@ class RestartDocSpec extends AkkaSpec with CompileOnlySpec {
"demonstrate a restart with backoff source" in compileOnlySpec {
//#restart-with-backoff-source
- val restartSource = RestartSource.withBackoff(
+ val settings = RestartSettings(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
- randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
- maxRestarts = 20 // limits the amount of restarts to 20
- ) { () =>
+ randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
+ ).withMaxRestarts(20, 5.minutes) // limits the amount of restarts to 20 within 5 minutes
+
+ val restartSource = RestartSource.withBackoff(settings) { () =>
// Create a source from a future of a source
Source.futureSource {
// Make a single request with akka-http
diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala b/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala
index fde89fde5e..45fc7218f2 100644
--- a/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala
+++ b/akka-docs/src/test/scala/docs/stream/operators/source/Restart.scala
@@ -6,8 +6,7 @@ package docs.stream.operators.source
import akka.NotUsed
import akka.actor.ActorSystem
-import akka.stream.KillSwitches
-import akka.stream.UniqueKillSwitch
+import akka.stream.{ KillSwitches, RestartSettings, UniqueKillSwitch }
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RestartSource
import akka.stream.scaladsl.Sink
@@ -31,7 +30,8 @@ object Restart extends App {
val flakySource: Source[() => Int, NotUsed] =
Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val forever =
- RestartSource.onFailuresWithBackoff(minBackoff = 1.second, maxBackoff = 10.seconds, 0.1)(() => flakySource)
+ RestartSource.onFailuresWithBackoff(
+ RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.1))(() => flakySource)
forever.runWith(Sink.foreach(nr => system.log.info("{}", nr())))
// logs
//[INFO] [12/10/2019 13:51:58.300] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 1
@@ -56,7 +56,7 @@ object Restart extends App {
//#restart-failure-inner-complete
val finiteSource = Source.tick(1.second, 1.second, "tick").take(3)
- val forever = RestartSource.onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => finiteSource)
+ val forever = RestartSource.onFailuresWithBackoff(RestartSettings(1.second, 10.seconds, 0.1))(() => finiteSource)
forever.runWith(Sink.foreach(println))
// prints
// tick
@@ -71,7 +71,7 @@ object Restart extends App {
Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val stopRestarting: UniqueKillSwitch =
RestartSource
- .onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => flakySource)
+ .onFailuresWithBackoff(RestartSettings(1.second, 10.seconds, 0.1))(() => flakySource)
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(nr => println(s"Nr ${nr()}")))(Keep.left)
.run()
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala
index 9dc8567f50..dfcd3cb2f4 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala
@@ -55,8 +55,7 @@ import akka.persistence.typed.internal.JournalInteractions.EventToPersist
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect
import akka.stream.scaladsl.Keep
-import akka.stream.SystemMaterializer
-import akka.stream.WatchedActorTerminatedException
+import akka.stream.{ RestartSettings, SystemMaterializer, WatchedActorTerminatedException }
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ RestartSource, Sink }
import akka.stream.typed.scaladsl.ActorFlow
@@ -144,7 +143,7 @@ private[akka] object Running {
import akka.actor.typed.scaladsl.AskPattern._
val source = RestartSource
- .withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
+ .withBackoff(RestartSettings(2.seconds, 10.seconds, randomFactor = 0.2)) { () =>
Source.futureSource {
setup.context.self.ask[Long](replyTo => GetSeenSequenceNr(replicaId, replyTo)).map { seqNr =>
replication
diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala
index 4397b88458..03979c3735 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala
@@ -35,6 +35,7 @@ import akka.stream.Attributes.LogLevels
import akka.stream.IgnoreComplete
import akka.stream.KillSwitches
import akka.stream.Materializer
+import akka.stream.RestartSettings
import akka.stream.SharedKillSwitch
import akka.stream.SinkShape
import akka.stream.scaladsl.Flow
@@ -208,10 +209,8 @@ private[remote] class ArteryTcpTransport(
// stream. For message stream it's best effort retry a few times.
RestartFlow
.withBackoff[ByteString, ByteString](
- settings.Advanced.OutboundRestartBackoff,
- settings.Advanced.OutboundRestartBackoff * 5,
- 0.1,
- maxRestarts)(flowFactory)
+ RestartSettings(settings.Advanced.OutboundRestartBackoff, settings.Advanced.OutboundRestartBackoff * 5, 0.1)
+ .withMaxRestarts(maxRestarts, settings.Advanced.OutboundRestartBackoff))(flowFactory)
// silence "Restarting graph due to failure" logging by RestartFlow
.addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala
index 48ece36f03..f9bf9c548c 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala
@@ -10,11 +10,9 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
-
import akka.Done
import akka.NotUsed
-import akka.stream.Attributes
-import akka.stream.OverflowStrategy
+import akka.stream.{ Attributes, OverflowStrategy, RestartSettings }
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.TestPublisher
@@ -35,11 +33,14 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
private val minBackoff = 1.second.dilated
private val maxBackoff = 3.seconds.dilated
+ private val shortRestartSettings = RestartSettings(shortMinBackoff, shortMaxBackoff, 0)
+ private val restartSettings = RestartSettings(minBackoff, maxBackoff, 0)
+
"A restart with backoff source" should {
"run normally" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Source.repeat("a")
}
@@ -59,7 +60,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"restart on completion" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Source(List("a", "b"))
}
@@ -79,7 +80,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"restart on failure" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Source(List("a", "b", "c")).map {
case "c" => throw TE("failed")
@@ -102,7 +103,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"backoff before restart" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(minBackoff, maxBackoff, 0) { () =>
+ .withBackoff(restartSettings) { () =>
created.incrementAndGet()
Source(List("a", "b"))
}
@@ -126,7 +127,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(minBackoff, maxBackoff, 0) { () =>
+ .withBackoff(restartSettings) { () =>
created.incrementAndGet()
Source(List("a", "b"))
}
@@ -160,7 +161,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val created = new AtomicInteger()
val promise = Promise[Done]()
val probe = RestartSource
- .withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Source.repeat("a").watchTermination() { (_, term) =>
promise.completeWith(term)
@@ -181,7 +182,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"not restart the source when cancelled while backing off" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(minBackoff, maxBackoff, 0) { () =>
+ .withBackoff(restartSettings) { () =>
created.incrementAndGet()
Source.single("a")
}
@@ -200,7 +201,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"stop on completion if it should only be restarted in failures" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .onFailuresWithBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Source(List("a", "b", "c")).map {
case "c" => if (created.get() == 1) throw TE("failed") else "c"
@@ -225,7 +226,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"restart on failure when only due to failures should be restarted" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .onFailuresWithBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Source(List("a", "b", "c")).map {
case "c" => throw TE("failed")
@@ -249,7 +250,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"not restart the source when maxRestarts is reached" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () =>
+ .withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () =>
created.incrementAndGet()
Source.single("a")
}
@@ -267,7 +268,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"reset maxRestarts when source runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
- .withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () =>
+ .withBackoff(restartSettings.withMaxRestarts(2, minBackoff)) { () =>
created.incrementAndGet()
Source(List("a"))
}
@@ -290,6 +291,30 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
probe.cancel()
}
+
+ "allow using withMaxRestarts instead of minBackoff to determine the maxRestarts reset time" in assertAllStagesStopped {
+ val created = new AtomicInteger()
+ val probe = RestartSource
+ .withBackoff(shortRestartSettings.withMaxRestarts(2, 1.second)) { () =>
+ created.incrementAndGet()
+ Source(List("a", "b")).takeWhile(_ != "b")
+ }
+ .runWith(TestSink.probe)
+
+ probe.requestNext("a")
+ probe.requestNext("a")
+
+ Thread.sleep((shortMinBackoff + (shortMinBackoff * 2) + shortMinBackoff).toMillis) // if using shortMinBackoff as deadline cause reset
+
+ probe.requestNext("a")
+
+ probe.request(1)
+ probe.expectComplete()
+
+ created.get() should ===(3)
+
+ probe.cancel()
+ }
}
"A restart with backoff sink" should {
@@ -298,7 +323,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val result = Promise[Seq[String]]()
val probe = TestSource
.probe[String]
- .toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .toMat(RestartSink.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Sink.seq.mapMaterializedValue(result.completeWith)
})(Keep.left)
@@ -318,7 +343,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource
.probe[String]
- .toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .toMat(RestartSink.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left)
@@ -344,7 +369,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource
.probe[String]
- .toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () =>
+ .toMat(RestartSink.withBackoff(restartSettings) { () =>
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left)
@@ -371,7 +396,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource
.probe[String]
- .toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () =>
+ .toMat(RestartSink.withBackoff(restartSettings) { () =>
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left)
@@ -413,7 +438,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource
.probe[String]
- .toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () =>
+ .toMat(RestartSink.withBackoff(restartSettings) { () =>
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left)
@@ -438,7 +463,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource
.probe[String]
- .toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () =>
+ .toMat(RestartSink.withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () =>
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left)
@@ -462,7 +487,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource
.probe[String]
- .toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () =>
+ .toMat(RestartSink.withBackoff(restartSettings.withMaxRestarts(2, minBackoff)) { () =>
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left)
@@ -476,7 +501,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
// The probe should now be backing off for 2 * minBackoff
// Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the
- // subsequent minBackoff min backoff to pass, so it resets the restart count
+ // subsequent minBackoff to pass, so it resets the restart count
Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis)
probe.sendNext("cancel")
@@ -491,20 +516,50 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
sinkProbe.cancel()
probe.sendComplete()
}
+
+ "allow using withMaxRestarts instead of minBackoff to determine the maxRestarts reset time" in assertAllStagesStopped {
+ val created = new AtomicInteger()
+ val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
+ val probe = TestSource
+ .probe[String]
+ .toMat(RestartSink.withBackoff(shortRestartSettings.withMaxRestarts(2, 1.second)) { () =>
+ created.incrementAndGet()
+ Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
+ })(Keep.left)
+ .run()
+
+ probe.sendNext("cancel")
+ sinkProbe.requestNext("cancel")
+ // There should be a shortMinBackoff delay
+ probe.sendNext("cancel")
+ sinkProbe.requestNext("cancel")
+ // The probe should now be backing off for 2 * shortMinBackoff
+
+ Thread.sleep((shortMinBackoff + (shortMinBackoff * 2) + minBackoff).toMillis) // if using shortMinBackoff as deadline cause reset
+
+ probe.sendNext("cancel")
+ sinkProbe.requestNext("cancel")
+
+ // We cannot get a final element
+ probe.sendNext("cancel")
+ sinkProbe.request(1)
+ sinkProbe.expectNoMessage()
+
+ created.get() should ===(3)
+
+ sinkProbe.cancel()
+ probe.sendComplete()
+ }
}
"A restart with backoff flow" should {
// helps reuse all the setupFlow code for both methods: withBackoff, and onlyOnFailuresWithBackoff
- def RestartFlowFactory[In, Out](onlyOnFailures: Boolean)
- : (FiniteDuration, FiniteDuration, Double, Int) => (() => Flow[In, Out, _]) => Flow[In, Out, NotUsed] =
- if (onlyOnFailures) {
- RestartFlow.onFailuresWithBackoff
- } else {
- // choose the correct backoff method
- (minBackoff, maxBackoff, randomFactor, maxRestarts) =>
- RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts)
- }
+ def RestartFlowFactory[In, Out](
+ onlyOnFailures: Boolean,
+ settings: RestartSettings): (() => Flow[In, Out, _]) => Flow[In, Out, NotUsed] =
+ if (onlyOnFailures) RestartFlow.onFailuresWithBackoff(settings)
+ else RestartFlow.withBackoff(settings)
def setupFlow(
minBackoff: FiniteDuration,
@@ -523,32 +578,35 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
// simply use the probes as a message bus for feeding and capturing events.
val (source, sink) = TestSource
.probe[String]
- .viaMat(RestartFlowFactory(onlyOnFailures)(minBackoff, maxBackoff, 0, maxRestarts) { () =>
- created.incrementAndGet()
- Flow.fromSinkAndSource(
- Flow[String]
- .takeWhile(_ != "cancel")
- .map {
- case "in error" => throw TE("in error")
- case other => other
- }
- .to(Sink
- .foreach(flowInSource.sendNext)
- .mapMaterializedValue(_.onComplete {
- case Success(_) => flowInSource.sendNext("in complete")
- case Failure(_) => flowInSource.sendNext("in error")
- })),
- flowOutSource
- .takeWhile(_ != "complete")
- .map {
- case "error" => throw TE("error")
- case other => other
- }
- .watchTermination()((_, term) =>
- term.foreach(_ => {
- flowInSource.sendNext("out complete")
- })))
- })(Keep.left)
+ .viaMat(
+ RestartFlowFactory(
+ onlyOnFailures,
+ RestartSettings(minBackoff, maxBackoff, 0).withMaxRestarts(maxRestarts, minBackoff)) { () =>
+ created.incrementAndGet()
+ Flow.fromSinkAndSource(
+ Flow[String]
+ .takeWhile(_ != "cancel")
+ .map {
+ case "in error" => throw TE("in error")
+ case other => other
+ }
+ .to(Sink
+ .foreach(flowInSource.sendNext)
+ .mapMaterializedValue(_.onComplete {
+ case Success(_) => flowInSource.sendNext("in complete")
+ case Failure(_) => flowInSource.sendNext("in error")
+ })),
+ flowOutSource
+ .takeWhile(_ != "complete")
+ .map {
+ case "error" => throw TE("error")
+ case other => other
+ }
+ .watchTermination()((_, term) =>
+ term.foreach(_ => {
+ flowInSource.sendNext("out complete")
+ })))
+ })(Keep.left)
.toMat(TestSink.probe[String])(Keep.both)
.run()
@@ -559,7 +617,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val created = new AtomicInteger()
val (source, sink) = TestSource
.probe[String]
- .viaMat(RestartFlow.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () =>
+ .viaMat(RestartFlow.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
Flow[String]
})(Keep.left)
@@ -810,7 +868,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val restartOnFailures =
RestartFlow
- .onFailuresWithBackoff(1.second, 2.seconds, 0.2, 2)(() => {
+ .onFailuresWithBackoff(RestartSettings(1.second, 2.seconds, 0.2).withMaxRestarts(2, 1.second))(() => {
flowCreations.incrementAndGet()
failsSomeTimes
})
diff --git a/akka-stream/src/main/mima-filters/2.6.9.backwards.excludes/29591-restart-deadline.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.9.backwards.excludes/29591-restart-deadline.backwards.excludes
new file mode 100644
index 0000000000..c3aa508a32
--- /dev/null
+++ b/akka-stream/src/main/mima-filters/2.6.9.backwards.excludes/29591-restart-deadline.backwards.excludes
@@ -0,0 +1,6 @@
+# Changes to private internals
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSink.this")
diff --git a/akka-stream/src/main/scala/akka/stream/RestartSettings.scala b/akka-stream/src/main/scala/akka/stream/RestartSettings.scala
new file mode 100644
index 0000000000..8c56e422d4
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/RestartSettings.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2020 Lightbend Inc.
+ */
+
+package akka.stream
+
+import scala.concurrent.duration.FiniteDuration
+
+import akka.util.JavaDurationConverters._
+
+final class RestartSettings private (
+ val minBackoff: FiniteDuration,
+ val maxBackoff: FiniteDuration,
+ val randomFactor: Double,
+ val maxRestarts: Int,
+ val maxRestartsWithin: FiniteDuration) {
+
+ /** Scala API: minimum (initial) duration until the child actor will started again, if it is terminated */
+ def withMinBackoff(value: FiniteDuration): RestartSettings = copy(minBackoff = value)
+
+ /** Java API: minimum (initial) duration until the child actor will started again, if it is terminated */
+ def withMinBackoff(value: java.time.Duration): RestartSettings = copy(minBackoff = value.asScala)
+
+ /** Scala API: the exponential back-off is capped to this duration */
+ def withMaxBackoff(value: FiniteDuration): RestartSettings = copy(maxBackoff = value)
+
+ /** Java API: the exponential back-off is capped to this duration */
+ def withMaxBackoff(value: java.time.Duration): RestartSettings = copy(maxBackoff = value.asScala)
+
+ /**
+ * After calculation of the exponential back-off an additional random delay based on this factor is added
+ * e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`
+ */
+ def withRandomFactor(value: Double): RestartSettings = copy(randomFactor = value)
+
+ /** Scala API: The amount of restarts is capped to `count` within a timeframe of `within` */
+ def withMaxRestarts(count: Int, within: FiniteDuration): RestartSettings =
+ copy(maxRestarts = count, maxRestartsWithin = within)
+
+ /** Java API: The amount of restarts is capped to `count` within a timeframe of `within` */
+ def withMaxRestarts(count: Int, within: java.time.Duration): RestartSettings =
+ copy(maxRestarts = count, maxRestartsWithin = within.asScala)
+
+ override def toString: String =
+ "RestartSettings(" +
+ s"minBackoff=$minBackoff," +
+ s"maxBackoff=$maxBackoff," +
+ s"randomFactor=$randomFactor," +
+ s"maxRestarts=$maxRestarts," +
+ s"maxRestartsWithin=$maxRestartsWithin)"
+
+ private def copy(
+ minBackoff: FiniteDuration = minBackoff,
+ maxBackoff: FiniteDuration = maxBackoff,
+ randomFactor: Double = randomFactor,
+ maxRestarts: Int = maxRestarts,
+ maxRestartsWithin: FiniteDuration = maxRestartsWithin): RestartSettings =
+ new RestartSettings(minBackoff, maxBackoff, randomFactor, maxRestarts, maxRestartsWithin)
+
+}
+
+object RestartSettings {
+
+ /** Scala API */
+ def apply(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): RestartSettings =
+ new RestartSettings(
+ minBackoff = minBackoff,
+ maxBackoff = maxBackoff,
+ randomFactor = randomFactor,
+ maxRestarts = Int.MaxValue,
+ maxRestartsWithin = minBackoff)
+
+ /** Java API */
+ def create(minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double): RestartSettings =
+ new RestartSettings(
+ minBackoff = minBackoff.asScala,
+ maxBackoff = maxBackoff.asScala,
+ randomFactor = randomFactor,
+ maxRestarts = Int.MaxValue,
+ maxRestartsWithin = minBackoff.asScala)
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala
index 0dfc51ff62..04a04e02e5 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala
@@ -6,10 +6,9 @@ package akka.stream.javadsl
import scala.concurrent.duration.FiniteDuration
-import com.github.ghik.silencer.silent
-
import akka.NotUsed
import akka.japi.function.Creator
+import akka.stream.RestartSettings
/**
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
@@ -33,7 +32,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -50,11 +49,8 @@ object RestartFlow {
maxBackoff: FiniteDuration,
randomFactor: Double,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
- akka.stream.scaladsl.RestartFlow
- .withBackoff(minBackoff, maxBackoff, randomFactor) { () =>
- flowFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings, flowFactory)
}
/**
@@ -70,7 +66,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -80,14 +76,15 @@ object RestartFlow {
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
- import akka.util.JavaDurationConverters._
- withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, flowFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings, flowFactory)
}
/**
@@ -103,7 +100,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -123,11 +120,8 @@ object RestartFlow {
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
- akka.stream.scaladsl.RestartFlow
- .withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
- flowFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings, flowFactory)
}
/**
@@ -143,7 +137,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -155,17 +149,43 @@ object RestartFlow {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
- import akka.util.JavaDurationConverters._
- withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings, flowFactory)
}
+ /**
+ * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
+ * backoff.
+ *
+ * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
+ * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
+ * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
+ * running, and then the [[Flow]] will be allowed to terminate without being restarted.
+ *
+ * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
+ * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
+ * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param flowFactory A factory for producing the [[Flow]] to wrap.
+ */
+ def withBackoff[In, Out](settings: RestartSettings, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] =
+ akka.stream.scaladsl.RestartFlow
+ .withBackoff(settings) { () =>
+ flowFactory.create().asScala
+ }
+ .asJava
+
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
* using an exponential backoff.
@@ -179,7 +199,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -199,11 +219,8 @@ object RestartFlow {
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
- akka.stream.scaladsl.RestartFlow
- .onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
- flowFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ onFailuresWithBackoff(settings, flowFactory)
}
/**
@@ -219,7 +236,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -231,14 +248,42 @@ object RestartFlow {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[In, Out](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
- import akka.util.JavaDurationConverters._
- onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ onFailuresWithBackoff(settings, flowFactory)
}
+
+ /**
+ * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
+ * using an exponential backoff.
+ *
+ * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
+ * time will be handled by restarting it as long as maxRestarts is not reached.
+ * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
+ * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
+ *
+ * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
+ * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
+ * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param flowFactory A factory for producing the [[Flow]] to wrap.
+ */
+ def onFailuresWithBackoff[In, Out](
+ settings: RestartSettings,
+ flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] =
+ akka.stream.scaladsl.RestartFlow
+ .onFailuresWithBackoff(settings) { () =>
+ flowFactory.create().asScala
+ }
+ .asJava
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala
index 56bf831939..63479ae20b 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala
@@ -6,10 +6,9 @@ package akka.stream.javadsl
import scala.concurrent.duration.FiniteDuration
-import com.github.ghik.silencer.silent
-
import akka.NotUsed
import akka.japi.function.Creator
+import akka.stream.RestartSettings
/**
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
@@ -34,7 +33,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -51,11 +50,8 @@ object RestartSink {
maxBackoff: FiniteDuration,
randomFactor: Double,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
- akka.stream.scaladsl.RestartSink
- .withBackoff(minBackoff, maxBackoff, randomFactor) { () =>
- sinkFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings, sinkFactory)
}
/**
@@ -72,7 +68,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -82,14 +78,15 @@ object RestartSink {
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
- import akka.util.JavaDurationConverters._
- withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sinkFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings, sinkFactory)
}
/**
@@ -106,7 +103,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -126,11 +123,8 @@ object RestartSink {
randomFactor: Double,
maxRestarts: Int,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
- akka.stream.scaladsl.RestartSink
- .withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
- sinkFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings, sinkFactory)
}
/**
@@ -147,7 +141,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -159,14 +153,41 @@ object RestartSink {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
- import akka.util.JavaDurationConverters._
- withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sinkFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings, sinkFactory)
}
+
+ /**
+ * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
+ * backoff.
+ *
+ * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
+ * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
+ * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
+ * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
+ * before this [[Sink]] in the graph.
+ *
+ * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
+ * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
+ * sent may have been lost.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param sinkFactory A factory for producing the [[Sink]] to wrap.
+ */
+ def withBackoff[T](settings: RestartSettings, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] =
+ akka.stream.scaladsl.RestartSink
+ .withBackoff(settings) { () =>
+ sinkFactory.create().asScala
+ }
+ .asJava
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala
index fc533c4b59..f4ceb1d982 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala
@@ -6,10 +6,9 @@ package akka.stream.javadsl
import scala.concurrent.duration.FiniteDuration
-import com.github.ghik.silencer.silent
-
import akka.NotUsed
import akka.japi.function.Creator
+import akka.stream.RestartSettings
/**
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
@@ -30,7 +29,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -47,11 +46,8 @@ object RestartSource {
maxBackoff: FiniteDuration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- akka.stream.scaladsl.RestartSource
- .withBackoff(minBackoff, maxBackoff, randomFactor) { () =>
- sourceFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings, sourceFactory)
}
/**
@@ -64,7 +60,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -74,14 +70,15 @@ object RestartSource {
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- import akka.util.JavaDurationConverters._
- withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings, sourceFactory)
}
/**
@@ -95,7 +92,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -115,11 +112,8 @@ object RestartSource {
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- akka.stream.scaladsl.RestartSource
- .withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
- sourceFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings, sourceFactory)
}
/**
@@ -133,7 +127,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -145,17 +139,41 @@ object RestartSource {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- import akka.util.JavaDurationConverters._
- withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings, sourceFactory)
}
+ /**
+ * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
+ * backoff.
+ *
+ * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
+ * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
+ * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
+ * and it will not be restarted.
+ * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
+ * after this [[Source]] in the graph.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param sourceFactory A factory for producing the [[Source]] to wrap.
+ */
+ def withBackoff[T](settings: RestartSettings, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] =
+ akka.stream.scaladsl.RestartSource
+ .withBackoff(settings) { () =>
+ sourceFactory.create().asScala
+ }
+ .asJava
+
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
@@ -165,7 +183,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -183,11 +201,8 @@ object RestartSource {
maxBackoff: FiniteDuration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- akka.stream.scaladsl.RestartSource
- .onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () =>
- sourceFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ onFailuresWithBackoff(settings, sourceFactory)
}
/**
@@ -199,7 +214,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -210,14 +225,15 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- import akka.util.JavaDurationConverters._
- onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
+ onFailuresWithBackoff(settings, sourceFactory)
}
/**
@@ -229,7 +245,7 @@ object RestartSource {
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -250,11 +266,8 @@ object RestartSource {
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- akka.stream.scaladsl.RestartSource
- .onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
- sourceFactory.create().asScala
- }
- .asJava
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ onFailuresWithBackoff(settings, sourceFactory)
}
/**
@@ -266,7 +279,7 @@ object RestartSource {
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -279,14 +292,37 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
- @silent("deprecated")
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
- import akka.util.JavaDurationConverters._
- onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory)
+ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ onFailuresWithBackoff(settings, sourceFactory)
}
+
+ /**
+ * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
+ *
+ * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
+ * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
+ * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
+ * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
+ * introducing a [[KillSwitch]] right after this [[Source]] in the graph.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param sourceFactory A factory for producing the [[Source]] to wrap.
+ *
+ */
+ def onFailuresWithBackoff[T](settings: RestartSettings, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] =
+ akka.stream.scaladsl.RestartSource
+ .onFailuresWithBackoff(settings) { () =>
+ sourceFactory.create().asScala
+ }
+ .asJava
}
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
index 931a2819d2..78f2f61502 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
@@ -39,7 +39,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -49,16 +49,12 @@ object RestartFlow {
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
- Flow.fromGraph(
- new RestartWithBackoffFlow(
- flowFactory,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures = false,
- Int.MaxValue))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings)(flowFactory)
}
/**
@@ -74,7 +70,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -86,19 +82,73 @@ object RestartFlow {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
- Flow.fromGraph(
- new RestartWithBackoffFlow(
- flowFactory,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures = false,
- maxRestarts))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings)(flowFactory)
+ }
+
+ /**
+ * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
+ * backoff.
+ *
+ * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
+ * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
+ * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
+ * running, and then the [[Flow]] will be allowed to terminate without being restarted.
+ *
+ * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
+ * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
+ * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param flowFactory A factory for producing the [[Flow]] to wrap.
+ */
+ def withBackoff[In, Out](settings: RestartSettings)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] =
+ Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, settings, onlyOnFailures = false))
+
+ /**
+ * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails using an exponential
+ * backoff. Notice that this [[Flow]] will not restart on completion of the wrapped flow.
+ *
+ * This [[Flow]] will not emit any failure
+ * The failures by the wrapped [[Flow]] will be handled by
+ * restarting the wrapping [[Flow]] as long as maxRestarts is not reached.
+ * Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
+ * running, and then the [[Flow]] will be allowed to terminate without being restarted.
+ *
+ * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
+ * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
+ * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param minBackoff minimum (initial) duration until the child actor will
+ * started again, if it is terminated
+ * @param maxBackoff the exponential back-off is capped to this duration
+ * @param randomFactor after calculation of the exponential back-off an additional
+ * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
+ * In order to skip this additional delay pass in `0`.
+ * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
+ * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
+ * @param flowFactory A factory for producing the [[Flow]] to wrap.
+ */
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
+ def onFailuresWithBackoff[In, Out](
+ minBackoff: FiniteDuration,
+ maxBackoff: FiniteDuration,
+ randomFactor: Double,
+ maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ onFailuresWithBackoff(settings)(flowFactory)
}
/**
@@ -115,36 +165,21 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
- * @param minBackoff minimum (initial) duration until the child actor will
- * started again, if it is terminated
- * @param maxBackoff the exponential back-off is capped to this duration
- * @param randomFactor after calculation of the exponential back-off an additional
- * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
- * In order to skip this additional delay pass in `0`.
- * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
- * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
+ * @param settings [[RestartSettings]] defining restart configuration
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
- def onFailuresWithBackoff[In, Out](
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- randomFactor: Double,
- maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
- Flow.fromGraph(
- new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts))
- }
+ def onFailuresWithBackoff[In, Out](settings: RestartSettings)(
+ flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] =
+ Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, settings, onlyOnFailures = true))
}
private final class RestartWithBackoffFlow[In, Out](
flowFactory: () => Flow[In, Out, _],
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- randomFactor: Double,
- onlyOnFailures: Boolean,
- maxRestarts: Int)
+ settings: RestartSettings,
+ onlyOnFailures: Boolean)
extends GraphStage[FlowShape[In, Out]] { self =>
val in = Inlet[In]("RestartWithBackoffFlow.in")
@@ -153,15 +188,7 @@ private final class RestartWithBackoffFlow[In, Out](
override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) =
- new RestartWithBackoffLogic(
- "Flow",
- shape,
- inheritedAttributes,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures,
- maxRestarts) {
+ new RestartWithBackoffLogic("Flow", shape, inheritedAttributes, settings, onlyOnFailures) {
val delay = inheritedAttributes.get[Delay](Delay(50.millis)).duration
var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
@@ -218,14 +245,12 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
name: String,
shape: S,
inheritedAttributes: Attributes,
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- randomFactor: Double,
- onlyOnFailures: Boolean,
- maxRestarts: Int)
+ settings: RestartSettings,
+ onlyOnFailures: Boolean)
extends TimerGraphStageLogicWithLogging(shape) {
+ import settings._
var restartCount = 0
- var resetDeadline = minBackoff.fromNow
+ var resetDeadline = maxRestartsWithin.fromNow
// This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
// don't want to restart the sub inlet when it finishes, we just finish normally.
@@ -335,9 +360,9 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
}
protected final def maxRestartsReached(): Boolean = {
- // Check if the last start attempt was more than the minimum backoff
+ // Check if the last start attempt was more than the reset deadline
if (resetDeadline.isOverdue()) {
- log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)
+ log.debug("Last restart attempt was more than {} ago, resetting restart count", maxRestartsWithin)
restartCount = 0
}
restartCount == maxRestarts
@@ -356,7 +381,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
// Invoked when the backoff timer ticks
override protected def onTimer(timerKey: Any) = {
startGraph()
- resetDeadline = minBackoff.fromNow
+ resetDeadline = maxRestartsWithin.fromNow
}
// When the stage starts, start the source
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala
index 8c48dea817..03af1ffb10 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala
@@ -7,7 +7,7 @@ package akka.stream.scaladsl
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
-import akka.stream.{ Attributes, Inlet, SinkShape }
+import akka.stream.{ Attributes, Inlet, RestartSettings, SinkShape }
import akka.stream.stage.{ GraphStage, InHandler }
/**
@@ -33,7 +33,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -43,9 +43,12 @@ object RestartSink {
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = {
- Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings)(sinkFactory)
}
/**
@@ -62,7 +65,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -74,33 +77,45 @@ object RestartSink {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(
sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = {
- Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings)(sinkFactory)
}
+
+ /**
+ * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
+ * backoff.
+ *
+ * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
+ * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
+ * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
+ * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
+ * before this [[Sink]] in the graph.
+ *
+ * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
+ * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
+ * sent may have been lost.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param sinkFactory A factory for producing the [[Sink]] to wrap.
+ */
+ def withBackoff[T](settings: RestartSettings)(sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] =
+ Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, settings))
}
-private final class RestartWithBackoffSink[T](
- sinkFactory: () => Sink[T, _],
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- randomFactor: Double,
- maxRestarts: Int)
+private final class RestartWithBackoffSink[T](sinkFactory: () => Sink[T, _], restartSettings: RestartSettings)
extends GraphStage[SinkShape[T]] { self =>
val in = Inlet[T]("RestartWithBackoffSink.in")
override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) =
- new RestartWithBackoffLogic(
- "Sink",
- shape,
- inheritedAttributes,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures = false,
- maxRestarts) {
+ new RestartWithBackoffLogic("Sink", shape, inheritedAttributes, restartSettings, onlyOnFailures = false) {
override protected def logSource = self.getClass
override protected def startGraph() = {
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala
index 46c063940e..a4277ec3b2 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala
@@ -7,7 +7,7 @@ package akka.stream.scaladsl
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
-import akka.stream.{ Attributes, Outlet, SourceShape }
+import akka.stream.{ Attributes, Outlet, RestartSettings, SourceShape }
import akka.stream.stage.{ GraphStage, OutHandler }
/**
@@ -29,7 +29,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -39,16 +39,12 @@ object RestartSource {
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
- Source.fromGraph(
- new RestartWithBackoffSource(
- sourceFactory,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures = false,
- Int.MaxValue))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ withBackoff(settings)(sourceFactory)
}
/**
@@ -62,7 +58,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -74,18 +70,33 @@ object RestartSource {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
- Source.fromGraph(
- new RestartWithBackoffSource(
- sourceFactory,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures = false,
- maxRestarts))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ withBackoff(settings)(sourceFactory)
}
+ /**
+ * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
+ * backoff.
+ *
+ * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
+ * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
+ * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
+ * and it will not be restarted.
+ * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
+ * after this [[Source]] in the graph.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param sourceFactory A factory for producing the [[Source]] to wrap.
+ */
+ def withBackoff[T](settings: RestartSettings)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] =
+ Source.fromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures = false))
+
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
@@ -95,7 +106,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -106,16 +117,12 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
- Source.fromGraph(
- new RestartWithBackoffSource(
- sourceFactory,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures = true,
- Int.MaxValue))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
+ onFailuresWithBackoff(settings)(sourceFactory)
}
/**
@@ -128,7 +135,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
- * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -141,44 +148,48 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
+ @Deprecated
+ @deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
- Source.fromGraph(
- new RestartWithBackoffSource(
- sourceFactory,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures = true,
- maxRestarts))
+ val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
+ onFailuresWithBackoff(settings)(sourceFactory)
}
+
+ /**
+ * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
+ *
+ * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
+ * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
+ * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
+ * and it will not be restarted.
+ * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
+ * after this [[Source]] in the graph.
+ *
+ * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
+ *
+ * @param settings [[RestartSettings]] defining restart configuration
+ * @param sourceFactory A factory for producing the [[Source]] to wrap.
+ *
+ */
+ def onFailuresWithBackoff[T](settings: RestartSettings)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] =
+ Source.fromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures = true))
}
private final class RestartWithBackoffSource[T](
sourceFactory: () => Source[T, _],
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- randomFactor: Double,
- onlyOnFailures: Boolean,
- maxRestarts: Int)
+ settings: RestartSettings,
+ onlyOnFailures: Boolean)
extends GraphStage[SourceShape[T]] { self =>
val out = Outlet[T]("RestartWithBackoffSource.out")
override def shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes) =
- new RestartWithBackoffLogic(
- "Source",
- shape,
- inheritedAttributes,
- minBackoff,
- maxBackoff,
- randomFactor,
- onlyOnFailures,
- maxRestarts) {
+ new RestartWithBackoffLogic("Source", shape, inheritedAttributes, settings, onlyOnFailures) {
override protected def logSource = self.getClass