Document 'withBackoff' srouce/flow/sink (#25770)
This commit is contained in:
parent
f4dd0ac79d
commit
4b012cc306
16 changed files with 1068 additions and 837 deletions
1
akka-docs/src/main/categories/error-handling.md
Normal file
1
akka-docs/src/main/categories/error-handling.md
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
For more background see the @ref[Error Handling in Streams](../stream-error.md) section.
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
# RestartFlow.onFailuresWithBackoff
|
||||||
|
|
||||||
|
Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @unidoc[Flow] will not restart on completion of the wrapped flow.
|
||||||
|
|
||||||
|
@ref[Error handling](../index.md#error-handling)
|
||||||
|
|
||||||
|
@@@div { .group-scala }
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@@signature [RestartFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala) { #onFailuresWithBackoff }
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
This @unidoc[Flow] will not emit any failure
|
||||||
|
The failures by the wrapped @unidoc[Flow] will be handled by
|
||||||
|
restarting the wrapping @unidoc[Flow] as long as maxRestarts is not reached.
|
||||||
|
Any termination signals sent to this @unidoc[Flow] however will terminate the wrapped @unidoc[Flow], if it's
|
||||||
|
running, and then the @unidoc[Flow] will be allowed to terminate without being restarted.
|
||||||
|
|
||||||
|
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
messages. A termination signal from either end of the wrapped @unidoc[Flow] will cause the other end to be terminated,
|
||||||
|
and any in transit messages will be lost. During backoff, this @unidoc[Flow] will backpressure.
|
||||||
|
|
||||||
|
This uses the same exponential backoff algorithm as @unidoc[Backoff].
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** when the wrapped flow emits
|
||||||
|
|
||||||
|
**backpressures** during backoff and when the wrapped flow backpressures
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
# RestartFlow.withBackoff
|
||||||
|
|
||||||
|
Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails or complete using an exponential backoff.
|
||||||
|
|
||||||
|
@ref[Error handling](../index.md#error-handling)
|
||||||
|
|
||||||
|
@@@div { .group-scala }
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@@signature [RestartFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala) { #withBackoff }
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
The resulting @unidoc[Flow] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
||||||
|
completed. Any termination by the @unidoc[Flow] before that time will be handled by restarting it. Any termination
|
||||||
|
signals sent to this @unidoc[Flow] however will terminate the wrapped @unidoc[Flow], if it's running, and then the @unidoc[Flow]
|
||||||
|
will be allowed to terminate without being restarted.
|
||||||
|
|
||||||
|
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
messages. A termination signal from either end of the wrapped @unidoc[Flow] will cause the other end to be terminated,
|
||||||
|
and any in transit messages will be lost. During backoff, this @unidoc[Flow] will backpressure.
|
||||||
|
|
||||||
|
This uses the same exponential backoff algorithm as @unidoc[Backoff].
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** when the wrapped flow emits
|
||||||
|
|
||||||
|
**backpressures** during backoff and when the wrapped flow backpressures
|
||||||
|
|
||||||
|
**completes** when the wrapped flow completes
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
# RestartSink.withBackoff
|
||||||
|
|
||||||
|
Wrap the given @unidoc[Sink] with a @unidoc[Sink] that will restart it when it fails or complete using an exponential backoff.
|
||||||
|
|
||||||
|
@ref[Error handling](../index.md#error-handling)
|
||||||
|
|
||||||
|
@@@div { .group-scala }
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@@signature [RestartSink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala) { #withBackoff }
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
This @unidoc[Sink] will never cancel, since cancellation by the wrapped @unidoc[Sink] is always handled by restarting it.
|
||||||
|
The wrapped @unidoc[Sink] can however be completed by feeding a completion or error into this @unidoc[Sink]. When that
|
||||||
|
happens, the @unidoc[Sink], if currently running, will terminate and will not be restarted. This can be triggered
|
||||||
|
simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this @unidoc[Sink] in the
|
||||||
|
graph.
|
||||||
|
|
||||||
|
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
messages. When the wrapped @unidoc[Sink] does cancel, this @unidoc[Sink] will backpressure, however any elements already
|
||||||
|
sent may have been lost.
|
||||||
|
|
||||||
|
This uses the same exponential backoff algorithm as @unidoc[Backoff].
|
||||||
|
|
@ -0,0 +1,29 @@
|
||||||
|
# RestartSource.onFailuresWithBackoff
|
||||||
|
|
||||||
|
Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails using an exponential backoff.
|
||||||
|
|
||||||
|
@ref[Error handling](../index.md#error-handling)
|
||||||
|
|
||||||
|
@@@div { .group-scala }
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@@signature [RestartSource.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala) { #onFailuresWithBackoff }
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
This @unidoc[Source] will never emit a failure, since the failure of the wrapped @unidoc[Source] is always handled by
|
||||||
|
restarting. The wrapped @unidoc[Source] can be cancelled by cancelling this @unidoc[Source].
|
||||||
|
When that happens, the wrapped @unidoc[Source], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
This can be triggered simply by the downstream cancelling, or externally by introducing a @unidoc[KillSwitch] right
|
||||||
|
after this @unidoc[Source] in the graph.
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** when the wrapped source emits
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
# RestartSource.withBackoff
|
||||||
|
|
||||||
|
Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails or complete using an exponential backoff.
|
||||||
|
|
||||||
|
@ref[Error handling](../index.md#error-handling)
|
||||||
|
|
||||||
|
@@@div { .group-scala }
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@@signature [RestartSource.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala) { #withBackoff }
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
This @unidoc[Flow] will never emit a complete or failure, since the completion or failure of the wrapped @unidoc[Source]
|
||||||
|
is always handled by restarting it. The wrapped @unidoc[Source] can however be cancelled by cancelling this @unidoc[Source].
|
||||||
|
When that happens, the wrapped @unidoc[Source], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
This can be triggered simply by the downstream cancelling, or externally by introducing a @unidoc[KillSwitch] right
|
||||||
|
after this @unidoc[Source] in the graph.
|
||||||
|
|
||||||
|
This uses the same exponential backoff algorithm as @unidoc[Backoff].
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** when the wrapped source emits
|
||||||
|
|
||||||
|
**completes** when the wrapped source completes
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
@ -33,7 +33,7 @@ Scala
|
||||||
Java
|
Java
|
||||||
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue }
|
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue }
|
||||||
|
|
||||||
## Reactive Streams Semantics
|
## Reactive Streams semantics
|
||||||
|
|
||||||
@@@div { .callout }
|
@@@div { .callout }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -270,6 +270,18 @@ Operators meant for inter-operating between Akka Streams and Actors:
|
||||||
|ActorSink|<a name="actorref"></a>@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`].|
|
|ActorSink|<a name="actorref"></a>@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`].|
|
||||||
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.|
|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.|
|
||||||
|
|
||||||
|
## Error handling
|
||||||
|
|
||||||
|
For more background see the @ref[Error Handling in Streams](../stream-error.md) section.
|
||||||
|
|
||||||
|
| |Operator|Description|
|
||||||
|
|--|--|--|
|
||||||
|
|RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails using an exponential backoff.|
|
||||||
|
|RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @unidoc[Flow] will not restart on completion of the wrapped flow.|
|
||||||
|
|RestartSource|<a name="withbackoff"></a>@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails or complete using an exponential backoff.|
|
||||||
|
|RestartFlow|<a name="withbackoff"></a>@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails or complete using an exponential backoff.|
|
||||||
|
|RestartSink|<a name="withbackoff"></a>@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @unidoc[Sink] with a @unidoc[Sink] that will restart it when it fails or complete using an exponential backoff.|
|
||||||
|
|
||||||
@@@ index
|
@@@ index
|
||||||
|
|
||||||
* [combine](Source/combine.md)
|
* [combine](Source/combine.md)
|
||||||
|
|
@ -408,6 +420,11 @@ Operators meant for inter-operating between Akka Streams and Actors:
|
||||||
* [fromPath](FileIO/fromPath.md)
|
* [fromPath](FileIO/fromPath.md)
|
||||||
* [toFile](FileIO/toFile.md)
|
* [toFile](FileIO/toFile.md)
|
||||||
* [toPath](FileIO/toPath.md)
|
* [toPath](FileIO/toPath.md)
|
||||||
|
* [withBackoff](RestartSource/withBackoff.md)
|
||||||
|
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
|
||||||
|
* [withBackoff](RestartFlow/withBackoff.md)
|
||||||
|
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
|
||||||
|
* [withBackoff](RestartSink/withBackoff.md)
|
||||||
* [ask](ActorFlow/ask.md)
|
* [ask](ActorFlow/ask.md)
|
||||||
* [actorRef](ActorSink/actorRef.md)
|
* [actorRef](ActorSink/actorRef.md)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,591 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package akka.stream.javadsl
|
|
||||||
|
|
||||||
import akka.NotUsed
|
|
||||||
import akka.japi.function.Creator
|
|
||||||
|
|
||||||
import scala.concurrent.duration.FiniteDuration
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
|
|
||||||
*
|
|
||||||
* They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
|
|
||||||
* example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
|
|
||||||
* RestartSource ensures that the graph can continue running while the [[Source]] restarts.
|
|
||||||
*/
|
|
||||||
object RestartSource {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
|
|
||||||
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
|
|
||||||
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
|
||||||
sourceFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
|
|
||||||
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
|
|
||||||
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
|
||||||
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
|
||||||
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
|
||||||
* and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
|
||||||
sourceFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
|
||||||
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
|
||||||
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
|
||||||
* and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
|
|
||||||
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
|
|
||||||
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
|
||||||
sourceFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
|
|
||||||
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
|
|
||||||
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
|
||||||
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
|
||||||
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
|
||||||
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
|
|
||||||
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
|
||||||
sourceFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
|
||||||
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
|
||||||
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
|
||||||
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
|
|
||||||
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
|
|
||||||
*
|
|
||||||
* They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
|
|
||||||
* example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
|
|
||||||
* RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
|
|
||||||
*/
|
|
||||||
object RestartSink {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
|
|
||||||
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
|
|
||||||
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
|
|
||||||
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
|
|
||||||
* graph.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
|
||||||
* sent may have been lost.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
|
||||||
sinkFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
|
|
||||||
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
|
|
||||||
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
|
|
||||||
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
|
|
||||||
* graph.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
|
||||||
* sent may have been lost.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sinkFactory)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
|
|
||||||
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
|
|
||||||
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
|
|
||||||
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* before this [[Sink]] in the graph.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
|
||||||
* sent may have been lost.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
|
||||||
sinkFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
|
|
||||||
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
|
|
||||||
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
|
|
||||||
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* before this [[Sink]] in the graph.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
|
||||||
* sent may have been lost.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sinkFactory)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
|
|
||||||
*
|
|
||||||
* They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for
|
|
||||||
* example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The
|
|
||||||
* RestartFlow ensures that the graph can continue running while the [[Flow]] restarts.
|
|
||||||
*/
|
|
||||||
object RestartFlow {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
|
||||||
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
|
|
||||||
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
|
|
||||||
* will be allowed to terminate without being restarted.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
|
||||||
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
|
||||||
flowFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
|
||||||
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
|
|
||||||
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
|
|
||||||
* will be allowed to terminate without being restarted.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
|
||||||
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, flowFactory)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
|
||||||
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
|
|
||||||
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
|
|
||||||
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
|
||||||
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
|
||||||
flowFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
|
||||||
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
|
|
||||||
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
|
|
||||||
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
|
||||||
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
|
|
||||||
* using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
|
|
||||||
* time will be handled by restarting it as long as maxRestarts is not reached.
|
|
||||||
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
|
|
||||||
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
|
||||||
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
|
||||||
def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
|
||||||
akka.stream.scaladsl.RestartFlow.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
|
||||||
flowFactory.create().asScala
|
|
||||||
}.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
|
|
||||||
* using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
|
|
||||||
* time will be handled by restarting it as long as maxRestarts is not reached.
|
|
||||||
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
|
|
||||||
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
|
||||||
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
|
||||||
*/
|
|
||||||
def onFailuresWithBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
|
||||||
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
211
akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala
Normal file
211
akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala
Normal file
|
|
@ -0,0 +1,211 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.javadsl
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.japi.function.Creator
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
|
||||||
|
*
|
||||||
|
* They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for
|
||||||
|
* example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The
|
||||||
|
* RestartFlow ensures that the graph can continue running while the [[Flow]] restarts.
|
||||||
|
*/
|
||||||
|
object RestartFlow {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
||||||
|
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
|
||||||
|
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
|
||||||
|
* will be allowed to terminate without being restarted.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
||||||
|
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
||||||
|
flowFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
||||||
|
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
|
||||||
|
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
|
||||||
|
* will be allowed to terminate without being restarted.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
||||||
|
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, flowFactory)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
||||||
|
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
|
||||||
|
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
|
||||||
|
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
||||||
|
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
||||||
|
flowFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
|
||||||
|
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts
|
||||||
|
* is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
|
||||||
|
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
||||||
|
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
|
||||||
|
* using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
|
||||||
|
* time will be handled by restarting it as long as maxRestarts is not reached.
|
||||||
|
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
|
||||||
|
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
||||||
|
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartFlow.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
||||||
|
flowFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
|
||||||
|
* using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
|
||||||
|
* time will be handled by restarting it as long as maxRestarts is not reached.
|
||||||
|
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
|
||||||
|
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
|
||||||
|
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param flowFactory A factory for producing the [[Flow]] to wrap.
|
||||||
|
*/
|
||||||
|
def onFailuresWithBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory)
|
||||||
|
}
|
||||||
|
}
|
||||||
150
akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala
Normal file
150
akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala
Normal file
|
|
@ -0,0 +1,150 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.javadsl
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.japi.function.Creator
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
|
||||||
|
*
|
||||||
|
* They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
|
||||||
|
* example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
|
||||||
|
* RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
|
||||||
|
*/
|
||||||
|
object RestartSink {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
|
||||||
|
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
|
||||||
|
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
|
||||||
|
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
|
||||||
|
* graph.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
||||||
|
* sent may have been lost.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
||||||
|
sinkFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
|
||||||
|
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
|
||||||
|
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
|
||||||
|
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
|
||||||
|
* graph.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
||||||
|
* sent may have been lost.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sinkFactory)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
|
||||||
|
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
|
||||||
|
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
|
||||||
|
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* before this [[Sink]] in the graph.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
||||||
|
* sent may have been lost.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
||||||
|
sinkFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
|
||||||
|
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
|
||||||
|
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
|
||||||
|
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* before this [[Sink]] in the graph.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
||||||
|
* sent may have been lost.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sinkFactory)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,250 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.javadsl
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.japi.function.Creator
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
|
||||||
|
*
|
||||||
|
* They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
|
||||||
|
* example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
|
||||||
|
* RestartSource ensures that the graph can continue running while the [[Source]] restarts.
|
||||||
|
*/
|
||||||
|
object RestartSource {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
|
||||||
|
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
|
||||||
|
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
||||||
|
sourceFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
|
||||||
|
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
|
||||||
|
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
||||||
|
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
||||||
|
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
||||||
|
* and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
||||||
|
sourceFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
||||||
|
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
||||||
|
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
||||||
|
* and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
|
||||||
|
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
|
||||||
|
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
|
||||||
|
sourceFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
|
||||||
|
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
|
||||||
|
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
||||||
|
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
||||||
|
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
||||||
|
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
|
||||||
|
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||||
|
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒
|
||||||
|
sourceFactory.create().asScala
|
||||||
|
}.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
||||||
|
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
||||||
|
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
||||||
|
* and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by
|
||||||
|
* introducing a [[KillSwitch]] right after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double,
|
||||||
|
maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -16,249 +16,6 @@ import akka.stream.stage._
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
/**
|
|
||||||
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
|
|
||||||
*
|
|
||||||
* They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
|
|
||||||
* example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
|
|
||||||
* RestartSource ensures that the graph can continue running while the [[Source]] restarts.
|
|
||||||
*/
|
|
||||||
object RestartSource {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
|
|
||||||
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
|
|
||||||
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
|
||||||
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, Int.MaxValue))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
|
||||||
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
|
||||||
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
|
||||||
* and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
|
||||||
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
|
|
||||||
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
|
|
||||||
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
|
||||||
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, Int.MaxValue))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
|
||||||
*
|
|
||||||
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
|
||||||
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
|
||||||
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
|
||||||
* and it will not be restarted.
|
|
||||||
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* after this [[Source]] in the graph.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
|
||||||
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class RestartWithBackoffSource[T](
|
|
||||||
sourceFactory: () ⇒ Source[T, _],
|
|
||||||
minBackoff: FiniteDuration,
|
|
||||||
maxBackoff: FiniteDuration,
|
|
||||||
randomFactor: Double,
|
|
||||||
onlyOnFailures: Boolean,
|
|
||||||
maxRestarts: Int) extends GraphStage[SourceShape[T]] { self ⇒
|
|
||||||
|
|
||||||
val out = Outlet[T]("RestartWithBackoffSource.out")
|
|
||||||
|
|
||||||
override def shape = SourceShape(out)
|
|
||||||
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
|
|
||||||
"Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
|
|
||||||
|
|
||||||
override protected def logSource = self.getClass
|
|
||||||
|
|
||||||
override protected def startGraph() = {
|
|
||||||
val sinkIn = createSubInlet(out)
|
|
||||||
sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer)
|
|
||||||
if (isAvailable(out)) {
|
|
||||||
sinkIn.pull()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override protected def backoff() = {
|
|
||||||
setHandler(out, new OutHandler {
|
|
||||||
override def onPull() = ()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
backoff()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
|
|
||||||
*
|
|
||||||
* They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
|
|
||||||
* example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
|
|
||||||
* RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
|
|
||||||
*/
|
|
||||||
object RestartSink {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
|
|
||||||
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
|
|
||||||
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
|
|
||||||
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
|
|
||||||
* graph.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
|
||||||
* sent may have been lost.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
|
|
||||||
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
|
||||||
* backoff.
|
|
||||||
*
|
|
||||||
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
|
|
||||||
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
|
|
||||||
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
|
|
||||||
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
|
|
||||||
* before this [[Sink]] in the graph.
|
|
||||||
*
|
|
||||||
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
|
||||||
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
|
||||||
* sent may have been lost.
|
|
||||||
*
|
|
||||||
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
|
||||||
*
|
|
||||||
* @param minBackoff minimum (initial) duration until the child actor will
|
|
||||||
* started again, if it is terminated
|
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
|
||||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
|
||||||
* In order to skip this additional delay pass in `0`.
|
|
||||||
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
|
||||||
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
|
||||||
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
|
||||||
*/
|
|
||||||
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
|
|
||||||
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class RestartWithBackoffSink[T](
|
|
||||||
sinkFactory: () ⇒ Sink[T, _],
|
|
||||||
minBackoff: FiniteDuration,
|
|
||||||
maxBackoff: FiniteDuration,
|
|
||||||
randomFactor: Double,
|
|
||||||
maxRestarts: Int) extends GraphStage[SinkShape[T]] { self ⇒
|
|
||||||
|
|
||||||
val in = Inlet[T]("RestartWithBackoffSink.in")
|
|
||||||
|
|
||||||
override def shape = SinkShape(in)
|
|
||||||
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
|
|
||||||
"Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) {
|
|
||||||
override protected def logSource = self.getClass
|
|
||||||
|
|
||||||
override protected def startGraph() = {
|
|
||||||
val sourceOut = createSubOutlet(in)
|
|
||||||
Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer)
|
|
||||||
}
|
|
||||||
|
|
||||||
override protected def backoff() = {
|
|
||||||
setHandler(in, new InHandler {
|
|
||||||
override def onPush() = ()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
backoff()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
|
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
|
||||||
*
|
*
|
||||||
|
|
@ -0,0 +1,108 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.stream.{ Attributes, Inlet, KillSwitch, SinkShape }
|
||||||
|
import akka.stream.stage.{ GraphStage, InHandler }
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
|
||||||
|
*
|
||||||
|
* They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
|
||||||
|
* example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
|
||||||
|
* RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
|
||||||
|
*/
|
||||||
|
object RestartSink {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
|
||||||
|
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
|
||||||
|
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
|
||||||
|
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
|
||||||
|
* graph.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
||||||
|
* sent may have been lost.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
|
||||||
|
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]]
|
||||||
|
* is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into
|
||||||
|
* this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted.
|
||||||
|
* This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* before this [[Sink]] in the graph.
|
||||||
|
*
|
||||||
|
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
|
||||||
|
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
|
||||||
|
* sent may have been lost.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
|
||||||
|
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class RestartWithBackoffSink[T](
|
||||||
|
sinkFactory: () ⇒ Sink[T, _],
|
||||||
|
minBackoff: FiniteDuration,
|
||||||
|
maxBackoff: FiniteDuration,
|
||||||
|
randomFactor: Double,
|
||||||
|
maxRestarts: Int) extends GraphStage[SinkShape[T]] { self ⇒
|
||||||
|
|
||||||
|
val in = Inlet[T]("RestartWithBackoffSink.in")
|
||||||
|
|
||||||
|
override def shape = SinkShape(in)
|
||||||
|
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
|
||||||
|
"Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) {
|
||||||
|
override protected def logSource = self.getClass
|
||||||
|
|
||||||
|
override protected def startGraph() = {
|
||||||
|
val sourceOut = createSubOutlet(in)
|
||||||
|
Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer)
|
||||||
|
}
|
||||||
|
|
||||||
|
override protected def backoff() = {
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush() = ()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
backoff()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,157 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.stream.{ Attributes, KillSwitch, Outlet, SourceShape }
|
||||||
|
import akka.stream.stage.{ GraphStage, OutHandler }
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
|
||||||
|
*
|
||||||
|
* They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
|
||||||
|
* example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
|
||||||
|
* RestartSource ensures that the graph can continue running while the [[Source]] restarts.
|
||||||
|
*/
|
||||||
|
object RestartSource {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
|
||||||
|
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
|
||||||
|
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
||||||
|
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, Int.MaxValue))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
|
||||||
|
* backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
||||||
|
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
||||||
|
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
||||||
|
* and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*/
|
||||||
|
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
||||||
|
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by
|
||||||
|
* restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]].
|
||||||
|
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
||||||
|
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, Int.MaxValue))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff.
|
||||||
|
*
|
||||||
|
* This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion
|
||||||
|
* or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled
|
||||||
|
* by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled,
|
||||||
|
* and it will not be restarted.
|
||||||
|
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
|
||||||
|
* after this [[Source]] in the graph.
|
||||||
|
*
|
||||||
|
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
|
||||||
|
*
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
|
||||||
|
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
|
||||||
|
* @param sourceFactory A factory for producing the [[Source]] to wrap.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
|
||||||
|
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class RestartWithBackoffSource[T](
|
||||||
|
sourceFactory: () ⇒ Source[T, _],
|
||||||
|
minBackoff: FiniteDuration,
|
||||||
|
maxBackoff: FiniteDuration,
|
||||||
|
randomFactor: Double,
|
||||||
|
onlyOnFailures: Boolean,
|
||||||
|
maxRestarts: Int) extends GraphStage[SourceShape[T]] { self ⇒
|
||||||
|
|
||||||
|
val out = Outlet[T]("RestartWithBackoffSource.out")
|
||||||
|
|
||||||
|
override def shape = SourceShape(out)
|
||||||
|
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
|
||||||
|
"Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
|
||||||
|
|
||||||
|
override protected def logSource = self.getClass
|
||||||
|
|
||||||
|
override protected def startGraph() = {
|
||||||
|
val sinkIn = createSubInlet(out)
|
||||||
|
sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer)
|
||||||
|
if (isAvailable(out)) {
|
||||||
|
sinkIn.pull()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override protected def backoff() = {
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull() = ()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
backoff()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -35,7 +35,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
||||||
// TODO these don't show up as def's yet so don't show up in the index..
|
// TODO these don't show up as def's yet so don't show up in the index..
|
||||||
// "Fan-out operators",
|
// "Fan-out operators",
|
||||||
"Watching status operators",
|
"Watching status operators",
|
||||||
"Actor interop operators"
|
"Actor interop operators",
|
||||||
|
"Error handling"
|
||||||
)
|
)
|
||||||
|
|
||||||
def categoryId(name: String): String = name.toLowerCase.replace(' ', '-')
|
def categoryId(name: String): String = name.toLowerCase.replace(' ', '-')
|
||||||
|
|
@ -119,7 +120,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
||||||
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
|
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
|
||||||
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
|
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
|
||||||
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
|
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
|
||||||
Set("++")
|
Set("++", "onPush", "onPull")
|
||||||
|
|
||||||
def isPending(element: String, opName: String) =
|
def isPending(element: String, opName: String) =
|
||||||
pendingTestCases.get(element).exists(_.contains(opName))
|
pendingTestCases.get(element).exists(_.contains(opName))
|
||||||
|
|
@ -145,6 +146,12 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
||||||
"akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala",
|
"akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala",
|
||||||
"akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala",
|
"akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala",
|
||||||
"akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala",
|
"akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala",
|
||||||
|
"akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala",
|
||||||
|
"akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala",
|
||||||
|
"akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala",
|
||||||
|
"akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala",
|
||||||
|
"akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala",
|
||||||
|
"akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala",
|
||||||
|
|
||||||
// akka-stream-typed
|
// akka-stream-typed
|
||||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",
|
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue