Restart(Source|Flow|Sink): Configurable stream restart deadline (#29591)

This commit is contained in:
r-glyde 2020-10-05 08:12:15 +01:00 committed by GitHub
parent 4cc3c58a08
commit a4acf23d05
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 727 additions and 347 deletions

View file

@ -6,22 +6,31 @@ Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it f
## Signature ## Signature
@apidoc[RestartFlow.onFailuresWithBackoff](RestartFlow$) { scala="#onFailuresWithBackoff[In,Out](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double,maxRestarts:Int)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#onFailuresWithBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" } @apidoc[RestartFlow.onFailuresWithBackoff](RestartFlow$) { scala="#onFailuresWithBackoff[In,Out](settings:akka.stream.RestartSettings)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#onFailuresWithBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description ## Description
This @apidoc[Flow] will not emit any failure Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using exponential backoff.
The failures by the wrapped @apidoc[Flow] will be handled by The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff` if max restarts).
restarting the wrapping @apidoc[Flow] as long as maxRestarts is not reached.
Any termination signals sent to this @apidoc[Flow] however will terminate the wrapped @apidoc[Flow], if it's This @apidoc[Flow] will not emit any failure as long as maxRestarts is not reached.
The failure of the wrapped @apidoc[Flow] will be handled by restarting it.
However, any termination signals sent to this @apidoc[Flow] will terminate the wrapped @apidoc[Flow], if it's
running, and then the @apidoc[Flow] will be allowed to terminate without being restarted. running, and then the @apidoc[Flow] will be allowed to terminate without being restarted.
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. A termination signal from either end of the wrapped @apidoc[Flow] will cause the other end to be terminated, messages. A termination signal from either end of the wrapped @apidoc[Flow] will cause the other end to be terminated,
and any in transit messages will be lost. During backoff, this @apidoc[Flow] will backpressure. and any in transit messages will be lost. During backoff, this @apidoc[Flow] will backpressure.
This uses the same exponential backoff algorithm as @apidoc[Backoff$]. This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
See also:
* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
## Reactive Streams semantics ## Reactive Streams semantics
@ -31,4 +40,6 @@ This uses the same exponential backoff algorithm as @apidoc[Backoff$].
**backpressures** during backoff and when the wrapped flow backpressures **backpressures** during backoff and when the wrapped flow backpressures
**completes** when the wrapped flow completes or `maxRestarts` are reached within the given time limit
@@@ @@@

View file

@ -6,20 +6,30 @@ Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it f
## Signature ## Signature
@apidoc[RestartFlow.withBackoff](RestartFlow$) { scala="#withBackoff[In,Out](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#withBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" } @apidoc[RestartFlow.withBackoff](RestartFlow$) { scala="#withBackoff[In,Out](settings:akka.stream.RestartSettings)(flowFactory:()=>akka.stream.scaladsl.Flow[In,Out,_]):akka.stream.scaladsl.Flow[In,Out,akka.NotUsed]" java="#withBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description ## Description
The resulting @apidoc[Flow] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it completes or fails using exponential backoff.
completed. Any termination by the @apidoc[Flow] before that time will be handled by restarting it. Any termination The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
signals sent to this @apidoc[Flow] however will terminate the wrapped @apidoc[Flow], if it's running, and then the @apidoc[Flow]
will be allowed to terminate without being restarted. This @apidoc[Flow] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
completed. Any termination by the @apidoc[Flow] before that time will be handled by restarting it as long as maxRestarts
is not reached. Any termination signals sent to this @apidoc[Flow] however will terminate the wrapped @apidoc[Flow], if it's
running, and then the @apidoc[Flow] will be allowed to terminate without being restarted.
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. A termination signal from either end of the wrapped @apidoc[Flow] will cause the other end to be terminated, messages. A termination signal from either end of the wrapped @apidoc[Flow] will cause the other end to be terminated,
and any in transit messages will be lost. During backoff, this @apidoc[Flow] will backpressure. and any in transit messages will be lost. During backoff, this @apidoc[Flow] will backpressure.
This uses the same exponential backoff algorithm as @apidoc[Backoff$]. This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
See also:
* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
## Reactive Streams semantics ## Reactive Streams semantics
@ -29,6 +39,6 @@ This uses the same exponential backoff algorithm as @apidoc[Backoff$].
**backpressures** during backoff and when the wrapped flow backpressures **backpressures** during backoff and when the wrapped flow backpressures
**completes** when the wrapped flow completes **completes** when `maxRestarts` are reached within the given time limit
@@@ @@@

View file

@ -6,19 +6,38 @@ Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it f
## Signature ## Signature
@apidoc[RestartSink.withBackoff](RestartSink$) { scala="#withBackoff[T](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double,maxRestarts:Int)(sinkFactory:()=>akka.stream.scaladsl.Sink[T,_]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#withBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" } @apidoc[RestartSink.withBackoff](RestartSink$) { scala="#withBackoff[T](settings:akka.stream.RestartSettings)(sinkFactory:()=>akka.stream.scaladsl.Sink[T,_]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#withBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description ## Description
This @apidoc[Sink] will never cancel, since cancellation by the wrapped @apidoc[Sink] is always handled by restarting it. Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it completes or fails using exponential backoff.
The wrapped @apidoc[Sink] can however be completed by feeding a completion or error into this @apidoc[Sink]. When that The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
happens, the @apidoc[Sink], if currently running, will terminate and will not be restarted. This can be triggered
simply by the upstream completing, or externally by introducing a @apidoc[KillSwitch] right before this @apidoc[Sink] in the This @apidoc[Sink] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped @apidoc[Sink]
graph. is handled by restarting it. The wrapped @apidoc[Sink] can however be completed by feeding a completion or error into
this @apidoc[Sink]. When that happens, the @apidoc[Sink], if currently running, will terminate and will not be restarted.
This can be triggered simply by the upstream completing, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right
before this @apidoc[Sink] in the graph.
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. When the wrapped @apidoc[Sink] does cancel, this @apidoc[Sink] will backpressure, however any elements already messages. When the wrapped @apidoc[Sink] does cancel, this @apidoc[Sink] will backpressure, however any elements already
sent may have been lost. sent may have been lost.
This uses the same exponential backoff algorithm as @apidoc[Backoff$]. This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
See also:
* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
## Reactive Streams semantics
@@@div { .callout }
**backpressures** during backoff and when the wrapped sink backpressures
**completes** when upstream completes or when `maxRestarts` are reached within the given time limit
@@@

View file

@ -1,26 +1,29 @@
# RestartSource.onFailuresWithBackoff # RestartSource.onFailuresWithBackoff
Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.
@ref[Error handling](../index.md#error-handling) @ref[Error handling](../index.md#error-handling)
## Signature ## Signature
@apidoc[RestartSource.onFailuresWithBackoff](RestartSource$) { scala="#onFailuresWithBackoff[T](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#onFailuresWithBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" } @apidoc[RestartSource.onFailuresWithBackoff](RestartSource$) { scala="#onFailuresWithBackoff[T](settings:akka.stream.RestartSettings)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#onFailuresWithBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description ## Description
Wraps the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Wraps the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using exponential backoff.
The backoff resets back to `minBackoff` if there hasn't been a failure within `minBackoff`. The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
This @apidoc[Source] will never emit a failure, since the failure of the wrapped @apidoc[Source] is always handled by This @apidoc[Source] will not emit a failure as long as maxRestarts is not reached.
restarting. The wrapped @apidoc[Source] can be completed by completing this @apidoc[Source]. The failure of the wrapped @apidoc[Source] is handled by restarting it.
When that happens, the wrapped @apidoc[Source], if currently running will be cancelled, and it will not be restarted. However, the wrapped @apidoc[Source] can be cancelled by cancelling this @apidoc[Source].
This can be triggered by the downstream cancelling, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right When that happens, the wrapped @apidoc[Source], if currently running will, be cancelled and not restarted.
after this @apidoc[Source] in the graph. This can be triggered by the downstream cancelling, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right after this @apidoc[Source] in the graph.
This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
See also: See also:
* @ref:[RestartSource.withBackoff](../RestartSource/withBackoff.md)
* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md) * @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md) * @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md) * @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
@ -61,4 +64,10 @@ Java
**emits** when the wrapped source emits **emits** when the wrapped source emits
**backpressures** during backoff and when downstream backpressures
**completes** when the wrapped source completes or `maxRestarts` are reached within the given time limit
**cancels** when downstream cancels
@@@ @@@

View file

@ -1,22 +1,33 @@
# RestartSource.withBackoff # RestartSource.withBackoff
Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or complete using an exponential backoff. Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or completes using an exponential backoff.
@ref[Error handling](../index.md#error-handling) @ref[Error handling](../index.md#error-handling)
## Signature ## Signature
@apidoc[RestartSource.withBackoff](RestartSource$) { scala="#withBackoff[T](minBackoff:scala.concurrent.duration.FiniteDuration,maxBackoff:scala.concurrent.duration.FiniteDuration,randomFactor:Double,maxRestarts:Int)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#withBackoff(java.time.Duration,java.time.Duration,double,int,akka.japi.function.Creator)" } @apidoc[RestartSource.withBackoff](RestartSource$) { scala="#withBackoff[T](settings:akka.stream.RestartSettings)(sourceFactory:()=>akka.stream.scaladsl.Source[T,_]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#withBackoff(akka.stream.RestartSettings,akka.japi.function.Creator)" }
## Description ## Description
This @apidoc[Flow] will never emit a complete or failure, since the completion or failure of the wrapped @apidoc[Source] Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it completes or fails using exponential backoff.
is always handled by restarting it. The wrapped @apidoc[Source] can however be cancelled by cancelling this @apidoc[Source]. The backoff resets back to `minBackoff` if there hasn't been a restart within `maxRestartsWithin` (which defaults to `minBackoff`).
When that happens, the wrapped @apidoc[Source], if currently running will be cancelled, and it will not be restarted.
This can be triggered simply by the downstream cancelling, or externally by introducing a @apidoc[KillSwitch] right This @apidoc[Source] will not emit a complete or fail as long as maxRestarts is not reached, since the completion
or failure of the wrapped @apidoc[Source] is handled by restarting it. The wrapped @apidoc[Source] can however be cancelled
by cancelling this @apidoc[Source]. When that happens, the wrapped @apidoc[Source], if currently running, will be cancelled,
and it will not be restarted.
This can be triggered simply by the downstream cancelling, or externally by introducing a @ref[KillSwitch](../../stream-dynamic.md#controlling-stream-completion-with-killswitch) right
after this @apidoc[Source] in the graph. after this @apidoc[Source] in the graph.
This uses the same exponential backoff algorithm as @apidoc[Backoff$]. This uses the same exponential backoff algorithm as @apidoc[BackoffOpts$].
See also:
* @ref:[RestartSource.onFailuresWithBackoff](../RestartSource/onFailuresWithBackoff.md)
* @ref:[RestartFlow.onFailuresWithBackoff](../RestartFlow/onFailuresWithBackoff.md)
* @ref:[RestartFlow.withBackoff](../RestartFlow/withBackoff.md)
* @ref:[RestartSink.withBackoff](../RestartSink/withBackoff.md)
## Reactive Streams semantics ## Reactive Streams semantics
@ -24,6 +35,10 @@ This uses the same exponential backoff algorithm as @apidoc[Backoff$].
**emits** when the wrapped source emits **emits** when the wrapped source emits
**completes** when the wrapped source completes **backpressures** during backoff and when downstream backpressures
**completes** when `maxRestarts` are reached within the given time limit
**cancels** when downstream cancels
@@@ @@@

View file

@ -344,9 +344,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
| |Operator|Description| | |Operator|Description|
|--|--|--| |--|--|--|
|RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff.| |RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.|
|RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.| |RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.|
|RestartSource|<a name="withbackoff"></a>@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or complete using an exponential backoff.| |RestartSource|<a name="withbackoff"></a>@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or completes using an exponential backoff.|
|RestartFlow|<a name="withbackoff"></a>@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails or complete using an exponential backoff.| |RestartFlow|<a name="withbackoff"></a>@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails or complete using an exponential backoff.|
|RestartSink|<a name="withbackoff"></a>@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it fails or complete using an exponential backoff.| |RestartSink|<a name="withbackoff"></a>@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it fails or complete using an exponential backoff.|
|RetryFlow|<a name="withbackoff"></a>@ref[withBackoff](RetryFlow/withBackoff.md)|Wrap the given @apidoc[Flow] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.| |RetryFlow|<a name="withbackoff"></a>@ref[withBackoff](RetryFlow/withBackoff.md)|Wrap the given @apidoc[Flow] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.|

View file

@ -114,6 +114,15 @@ when a WebSocket connection fails due to the HTTP server it's running on going d
By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time
to recover, and it avoids using needless resources on the client side. to recover, and it avoids using needless resources on the client side.
The various restart shapes mentioned all expect an `akka.stream.RestartSettings` which configures the restart behaviour.
Configurable parameters are:
* `minBackoff` is the initial duration until the underlying stream is restarted
* `maxBackoff` caps the exponential backoff
* `randomFactor` allows addition of a random delay following backoff calculation
* `maxRestarts` caps the total number of restarts
* `maxRestartsWithin` sets a timeframe during which restarts are counted towards the same total for `maxRestarts`
The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`] The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`]
@java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a @java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a
stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will

View file

@ -9,6 +9,7 @@ import akka.actor.ActorSystem;
import akka.stream.KillSwitch; import akka.stream.KillSwitch;
import akka.stream.KillSwitches; import akka.stream.KillSwitches;
import akka.stream.Materializer; import akka.stream.Materializer;
import akka.stream.RestartSettings;
import akka.stream.javadsl.Keep; import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSource; import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
@ -61,12 +62,18 @@ public class RestartDocTest {
public void recoverWithBackoffSource() { public void recoverWithBackoffSource() {
// #restart-with-backoff-source // #restart-with-backoff-source
RestartSettings settings =
RestartSettings.create(
Duration.ofSeconds(3), // min backoff
Duration.ofSeconds(30), // max backoff
0.2 // adds 20% "noise" to vary the intervals slightly
)
.withMaxRestarts(
20, Duration.ofMinutes(5)); // limits the amount of restarts to 20 within 5 minutes
Source<ServerSentEvent, NotUsed> eventStream = Source<ServerSentEvent, NotUsed> eventStream =
RestartSource.withBackoff( RestartSource.withBackoff(
Duration.ofSeconds(3), // min backoff settings,
Duration.ofSeconds(30), // max backoff
0.2, // adds 20% "noise" to vary the intervals slightly
20, // limits the amount of restarts to 20
() -> () ->
// Create a source from a future of a source // Create a source from a future of a source
Source.completionStageSource( Source.completionStageSource(

View file

@ -8,6 +8,7 @@ import akka.NotUsed;
import akka.actor.Cancellable; import akka.actor.Cancellable;
import akka.japi.Creator; import akka.japi.Creator;
import akka.stream.KillSwitches; import akka.stream.KillSwitches;
import akka.stream.RestartSettings;
import akka.stream.UniqueKillSwitch; import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep; import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSource; import akka.stream.javadsl.RestartSource;
@ -34,7 +35,8 @@ public class Restart {
})); }));
Source<Creator<Integer>, NotUsed> forever = Source<Creator<Integer>, NotUsed> forever =
RestartSource.onFailuresWithBackoff( RestartSource.onFailuresWithBackoff(
Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource); RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1),
() -> flakySource);
forever.runWith( forever.runWith(
Sink.foreach((Creator<Integer> nr) -> system.log().info("{}", nr.create())), system); Sink.foreach((Creator<Integer> nr) -> system.log().info("{}", nr.create())), system);
// logs // logs
@ -99,7 +101,8 @@ public class Restart {
})); }));
UniqueKillSwitch stopRestarting = UniqueKillSwitch stopRestarting =
RestartSource.onFailuresWithBackoff( RestartSource.onFailuresWithBackoff(
Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource) RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1),
() -> flakySource)
.viaMat(KillSwitches.single(), Keep.right()) .viaMat(KillSwitches.single(), Keep.right())
.toMat(Sink.foreach(nr -> System.out.println("nr " + nr.create())), Keep.left()) .toMat(Sink.foreach(nr -> System.out.println("nr " + nr.create())), Keep.left())
.run(system); .run(system);

View file

@ -5,7 +5,7 @@
package docs.stream package docs.stream
import akka.NotUsed import akka.NotUsed
import akka.stream.KillSwitches import akka.stream.{ KillSwitches, RestartSettings }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import docs.CompileOnlySpec import docs.CompileOnlySpec
@ -34,12 +34,13 @@ class RestartDocSpec extends AkkaSpec with CompileOnlySpec {
"demonstrate a restart with backoff source" in compileOnlySpec { "demonstrate a restart with backoff source" in compileOnlySpec {
//#restart-with-backoff-source //#restart-with-backoff-source
val restartSource = RestartSource.withBackoff( val settings = RestartSettings(
minBackoff = 3.seconds, minBackoff = 3.seconds,
maxBackoff = 30.seconds, maxBackoff = 30.seconds,
randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
maxRestarts = 20 // limits the amount of restarts to 20 ).withMaxRestarts(20, 5.minutes) // limits the amount of restarts to 20 within 5 minutes
) { () =>
val restartSource = RestartSource.withBackoff(settings) { () =>
// Create a source from a future of a source // Create a source from a future of a source
Source.futureSource { Source.futureSource {
// Make a single request with akka-http // Make a single request with akka-http

View file

@ -6,8 +6,7 @@ package docs.stream.operators.source
import akka.NotUsed import akka.NotUsed
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.stream.KillSwitches import akka.stream.{ KillSwitches, RestartSettings, UniqueKillSwitch }
import akka.stream.UniqueKillSwitch
import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RestartSource import akka.stream.scaladsl.RestartSource
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
@ -31,7 +30,8 @@ object Restart extends App {
val flakySource: Source[() => Int, NotUsed] = val flakySource: Source[() => Int, NotUsed] =
Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn"))) Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val forever = val forever =
RestartSource.onFailuresWithBackoff(minBackoff = 1.second, maxBackoff = 10.seconds, 0.1)(() => flakySource) RestartSource.onFailuresWithBackoff(
RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.1))(() => flakySource)
forever.runWith(Sink.foreach(nr => system.log.info("{}", nr()))) forever.runWith(Sink.foreach(nr => system.log.info("{}", nr())))
// logs // logs
//[INFO] [12/10/2019 13:51:58.300] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 1 //[INFO] [12/10/2019 13:51:58.300] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 1
@ -56,7 +56,7 @@ object Restart extends App {
//#restart-failure-inner-complete //#restart-failure-inner-complete
val finiteSource = Source.tick(1.second, 1.second, "tick").take(3) val finiteSource = Source.tick(1.second, 1.second, "tick").take(3)
val forever = RestartSource.onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => finiteSource) val forever = RestartSource.onFailuresWithBackoff(RestartSettings(1.second, 10.seconds, 0.1))(() => finiteSource)
forever.runWith(Sink.foreach(println)) forever.runWith(Sink.foreach(println))
// prints // prints
// tick // tick
@ -71,7 +71,7 @@ object Restart extends App {
Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn"))) Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val stopRestarting: UniqueKillSwitch = val stopRestarting: UniqueKillSwitch =
RestartSource RestartSource
.onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => flakySource) .onFailuresWithBackoff(RestartSettings(1.second, 10.seconds, 0.1))(() => flakySource)
.viaMat(KillSwitches.single)(Keep.right) .viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(nr => println(s"Nr ${nr()}")))(Keep.left) .toMat(Sink.foreach(nr => println(s"Nr ${nr()}")))(Keep.left)
.run() .run()

View file

@ -55,8 +55,7 @@ import akka.persistence.typed.internal.JournalInteractions.EventToPersist
import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.Effect
import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Keep
import akka.stream.SystemMaterializer import akka.stream.{ RestartSettings, SystemMaterializer, WatchedActorTerminatedException }
import akka.stream.WatchedActorTerminatedException
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ RestartSource, Sink } import akka.stream.scaladsl.{ RestartSource, Sink }
import akka.stream.typed.scaladsl.ActorFlow import akka.stream.typed.scaladsl.ActorFlow
@ -144,7 +143,7 @@ private[akka] object Running {
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
val source = RestartSource val source = RestartSource
.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => .withBackoff(RestartSettings(2.seconds, 10.seconds, randomFactor = 0.2)) { () =>
Source.futureSource { Source.futureSource {
setup.context.self.ask[Long](replyTo => GetSeenSequenceNr(replicaId, replyTo)).map { seqNr => setup.context.self.ask[Long](replyTo => GetSeenSequenceNr(replicaId, replyTo)).map { seqNr =>
replication replication

View file

@ -35,6 +35,7 @@ import akka.stream.Attributes.LogLevels
import akka.stream.IgnoreComplete import akka.stream.IgnoreComplete
import akka.stream.KillSwitches import akka.stream.KillSwitches
import akka.stream.Materializer import akka.stream.Materializer
import akka.stream.RestartSettings
import akka.stream.SharedKillSwitch import akka.stream.SharedKillSwitch
import akka.stream.SinkShape import akka.stream.SinkShape
import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Flow
@ -208,10 +209,8 @@ private[remote] class ArteryTcpTransport(
// stream. For message stream it's best effort retry a few times. // stream. For message stream it's best effort retry a few times.
RestartFlow RestartFlow
.withBackoff[ByteString, ByteString]( .withBackoff[ByteString, ByteString](
settings.Advanced.OutboundRestartBackoff, RestartSettings(settings.Advanced.OutboundRestartBackoff, settings.Advanced.OutboundRestartBackoff * 5, 0.1)
settings.Advanced.OutboundRestartBackoff * 5, .withMaxRestarts(maxRestarts, settings.Advanced.OutboundRestartBackoff))(flowFactory)
0.1,
maxRestarts)(flowFactory)
// silence "Restarting graph due to failure" logging by RestartFlow // silence "Restarting graph due to failure" logging by RestartFlow
.addAttributes(Attributes.logLevels(onFailure = LogLevels.Off)) .addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))

View file

@ -10,11 +10,9 @@ import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
import akka.Done import akka.Done
import akka.NotUsed import akka.NotUsed
import akka.stream.Attributes import akka.stream.{ Attributes, OverflowStrategy, RestartSettings }
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.TestPublisher import akka.stream.testkit.TestPublisher
@ -35,11 +33,14 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
private val minBackoff = 1.second.dilated private val minBackoff = 1.second.dilated
private val maxBackoff = 3.seconds.dilated private val maxBackoff = 3.seconds.dilated
private val shortRestartSettings = RestartSettings(shortMinBackoff, shortMaxBackoff, 0)
private val restartSettings = RestartSettings(minBackoff, maxBackoff, 0)
"A restart with backoff source" should { "A restart with backoff source" should {
"run normally" in assertAllStagesStopped { "run normally" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source.repeat("a") Source.repeat("a")
} }
@ -59,7 +60,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"restart on completion" in assertAllStagesStopped { "restart on completion" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source(List("a", "b")) Source(List("a", "b"))
} }
@ -79,7 +80,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"restart on failure" in assertAllStagesStopped { "restart on failure" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source(List("a", "b", "c")).map { Source(List("a", "b", "c")).map {
case "c" => throw TE("failed") case "c" => throw TE("failed")
@ -102,7 +103,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"backoff before restart" in assertAllStagesStopped { "backoff before restart" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(minBackoff, maxBackoff, 0) { () => .withBackoff(restartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source(List("a", "b")) Source(List("a", "b"))
} }
@ -126,7 +127,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" in assertAllStagesStopped { "reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(minBackoff, maxBackoff, 0) { () => .withBackoff(restartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source(List("a", "b")) Source(List("a", "b"))
} }
@ -160,7 +161,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val created = new AtomicInteger() val created = new AtomicInteger()
val promise = Promise[Done]() val promise = Promise[Done]()
val probe = RestartSource val probe = RestartSource
.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .withBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source.repeat("a").watchTermination() { (_, term) => Source.repeat("a").watchTermination() { (_, term) =>
promise.completeWith(term) promise.completeWith(term)
@ -181,7 +182,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"not restart the source when cancelled while backing off" in assertAllStagesStopped { "not restart the source when cancelled while backing off" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(minBackoff, maxBackoff, 0) { () => .withBackoff(restartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source.single("a") Source.single("a")
} }
@ -200,7 +201,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"stop on completion if it should only be restarted in failures" in assertAllStagesStopped { "stop on completion if it should only be restarted in failures" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .onFailuresWithBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source(List("a", "b", "c")).map { Source(List("a", "b", "c")).map {
case "c" => if (created.get() == 1) throw TE("failed") else "c" case "c" => if (created.get() == 1) throw TE("failed") else "c"
@ -225,7 +226,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"restart on failure when only due to failures should be restarted" in assertAllStagesStopped { "restart on failure when only due to failures should be restarted" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.onFailuresWithBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .onFailuresWithBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Source(List("a", "b", "c")).map { Source(List("a", "b", "c")).map {
case "c" => throw TE("failed") case "c" => throw TE("failed")
@ -249,7 +250,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"not restart the source when maxRestarts is reached" in assertAllStagesStopped { "not restart the source when maxRestarts is reached" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () => .withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () =>
created.incrementAndGet() created.incrementAndGet()
Source.single("a") Source.single("a")
} }
@ -267,7 +268,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
"reset maxRestarts when source runs for at least minimum backoff without completing" in assertAllStagesStopped { "reset maxRestarts when source runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger() val created = new AtomicInteger()
val probe = RestartSource val probe = RestartSource
.withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () => .withBackoff(restartSettings.withMaxRestarts(2, minBackoff)) { () =>
created.incrementAndGet() created.incrementAndGet()
Source(List("a")) Source(List("a"))
} }
@ -290,6 +291,30 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
probe.cancel() probe.cancel()
} }
"allow using withMaxRestarts instead of minBackoff to determine the maxRestarts reset time" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource
.withBackoff(shortRestartSettings.withMaxRestarts(2, 1.second)) { () =>
created.incrementAndGet()
Source(List("a", "b")).takeWhile(_ != "b")
}
.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("a")
Thread.sleep((shortMinBackoff + (shortMinBackoff * 2) + shortMinBackoff).toMillis) // if using shortMinBackoff as deadline cause reset
probe.requestNext("a")
probe.request(1)
probe.expectComplete()
created.get() should ===(3)
probe.cancel()
}
} }
"A restart with backoff sink" should { "A restart with backoff sink" should {
@ -298,7 +323,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val result = Promise[Seq[String]]() val result = Promise[Seq[String]]()
val probe = TestSource val probe = TestSource
.probe[String] .probe[String]
.toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .toMat(RestartSink.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Sink.seq.mapMaterializedValue(result.completeWith) Sink.seq.mapMaterializedValue(result.completeWith)
})(Keep.left) })(Keep.left)
@ -318,7 +343,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource val probe = TestSource
.probe[String] .probe[String]
.toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .toMat(RestartSink.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext)) Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left) })(Keep.left)
@ -344,7 +369,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource val probe = TestSource
.probe[String] .probe[String]
.toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () => .toMat(RestartSink.withBackoff(restartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext)) Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left) })(Keep.left)
@ -371,7 +396,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource val probe = TestSource
.probe[String] .probe[String]
.toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () => .toMat(RestartSink.withBackoff(restartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext)) Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left) })(Keep.left)
@ -413,7 +438,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource val probe = TestSource
.probe[String] .probe[String]
.toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0) { () => .toMat(RestartSink.withBackoff(restartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext)) Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left) })(Keep.left)
@ -438,7 +463,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource val probe = TestSource
.probe[String] .probe[String]
.toMat(RestartSink.withBackoff(shortMinBackoff, shortMaxBackoff, 0, maxRestarts = 1) { () => .toMat(RestartSink.withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () =>
created.incrementAndGet() created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext)) Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left) })(Keep.left)
@ -462,7 +487,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource val probe = TestSource
.probe[String] .probe[String]
.toMat(RestartSink.withBackoff(minBackoff, maxBackoff, 0, maxRestarts = 2) { () => .toMat(RestartSink.withBackoff(restartSettings.withMaxRestarts(2, minBackoff)) { () =>
created.incrementAndGet() created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext)) Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left) })(Keep.left)
@ -476,7 +501,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
// The probe should now be backing off for 2 * minBackoff // The probe should now be backing off for 2 * minBackoff
// Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the // Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the
// subsequent minBackoff min backoff to pass, so it resets the restart count // subsequent minBackoff to pass, so it resets the restart count
Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis) Thread.sleep((minBackoff + (minBackoff * 2) + minBackoff + 500.millis).toMillis)
probe.sendNext("cancel") probe.sendNext("cancel")
@ -491,20 +516,50 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
sinkProbe.cancel() sinkProbe.cancel()
probe.sendComplete() probe.sendComplete()
} }
"allow using withMaxRestarts instead of minBackoff to determine the maxRestarts reset time" in assertAllStagesStopped {
val created = new AtomicInteger()
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource
.probe[String]
.toMat(RestartSink.withBackoff(shortRestartSettings.withMaxRestarts(2, 1.second)) { () =>
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true).to(Sink.foreach(queue.sendNext))
})(Keep.left)
.run()
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// There should be a shortMinBackoff delay
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// The probe should now be backing off for 2 * shortMinBackoff
Thread.sleep((shortMinBackoff + (shortMinBackoff * 2) + minBackoff).toMillis) // if using shortMinBackoff as deadline cause reset
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// We cannot get a final element
probe.sendNext("cancel")
sinkProbe.request(1)
sinkProbe.expectNoMessage()
created.get() should ===(3)
sinkProbe.cancel()
probe.sendComplete()
}
} }
"A restart with backoff flow" should { "A restart with backoff flow" should {
// helps reuse all the setupFlow code for both methods: withBackoff, and onlyOnFailuresWithBackoff // helps reuse all the setupFlow code for both methods: withBackoff, and onlyOnFailuresWithBackoff
def RestartFlowFactory[In, Out](onlyOnFailures: Boolean) def RestartFlowFactory[In, Out](
: (FiniteDuration, FiniteDuration, Double, Int) => (() => Flow[In, Out, _]) => Flow[In, Out, NotUsed] = onlyOnFailures: Boolean,
if (onlyOnFailures) { settings: RestartSettings): (() => Flow[In, Out, _]) => Flow[In, Out, NotUsed] =
RestartFlow.onFailuresWithBackoff if (onlyOnFailures) RestartFlow.onFailuresWithBackoff(settings)
} else { else RestartFlow.withBackoff(settings)
// choose the correct backoff method
(minBackoff, maxBackoff, randomFactor, maxRestarts) =>
RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts)
}
def setupFlow( def setupFlow(
minBackoff: FiniteDuration, minBackoff: FiniteDuration,
@ -523,32 +578,35 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
// simply use the probes as a message bus for feeding and capturing events. // simply use the probes as a message bus for feeding and capturing events.
val (source, sink) = TestSource val (source, sink) = TestSource
.probe[String] .probe[String]
.viaMat(RestartFlowFactory(onlyOnFailures)(minBackoff, maxBackoff, 0, maxRestarts) { () => .viaMat(
created.incrementAndGet() RestartFlowFactory(
Flow.fromSinkAndSource( onlyOnFailures,
Flow[String] RestartSettings(minBackoff, maxBackoff, 0).withMaxRestarts(maxRestarts, minBackoff)) { () =>
.takeWhile(_ != "cancel") created.incrementAndGet()
.map { Flow.fromSinkAndSource(
case "in error" => throw TE("in error") Flow[String]
case other => other .takeWhile(_ != "cancel")
} .map {
.to(Sink case "in error" => throw TE("in error")
.foreach(flowInSource.sendNext) case other => other
.mapMaterializedValue(_.onComplete { }
case Success(_) => flowInSource.sendNext("in complete") .to(Sink
case Failure(_) => flowInSource.sendNext("in error") .foreach(flowInSource.sendNext)
})), .mapMaterializedValue(_.onComplete {
flowOutSource case Success(_) => flowInSource.sendNext("in complete")
.takeWhile(_ != "complete") case Failure(_) => flowInSource.sendNext("in error")
.map { })),
case "error" => throw TE("error") flowOutSource
case other => other .takeWhile(_ != "complete")
} .map {
.watchTermination()((_, term) => case "error" => throw TE("error")
term.foreach(_ => { case other => other
flowInSource.sendNext("out complete") }
}))) .watchTermination()((_, term) =>
})(Keep.left) term.foreach(_ => {
flowInSource.sendNext("out complete")
})))
})(Keep.left)
.toMat(TestSink.probe[String])(Keep.both) .toMat(TestSink.probe[String])(Keep.both)
.run() .run()
@ -559,7 +617,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val created = new AtomicInteger() val created = new AtomicInteger()
val (source, sink) = TestSource val (source, sink) = TestSource
.probe[String] .probe[String]
.viaMat(RestartFlow.withBackoff(shortMinBackoff, shortMaxBackoff, 0) { () => .viaMat(RestartFlow.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet() created.incrementAndGet()
Flow[String] Flow[String]
})(Keep.left) })(Keep.left)
@ -810,7 +868,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val restartOnFailures = val restartOnFailures =
RestartFlow RestartFlow
.onFailuresWithBackoff(1.second, 2.seconds, 0.2, 2)(() => { .onFailuresWithBackoff(RestartSettings(1.second, 2.seconds, 0.2).withMaxRestarts(2, 1.second))(() => {
flowCreations.incrementAndGet() flowCreations.incrementAndGet()
failsSomeTimes failsSomeTimes
}) })

View file

@ -0,0 +1,6 @@
# Changes to private internals
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSink.this")

View file

@ -0,0 +1,81 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream
import scala.concurrent.duration.FiniteDuration
import akka.util.JavaDurationConverters._
final class RestartSettings private (
val minBackoff: FiniteDuration,
val maxBackoff: FiniteDuration,
val randomFactor: Double,
val maxRestarts: Int,
val maxRestartsWithin: FiniteDuration) {
/** Scala API: minimum (initial) duration until the child actor will started again, if it is terminated */
def withMinBackoff(value: FiniteDuration): RestartSettings = copy(minBackoff = value)
/** Java API: minimum (initial) duration until the child actor will started again, if it is terminated */
def withMinBackoff(value: java.time.Duration): RestartSettings = copy(minBackoff = value.asScala)
/** Scala API: the exponential back-off is capped to this duration */
def withMaxBackoff(value: FiniteDuration): RestartSettings = copy(maxBackoff = value)
/** Java API: the exponential back-off is capped to this duration */
def withMaxBackoff(value: java.time.Duration): RestartSettings = copy(maxBackoff = value.asScala)
/**
* After calculation of the exponential back-off an additional random delay based on this factor is added
* e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`
*/
def withRandomFactor(value: Double): RestartSettings = copy(randomFactor = value)
/** Scala API: The amount of restarts is capped to `count` within a timeframe of `within` */
def withMaxRestarts(count: Int, within: FiniteDuration): RestartSettings =
copy(maxRestarts = count, maxRestartsWithin = within)
/** Java API: The amount of restarts is capped to `count` within a timeframe of `within` */
def withMaxRestarts(count: Int, within: java.time.Duration): RestartSettings =
copy(maxRestarts = count, maxRestartsWithin = within.asScala)
override def toString: String =
"RestartSettings(" +
s"minBackoff=$minBackoff," +
s"maxBackoff=$maxBackoff," +
s"randomFactor=$randomFactor," +
s"maxRestarts=$maxRestarts," +
s"maxRestartsWithin=$maxRestartsWithin)"
private def copy(
minBackoff: FiniteDuration = minBackoff,
maxBackoff: FiniteDuration = maxBackoff,
randomFactor: Double = randomFactor,
maxRestarts: Int = maxRestarts,
maxRestartsWithin: FiniteDuration = maxRestartsWithin): RestartSettings =
new RestartSettings(minBackoff, maxBackoff, randomFactor, maxRestarts, maxRestartsWithin)
}
object RestartSettings {
/** Scala API */
def apply(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): RestartSettings =
new RestartSettings(
minBackoff = minBackoff,
maxBackoff = maxBackoff,
randomFactor = randomFactor,
maxRestarts = Int.MaxValue,
maxRestartsWithin = minBackoff)
/** Java API */
def create(minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double): RestartSettings =
new RestartSettings(
minBackoff = minBackoff.asScala,
maxBackoff = maxBackoff.asScala,
randomFactor = randomFactor,
maxRestarts = Int.MaxValue,
maxRestartsWithin = minBackoff.asScala)
}

