diff --git a/akka-docs/src/main/categories/error-handling.md b/akka-docs/src/main/categories/error-handling.md new file mode 100644 index 0000000000..94c032e44d --- /dev/null +++ b/akka-docs/src/main/categories/error-handling.md @@ -0,0 +1 @@ +For more background see the @ref[Error Handling in Streams](../stream-error.md) section. \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/RestartFlow/onFailuresWithBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartFlow/onFailuresWithBackoff.md new file mode 100644 index 0000000000..27715833ad --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/RestartFlow/onFailuresWithBackoff.md @@ -0,0 +1,37 @@ +# RestartFlow.onFailuresWithBackoff + +Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @unidoc[Flow] will not restart on completion of the wrapped flow. + +@ref[Error handling](../index.md#error-handling) + +@@@div { .group-scala } + +## Signature + +@@signature [RestartFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala) { #onFailuresWithBackoff } + +@@@ + +## Description + +This @unidoc[Flow] will not emit any failure +The failures by the wrapped @unidoc[Flow] will be handled by +restarting the wrapping @unidoc[Flow] as long as maxRestarts is not reached. +Any termination signals sent to this @unidoc[Flow] however will terminate the wrapped @unidoc[Flow], if it's +running, and then the @unidoc[Flow] will be allowed to terminate without being restarted. + +The restart process is inherently lossy, since there is no coordination between cancelling and the sending of +messages. A termination signal from either end of the wrapped @unidoc[Flow] will cause the other end to be terminated, +and any in transit messages will be lost. During backoff, this @unidoc[Flow] will backpressure. + +This uses the same exponential backoff algorithm as @unidoc[Backoff]. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the wrapped flow emits + +**backpressures** during backoff and when the wrapped flow backpressures + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/RestartFlow/withBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartFlow/withBackoff.md new file mode 100644 index 0000000000..76276b763d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/RestartFlow/withBackoff.md @@ -0,0 +1,38 @@ +# RestartFlow.withBackoff + +Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails or complete using an exponential backoff. + +@ref[Error handling](../index.md#error-handling) + +@@@div { .group-scala } + +## Signature + +@@signature [RestartFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala) { #withBackoff } + +@@@ + +## Description + +The resulting @unidoc[Flow] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or +completed. Any termination by the @unidoc[Flow] before that time will be handled by restarting it. Any termination +signals sent to this @unidoc[Flow] however will terminate the wrapped @unidoc[Flow], if it's running, and then the @unidoc[Flow] +will be allowed to terminate without being restarted. + +The restart process is inherently lossy, since there is no coordination between cancelling and the sending of +messages. A termination signal from either end of the wrapped @unidoc[Flow] will cause the other end to be terminated, +and any in transit messages will be lost. During backoff, this @unidoc[Flow] will backpressure. + +This uses the same exponential backoff algorithm as @unidoc[Backoff]. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the wrapped flow emits + +**backpressures** during backoff and when the wrapped flow backpressures + +**completes** when the wrapped flow completes + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/RestartSink/withBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartSink/withBackoff.md new file mode 100644 index 0000000000..88e602fd96 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/RestartSink/withBackoff.md @@ -0,0 +1,27 @@ +# RestartSink.withBackoff + +Wrap the given @unidoc[Sink] with a @unidoc[Sink] that will restart it when it fails or complete using an exponential backoff. + +@ref[Error handling](../index.md#error-handling) + +@@@div { .group-scala } + +## Signature + +@@signature [RestartSink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala) { #withBackoff } + +@@@ + +## Description + +This @unidoc[Sink] will never cancel, since cancellation by the wrapped @unidoc[Sink] is always handled by restarting it. +The wrapped @unidoc[Sink] can however be completed by feeding a completion or error into this @unidoc[Sink]. When that +happens, the @unidoc[Sink], if currently running, will terminate and will not be restarted. This can be triggered +simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this @unidoc[Sink] in the +graph. + +The restart process is inherently lossy, since there is no coordination between cancelling and the sending of +messages. When the wrapped @unidoc[Sink] does cancel, this @unidoc[Sink] will backpressure, however any elements already +sent may have been lost. + +This uses the same exponential backoff algorithm as @unidoc[Backoff]. diff --git a/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md new file mode 100644 index 0000000000..6d37df9e72 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/RestartSource/onFailuresWithBackoff.md @@ -0,0 +1,29 @@ +# RestartSource.onFailuresWithBackoff + +Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails using an exponential backoff. + +@ref[Error handling](../index.md#error-handling) + +@@@div { .group-scala } + +## Signature + +@@signature [RestartSource.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala) { #onFailuresWithBackoff } + +@@@ + +## Description + + This @unidoc[Source] will never emit a failure, since the failure of the wrapped @unidoc[Source] is always handled by + restarting. The wrapped @unidoc[Source] can be cancelled by cancelling this @unidoc[Source]. + When that happens, the wrapped @unidoc[Source], if currently running will be cancelled, and it will not be restarted. + This can be triggered simply by the downstream cancelling, or externally by introducing a @unidoc[KillSwitch] right + after this @unidoc[Source] in the graph. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the wrapped source emits + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/RestartSource/withBackoff.md b/akka-docs/src/main/paradox/stream/operators/RestartSource/withBackoff.md new file mode 100644 index 0000000000..711ba86a58 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/RestartSource/withBackoff.md @@ -0,0 +1,33 @@ +# RestartSource.withBackoff + +Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails or complete using an exponential backoff. + +@ref[Error handling](../index.md#error-handling) + +@@@div { .group-scala } + +## Signature + +@@signature [RestartSource.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala) { #withBackoff } + +@@@ + +## Description + +This @unidoc[Flow] will never emit a complete or failure, since the completion or failure of the wrapped @unidoc[Source] +is always handled by restarting it. The wrapped @unidoc[Source] can however be cancelled by cancelling this @unidoc[Source]. +When that happens, the wrapped @unidoc[Source], if currently running will be cancelled, and it will not be restarted. +This can be triggered simply by the downstream cancelling, or externally by introducing a @unidoc[KillSwitch] right +after this @unidoc[Source] in the graph. + +This uses the same exponential backoff algorithm as @unidoc[Backoff]. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the wrapped source emits + +**completes** when the wrapped source completes + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source/queue.md b/akka-docs/src/main/paradox/stream/operators/Source/queue.md index 28e9d3dfdd..9486364f13 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/queue.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/queue.md @@ -33,7 +33,7 @@ Scala Java : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue } -## Reactive Streams Semantics +## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index d70b227a34..2a0c672e47 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -270,6 +270,18 @@ Operators meant for inter-operating between Akka Streams and Actors: |ActorSink|@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`].| |ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.| +## Error handling + +For more background see the @ref[Error Handling in Streams](../stream-error.md) section. + +| |Operator|Description| +|--|--|--| +|RestartSource|@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails using an exponential backoff.| +|RestartFlow|@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @unidoc[Flow] will not restart on completion of the wrapped flow.| +|RestartSource|@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails or complete using an exponential backoff.| +|RestartFlow|@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails or complete using an exponential backoff.| +|RestartSink|@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @unidoc[Sink] with a @unidoc[Sink] that will restart it when it fails or complete using an exponential backoff.| + @@@ index * [combine](Source/combine.md) @@ -408,6 +420,11 @@ Operators meant for inter-operating between Akka Streams and Actors: * [fromPath](FileIO/fromPath.md) * [toFile](FileIO/toFile.md) * [toPath](FileIO/toPath.md) +* [withBackoff](RestartSource/withBackoff.md) +* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) +* [withBackoff](RestartFlow/withBackoff.md) +* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) +* [withBackoff](RestartSink/withBackoff.md) * [ask](ActorFlow/ask.md) * [actorRef](ActorSink/actorRef.md) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala deleted file mode 100644 index be019ae46c..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala +++ /dev/null @@ -1,591 +0,0 @@ -/** - * Copyright (C) 2015-2018 Lightbend Inc. - */ - -package akka.stream.javadsl - -import akka.NotUsed -import akka.japi.function.Creator - -import scala.concurrent.duration.FiniteDuration - -/** - * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. - * - * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for - * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The - * RestartSource ensures that the graph can continue running while the [[Source]] restarts. - */ -object RestartSource { - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] - * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ - sourceFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] - * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - import akka.util.JavaDurationConverters._ - withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - sourceFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - import akka.util.JavaDurationConverters._ - withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by - * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - * - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ - sourceFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by - * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - * - */ - def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - import akka.util.JavaDurationConverters._ - onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by - * introducing a [[KillSwitch]] right after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - * - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - sourceFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by - * introducing a [[KillSwitch]] right after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - * - */ - def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - import akka.util.JavaDurationConverters._ - onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) - } -} - -/** - * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. - * - * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for - * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The - * RestartSink ensures that the graph can continue running while the [[Sink]] restarts. - */ -object RestartSink { - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. - * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that - * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered - * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the - * graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ - sinkFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. - * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that - * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered - * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the - * graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - import akka.util.JavaDurationConverters._ - withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sinkFactory) - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] - * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into - * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. - * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right - * before this [[Sink]] in the graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - sinkFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] - * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into - * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. - * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right - * before this [[Sink]] in the graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - import akka.util.JavaDurationConverters._ - withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sinkFactory) - } -} - -/** - * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails. - * - * They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for - * example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The - * RestartFlow ensures that the graph can continue running while the [[Flow]] restarts. - */ -object RestartFlow { - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination - * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] - * will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ - flowFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination - * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] - * will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - import akka.util.JavaDurationConverters._ - withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, flowFactory) - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts - * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's - * running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - flowFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or - * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts - * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's - * running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - import akka.util.JavaDurationConverters._ - withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts - * using an exponential backoff. - * - * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that - * time will be handled by restarting it as long as maxRestarts is not reached. - * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate - * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - akka.stream.scaladsl.RestartFlow.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - flowFactory.create().asScala - }.asJava - } - - /** - * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts - * using an exponential backoff. - * - * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that - * time will be handled by restarting it as long as maxRestarts is not reached. - * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate - * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, - * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param flowFactory A factory for producing the [[Flow]] to wrap. - */ - def onFailuresWithBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, - maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - import akka.util.JavaDurationConverters._ - onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) - } -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala new file mode 100644 index 0000000000..a7f9cb9735 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala @@ -0,0 +1,211 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.NotUsed +import akka.japi.function.Creator + +import scala.concurrent.duration.FiniteDuration + +/** + * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for + * example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The + * RestartFlow ensures that the graph can continue running while the [[Flow]] restarts. + */ +object RestartFlow { + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination + * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] + * will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + flowFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination + * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] + * will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, flowFactory) + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts + * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's + * running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + flowFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts + * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's + * running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts + * using an exponential backoff. + * + * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that + * time will be handled by restarting it as long as maxRestarts is not reached. + * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate + * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + akka.stream.scaladsl.RestartFlow.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + flowFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts + * using an exponential backoff. + * + * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that + * time will be handled by restarting it as long as maxRestarts is not reached. + * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate + * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def onFailuresWithBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + import akka.util.JavaDurationConverters._ + onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala new file mode 100644 index 0000000000..3ccae5ba05 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.NotUsed +import akka.japi.function.Creator + +import scala.concurrent.duration.FiniteDuration + +/** + * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for + * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSink ensures that the graph can continue running while the [[Sink]] restarts. + */ +object RestartSink { + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. + * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that + * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered + * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the + * graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + sinkFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. + * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that + * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered + * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the + * graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sinkFactory) + } + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] + * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into + * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. + * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right + * before this [[Sink]] in the graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sinkFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] + * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into + * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. + * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right + * before this [[Sink]] in the graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sinkFactory) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala new file mode 100644 index 0000000000..7d2ef79a94 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala @@ -0,0 +1,250 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.NotUsed +import akka.japi.function.Creator + +import scala.concurrent.duration.FiniteDuration + +/** + * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for + * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSource ensures that the graph can continue running while the [[Source]] restarts. + */ +object RestartSource { + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] + * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + sourceFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] + * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sourceFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by + * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + sourceFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by + * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by + * introducing a [[KillSwitch]] right after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sourceFactory.create().asScala + }.asJava + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by + * introducing a [[KillSwitch]] right after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala similarity index 52% rename from akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala index ec6f0a266c..8aee8ca840 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala @@ -16,249 +16,6 @@ import akka.stream.stage._ import scala.concurrent.duration._ -/** - * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. - * - * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for - * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The - * RestartSource ensures that the graph can continue running while the [[Source]] restarts. - */ -object RestartSource { - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] - * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { - Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, Int.MaxValue)) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { - Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts)) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by - * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. - * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - * - */ - def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { - Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, Int.MaxValue)) - } - - /** - * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. - * - * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion - * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled - * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, - * and it will not be restarted. - * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right - * after this [[Source]] in the graph. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sourceFactory A factory for producing the [[Source]] to wrap. - * - */ - def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { - Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts)) - } -} - -private final class RestartWithBackoffSource[T]( - sourceFactory: () ⇒ Source[T, _], - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - onlyOnFailures: Boolean, - maxRestarts: Int) extends GraphStage[SourceShape[T]] { self ⇒ - - val out = Outlet[T]("RestartWithBackoffSource.out") - - override def shape = SourceShape(out) - override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) { - - override protected def logSource = self.getClass - - override protected def startGraph() = { - val sinkIn = createSubInlet(out) - sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer) - if (isAvailable(out)) { - sinkIn.pull() - } - } - - override protected def backoff() = { - setHandler(out, new OutHandler { - override def onPull() = () - }) - } - - backoff() - } -} - -/** - * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. - * - * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for - * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The - * RestartSink ensures that the graph can continue running while the [[Sink]] restarts. - */ -object RestartSink { - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. - * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that - * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered - * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the - * graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = { - Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue)) - } - - /** - * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential - * backoff. - * - * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] - * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into - * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. - * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right - * before this [[Sink]] in the graph. - * - * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already - * sent may have been lost. - * - * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. - * - * @param minBackoff minimum (initial) duration until the child actor will - * started again, if it is terminated - * @param maxBackoff the exponential back-off is capped to this duration - * @param randomFactor after calculation of the exponential back-off an additional - * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. - * In order to skip this additional delay pass in `0`. - * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. - * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - * @param sinkFactory A factory for producing the [[Sink]] to wrap. - */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = { - Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts)) - } -} - -private final class RestartWithBackoffSink[T]( - sinkFactory: () ⇒ Sink[T, _], - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int) extends GraphStage[SinkShape[T]] { self ⇒ - - val in = Inlet[T]("RestartWithBackoffSink.in") - - override def shape = SinkShape(in) - override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( - "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) { - override protected def logSource = self.getClass - - override protected def startGraph() = { - val sourceOut = createSubOutlet(in) - Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer) - } - - override protected def backoff() = { - setHandler(in, new InHandler { - override def onPush() = () - }) - } - - backoff() - } -} - /** * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala new file mode 100644 index 0000000000..9e36d559f5 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.{ Attributes, Inlet, KillSwitch, SinkShape } +import akka.stream.stage.{ GraphStage, InHandler } + +import scala.concurrent.duration.FiniteDuration + +/** + * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for + * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSink ensures that the graph can continue running while the [[Sink]] restarts. + */ +object RestartSink { + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. + * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that + * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered + * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the + * graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = { + Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue)) + } + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] + * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into + * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. + * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right + * before this [[Sink]] in the graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = { + Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor, maxRestarts)) + } +} + +private final class RestartWithBackoffSink[T]( + sinkFactory: () ⇒ Sink[T, _], + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int) extends GraphStage[SinkShape[T]] { self ⇒ + + val in = Inlet[T]("RestartWithBackoffSink.in") + + override def shape = SinkShape(in) + override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( + "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) { + override protected def logSource = self.getClass + + override protected def startGraph() = { + val sourceOut = createSubOutlet(in) + Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer) + } + + override protected def backoff() = { + setHandler(in, new InHandler { + override def onPush() = () + }) + } + + backoff() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala new file mode 100644 index 0000000000..0874393278 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala @@ -0,0 +1,157 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.{ Attributes, KillSwitch, Outlet, SourceShape } +import akka.stream.stage.{ GraphStage, OutHandler } + +import scala.concurrent.duration.FiniteDuration + +/** + * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for + * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSource ensures that the graph can continue running while the [[Source]] restarts. + */ +object RestartSource { + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] + * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, Int.MaxValue)) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts)) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by + * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, Int.MaxValue)) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts)) + } +} + +private final class RestartWithBackoffSource[T]( + sourceFactory: () ⇒ Source[T, _], + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + onlyOnFailures: Boolean, + maxRestarts: Int) extends GraphStage[SourceShape[T]] { self ⇒ + + val out = Outlet[T]("RestartWithBackoffSource.out") + + override def shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( + "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) { + + override protected def logSource = self.getClass + + override protected def startGraph() = { + val sinkIn = createSubInlet(out) + sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer) + if (isAvailable(out)) { + sinkIn.pull() + } + } + + override protected def backoff() = { + setHandler(out, new OutHandler { + override def onPull() = () + }) + } + + backoff() + } +} diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index d2393fcaea..0f7ee5debc 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -35,7 +35,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { // TODO these don't show up as def's yet so don't show up in the index.. // "Fan-out operators", "Watching status operators", - "Actor interop operators" + "Actor interop operators", + "Error handling" ) def categoryId(name: String): String = name.toLowerCase.replace(' ', '-') @@ -119,7 +120,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++ Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++ - Set("++") + Set("++", "onPush", "onPull") def isPending(element: String, opName: String) = pendingTestCases.get(element).exists(_.contains(opName)) @@ -145,6 +146,12 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala", "akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala", "akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala", + "akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala", + "akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala", + "akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala", + "akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala", + "akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala", + "akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala", // akka-stream-typed "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",