View file

@ -6,10 +6,9 @@ package akka.stream.javadsl
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import com.github.ghik.silencer.silent
import akka.NotUsed import akka.NotUsed
import akka.japi.function.Creator import akka.japi.function.Creator
import akka.stream.RestartSettings
/** /**
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails. * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
@ -33,7 +32,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -50,11 +49,8 @@ object RestartFlow {
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double, randomFactor: Double,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
akka.stream.scaladsl.RestartFlow val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
.withBackoff(minBackoff, maxBackoff, randomFactor) { () => withBackoff(settings, flowFactory)
flowFactory.create().asScala
}
.asJava
} }
/** /**
@ -70,7 +66,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -80,14 +76,15 @@ object RestartFlow {
* In order to skip this additional delay pass in `0`. * In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap. * @param flowFactory A factory for producing the [[Flow]] to wrap.
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out]( def withBackoff[In, Out](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, flowFactory) withBackoff(settings, flowFactory)
} }
/** /**
@ -103,7 +100,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -123,11 +120,8 @@ object RestartFlow {
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
akka.stream.scaladsl.RestartFlow val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () => withBackoff(settings, flowFactory)
flowFactory.create().asScala
}
.asJava
} }
/** /**
@ -143,7 +137,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -155,17 +149,43 @@ object RestartFlow {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap. * @param flowFactory A factory for producing the [[Flow]] to wrap.
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out]( def withBackoff[In, Out](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) withBackoff(settings, flowFactory)
} }
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](settings: RestartSettings, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] =
akka.stream.scaladsl.RestartFlow
.withBackoff(settings) { () =>
flowFactory.create().asScala
}
.asJava
/** /**
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
* using an exponential backoff. * using an exponential backoff.
@ -179,7 +199,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -199,11 +219,8 @@ object RestartFlow {
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
akka.stream.scaladsl.RestartFlow val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () => onFailuresWithBackoff(settings, flowFactory)
flowFactory.create().asScala
}
.asJava
} }
/** /**
@ -219,7 +236,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -231,14 +248,42 @@ object RestartFlow {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap. * @param flowFactory A factory for producing the [[Flow]] to wrap.
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[In, Out]( def onFailuresWithBackoff[In, Out](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) onFailuresWithBackoff(settings, flowFactory)
} }
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
* using an exponential backoff.
*
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
* time will be handled by restarting it as long as maxRestarts is not reached.
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def onFailuresWithBackoff[In, Out](
settings: RestartSettings,
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] =
akka.stream.scaladsl.RestartFlow
.onFailuresWithBackoff(settings) { () =>
flowFactory.create().asScala
}
.asJava
} }

View file

@ -6,10 +6,9 @@ package akka.stream.javadsl
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import com.github.ghik.silencer.silent
import akka.NotUsed import akka.NotUsed
import akka.japi.function.Creator import akka.japi.function.Creator
import akka.stream.RestartSettings
/** /**
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
@ -34,7 +33,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost. * sent may have been lost.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -51,11 +50,8 @@ object RestartSink {
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double, randomFactor: Double,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
akka.stream.scaladsl.RestartSink val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
.withBackoff(minBackoff, maxBackoff, randomFactor) { () => withBackoff(settings, sinkFactory)
sinkFactory.create().asScala
}
.asJava
} }
/** /**
@ -72,7 +68,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost. * sent may have been lost.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -82,14 +78,15 @@ object RestartSink {
* In order to skip this additional delay pass in `0`. * In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap. * @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T]( def withBackoff[T](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sinkFactory) withBackoff(settings, sinkFactory)
} }
/** /**
@ -106,7 +103,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost. * sent may have been lost.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -126,11 +123,8 @@ object RestartSink {
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
akka.stream.scaladsl.RestartSink val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () => withBackoff(settings, sinkFactory)
sinkFactory.create().asScala
}
.asJava
} }
/** /**
@ -147,7 +141,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost. * sent may have been lost.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -159,14 +153,41 @@ object RestartSink {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap. * @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T]( def withBackoff[T](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sinkFactory) withBackoff(settings, sinkFactory)
} }
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
* before this [[Sink]] in the graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
def withBackoff[T](settings: RestartSettings, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] =
akka.stream.scaladsl.RestartSink
.withBackoff(settings) { () =>
sinkFactory.create().asScala
}
.asJava
} }

View file

@ -6,10 +6,9 @@ package akka.stream.javadsl
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import com.github.ghik.silencer.silent
import akka.NotUsed import akka.NotUsed
import akka.japi.function.Creator import akka.japi.function.Creator
import akka.stream.RestartSettings
/** /**
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
@ -30,7 +29,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -47,11 +46,8 @@ object RestartSource {
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double, randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
.withBackoff(minBackoff, maxBackoff, randomFactor) { () => withBackoff(settings, sourceFactory)
sourceFactory.create().asScala
}
.asJava
} }
/** /**
@ -64,7 +60,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -74,14 +70,15 @@ object RestartSource {
* In order to skip this additional delay pass in `0`. * In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T]( def withBackoff[T](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) withBackoff(settings, sourceFactory)
} }
/** /**
@ -95,7 +92,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -115,11 +112,8 @@ object RestartSource {
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () => withBackoff(settings, sourceFactory)
sourceFactory.create().asScala
}
.asJava
} }
/** /**
@ -133,7 +127,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -145,17 +139,41 @@ object RestartSource {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T]( def withBackoff[T](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) withBackoff(settings, sourceFactory)
} }
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
def withBackoff[T](settings: RestartSettings, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] =
akka.stream.scaladsl.RestartSource
.withBackoff(settings) { () =>
sourceFactory.create().asScala
}
.asJava
/** /**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
* *
@ -165,7 +183,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -183,11 +201,8 @@ object RestartSource {
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double, randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () => onFailuresWithBackoff(settings, sourceFactory)
sourceFactory.create().asScala
}
.asJava
} }
/** /**
@ -199,7 +214,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -210,14 +225,15 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
* *
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T]( def onFailuresWithBackoff[T](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor)
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) onFailuresWithBackoff(settings, sourceFactory)
} }
/** /**
@ -229,7 +245,7 @@ object RestartSource {
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph. * introducing a [[KillSwitch]] right after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -250,11 +266,8 @@ object RestartSource {
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () => onFailuresWithBackoff(settings, sourceFactory)
sourceFactory.create().asScala
}
.asJava
} }
/** /**
@ -266,7 +279,7 @@ object RestartSource {
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph. * introducing a [[KillSwitch]] right after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -279,14 +292,37 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
* *
*/ */
@silent("deprecated") @Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T]( def onFailuresWithBackoff[T](
minBackoff: java.time.Duration, minBackoff: java.time.Duration,
maxBackoff: java.time.Duration, maxBackoff: java.time.Duration,
randomFactor: Double, randomFactor: Double,
maxRestarts: Int, maxRestarts: Int,
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
import akka.util.JavaDurationConverters._ val settings = RestartSettings.create(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) onFailuresWithBackoff(settings, sourceFactory)
} }
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
def onFailuresWithBackoff[T](settings: RestartSettings, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] =
akka.stream.scaladsl.RestartSource
.onFailuresWithBackoff(settings) { () =>
sourceFactory.create().asScala
}
.asJava
} }

View file

@ -39,7 +39,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -49,16 +49,12 @@ object RestartFlow {
* In order to skip this additional delay pass in `0`. * In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap. * @param flowFactory A factory for producing the [[Flow]] to wrap.
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = { flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph( val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
new RestartWithBackoffFlow( withBackoff(settings)(flowFactory)
flowFactory,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures = false,
Int.MaxValue))
} }
/** /**
@ -74,7 +70,7 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -86,19 +82,73 @@ object RestartFlow {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap. * @param flowFactory A factory for producing the [[Flow]] to wrap.
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[In, Out]( def withBackoff[In, Out](
minBackoff: FiniteDuration, minBackoff: FiniteDuration,
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double, randomFactor: Double,
maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = { maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph( val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
new RestartWithBackoffFlow( withBackoff(settings)(flowFactory)
flowFactory, }
minBackoff,
maxBackoff, /**
randomFactor, * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
onlyOnFailures = false, * backoff.
maxRestarts)) *
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](settings: RestartSettings)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] =
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, settings, onlyOnFailures = false))
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails using an exponential
* backoff. Notice that this [[Flow]] will not restart on completion of the wrapped flow.
*
* This [[Flow]] will not emit any failure
* The failures by the wrapped [[Flow]] will be handled by
* restarting the wrapping [[Flow]] as long as maxRestarts is not reached.
* Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
onFailuresWithBackoff(settings)(flowFactory)
} }
/** /**
@ -115,36 +165,21 @@ object RestartFlow {
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param settings [[RestartSettings]] defining restart configuration
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap. * @param flowFactory A factory for producing the [[Flow]] to wrap.
*/ */
def onFailuresWithBackoff[In, Out]( def onFailuresWithBackoff[In, Out](settings: RestartSettings)(
minBackoff: FiniteDuration, flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] =
maxBackoff: FiniteDuration, Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, settings, onlyOnFailures = true))
randomFactor: Double,
maxRestarts: Int)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph(
new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts))
}
} }
private final class RestartWithBackoffFlow[In, Out]( private final class RestartWithBackoffFlow[In, Out](
flowFactory: () => Flow[In, Out, _], flowFactory: () => Flow[In, Out, _],
minBackoff: FiniteDuration, settings: RestartSettings,
maxBackoff: FiniteDuration, onlyOnFailures: Boolean)
randomFactor: Double,
onlyOnFailures: Boolean,
maxRestarts: Int)
extends GraphStage[FlowShape[In, Out]] { self => extends GraphStage[FlowShape[In, Out]] { self =>
val in = Inlet[In]("RestartWithBackoffFlow.in") val in = Inlet[In]("RestartWithBackoffFlow.in")
@ -153,15 +188,7 @@ private final class RestartWithBackoffFlow[In, Out](
override def shape = FlowShape(in, out) override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = override def createLogic(inheritedAttributes: Attributes) =
new RestartWithBackoffLogic( new RestartWithBackoffLogic("Flow", shape, inheritedAttributes, settings, onlyOnFailures) {
"Flow",
shape,
inheritedAttributes,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures,
maxRestarts) {
val delay = inheritedAttributes.get[Delay](Delay(50.millis)).duration val delay = inheritedAttributes.get[Delay](Delay(50.millis)).duration
var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
@ -218,14 +245,12 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
name: String, name: String,
shape: S, shape: S,
inheritedAttributes: Attributes, inheritedAttributes: Attributes,
minBackoff: FiniteDuration, settings: RestartSettings,
maxBackoff: FiniteDuration, onlyOnFailures: Boolean)
randomFactor: Double,
onlyOnFailures: Boolean,
maxRestarts: Int)
extends TimerGraphStageLogicWithLogging(shape) { extends TimerGraphStageLogicWithLogging(shape) {
import settings._
var restartCount = 0 var restartCount = 0
var resetDeadline = minBackoff.fromNow var resetDeadline = maxRestartsWithin.fromNow
// This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
// don't want to restart the sub inlet when it finishes, we just finish normally. // don't want to restart the sub inlet when it finishes, we just finish normally.
@ -335,9 +360,9 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
} }
protected final def maxRestartsReached(): Boolean = { protected final def maxRestartsReached(): Boolean = {
// Check if the last start attempt was more than the minimum backoff // Check if the last start attempt was more than the reset deadline
if (resetDeadline.isOverdue()) { if (resetDeadline.isOverdue()) {
log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff) log.debug("Last restart attempt was more than {} ago, resetting restart count", maxRestartsWithin)
restartCount = 0 restartCount = 0
} }
restartCount == maxRestarts restartCount == maxRestarts
@ -356,7 +381,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
// Invoked when the backoff timer ticks // Invoked when the backoff timer ticks
override protected def onTimer(timerKey: Any) = { override protected def onTimer(timerKey: Any) = {
startGraph() startGraph()
resetDeadline = minBackoff.fromNow resetDeadline = maxRestartsWithin.fromNow
} }
// When the stage starts, start the source // When the stage starts, start the source

View file

@ -7,7 +7,7 @@ package akka.stream.scaladsl
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.NotUsed import akka.NotUsed
import akka.stream.{ Attributes, Inlet, SinkShape } import akka.stream.{ Attributes, Inlet, RestartSettings, SinkShape }
import akka.stream.stage.{ GraphStage, InHandler } import akka.stream.stage.{ GraphStage, InHandler }
/** /**
@ -33,7 +33,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost. * sent may have been lost.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -43,9 +43,12 @@ object RestartSink {
* In order to skip this additional delay pass in `0`. * In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap. * @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = { sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = {
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue)) val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
withBackoff(settings)(sinkFactory)
} }
/** /**
@ -62,7 +65,7 @@ object RestartSink {
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost. * sent may have been lost.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -74,33 +77,45 @@ object RestartSink {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sinkFactory A factory for producing the [[Sink]] to wrap. * @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)( def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(
sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = { sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] = {
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts)) val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
withBackoff(settings)(sinkFactory)
} }
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
* before this [[Sink]] in the graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
def withBackoff[T](settings: RestartSettings)(sinkFactory: () => Sink[T, _]): Sink[T, NotUsed] =
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, settings))
} }
private final class RestartWithBackoffSink[T]( private final class RestartWithBackoffSink[T](sinkFactory: () => Sink[T, _], restartSettings: RestartSettings)
sinkFactory: () => Sink[T, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int)
extends GraphStage[SinkShape[T]] { self => extends GraphStage[SinkShape[T]] { self =>
val in = Inlet[T]("RestartWithBackoffSink.in") val in = Inlet[T]("RestartWithBackoffSink.in")
override def shape = SinkShape(in) override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) = override def createLogic(inheritedAttributes: Attributes) =
new RestartWithBackoffLogic( new RestartWithBackoffLogic("Sink", shape, inheritedAttributes, restartSettings, onlyOnFailures = false) {
"Sink",
shape,
inheritedAttributes,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures = false,
maxRestarts) {
override protected def logSource = self.getClass override protected def logSource = self.getClass
override protected def startGraph() = { override protected def startGraph() = {

View file

@ -7,7 +7,7 @@ package akka.stream.scaladsl
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.NotUsed import akka.NotUsed
import akka.stream.{ Attributes, Outlet, SourceShape } import akka.stream.{ Attributes, Outlet, RestartSettings, SourceShape }
import akka.stream.stage.{ GraphStage, OutHandler } import akka.stream.stage.{ GraphStage, OutHandler }
/** /**
@ -29,7 +29,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -39,16 +39,12 @@ object RestartSource {
* In order to skip this additional delay pass in `0`. * In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph( val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
new RestartWithBackoffSource( withBackoff(settings)(sourceFactory)
sourceFactory,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures = false,
Int.MaxValue))
} }
/** /**
@ -62,7 +58,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -74,18 +70,33 @@ object RestartSource {
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)( def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph( val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
new RestartWithBackoffSource( withBackoff(settings)(sourceFactory)
sourceFactory,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures = false,
maxRestarts))
} }
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
* backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
def withBackoff[T](settings: RestartSettings)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] =
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures = false))
/** /**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
* *
@ -95,7 +106,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -106,16 +117,12 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
* *
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)( def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(
sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph( val settings = RestartSettings(minBackoff, maxBackoff, randomFactor)
new RestartWithBackoffSource( onFailuresWithBackoff(settings)(sourceFactory)
sourceFactory,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures = true,
Int.MaxValue))
} }
/** /**
@ -128,7 +135,7 @@ object RestartSource {
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph. * after this [[Source]] in the graph.
* *
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. * This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
* *
* @param minBackoff minimum (initial) duration until the child actor will * @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated * started again, if it is terminated
@ -141,44 +148,48 @@ object RestartSource {
* @param sourceFactory A factory for producing the [[Source]] to wrap. * @param sourceFactory A factory for producing the [[Source]] to wrap.
* *
*/ */
@Deprecated
@deprecated("Use the overloaded method which accepts akka.stream.RestartSettings instead.", since = "2.6.10")
def onFailuresWithBackoff[T]( def onFailuresWithBackoff[T](
minBackoff: FiniteDuration, minBackoff: FiniteDuration,
maxBackoff: FiniteDuration, maxBackoff: FiniteDuration,
randomFactor: Double, randomFactor: Double,
maxRestarts: Int)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] = { maxRestarts: Int)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph( val settings = RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
new RestartWithBackoffSource( onFailuresWithBackoff(settings)(sourceFactory)
sourceFactory,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures = true,
maxRestarts))
} }
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
*
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
* and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.BackoffOpts]].
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*
*/
def onFailuresWithBackoff[T](settings: RestartSettings)(sourceFactory: () => Source[T, _]): Source[T, NotUsed] =
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures = true))
} }
private final class RestartWithBackoffSource[T]( private final class RestartWithBackoffSource[T](
sourceFactory: () => Source[T, _], sourceFactory: () => Source[T, _],
minBackoff: FiniteDuration, settings: RestartSettings,
maxBackoff: FiniteDuration, onlyOnFailures: Boolean)
randomFactor: Double,
onlyOnFailures: Boolean,
maxRestarts: Int)
extends GraphStage[SourceShape[T]] { self => extends GraphStage[SourceShape[T]] { self =>
val out = Outlet[T]("RestartWithBackoffSource.out") val out = Outlet[T]("RestartWithBackoffSource.out")
override def shape = SourceShape(out) override def shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes) = override def createLogic(inheritedAttributes: Attributes) =
new RestartWithBackoffLogic( new RestartWithBackoffLogic("Source", shape, inheritedAttributes, settings, onlyOnFailures) {
"Source",
shape,
inheritedAttributes,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures,
maxRestarts) {
override protected def logSource = self.getClass override protected def logSource = self.getClass