diff --git a/akka-docs/src/main/paradox/stream/operators/RetryFlow/withBackoff.md b/akka-docs/src/main/paradox/stream/operators/RetryFlow/withBackoff.md new file mode 100644 index 0000000000..b049c81a59 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/RetryFlow/withBackoff.md @@ -0,0 +1,54 @@ +# RetryFlow.withBackoff + +Wrap the given @apidoc[Flow] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try. + +@ref[Error handling](../index.md#error-handling) + +## Signature + +Scala +: @@signature [RetryFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala) { #withBackoff } + +Java +: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #withBackoff-signature } + + +## Description + +When an element is emitted by the wrapped `flow` it is passed to the `decideRetry` function, which may return an element to retry in the `flow`. + +The retry backoff is controlled by the `minBackoff`, `maxBackoff` and `randomFactor` parameters. +At most `maxRetries` will be made after the initial try. + +The wrapped `flow` must have **one-in one-out semantics**. It may not filter, nor duplicate elements. The `RetryFlow` will fail if two elements are emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will be emitted into the `flow` at any time. The `flow` needs to emit an element before the next will be emitted to it. + +Elements are retried as long as `maxRetries` is not reached and the `decideRetry` function returns a new element to be sent to `flow`. The `decideRetry` function gets passed in the original element sent to the `flow` and the element emitted by it. +When `decideRetry` returns @scala[`None`]@java[`Optional.empty`], no retries will be issued, and the response will be emitted downstream. + +@@@ note + +This API was added in Akka 2.6.0 and @ref:[may be changed](../../../common/may-change.md) in further patch releases. + +@@@ + +This example wraps a `flow` handling @scala[`Int`s]@java[`Integer`s], and retries elements unless the result is 0 or negative, or `maxRetries` is hit. + +Scala +: @@snip [RetryFlowSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RetryFlowSpec.scala) { #withBackoff-demo } + +Java +: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #withBackoff-demo } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the wrapped flow emits, and either `maxRetries` is reached or `decideRetry` returns @scala[`None`]@java[`Optional.empty`] + +**backpressures** during backoff, when the wrapped flow backpressures, or when downstream backpressures + +**completes** when upstream or the wrapped flow completes + +**cancels** when downstream or the wrapped flow cancels + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/RetryFlow/withBackoffAndContext.md b/akka-docs/src/main/paradox/stream/operators/RetryFlow/withBackoffAndContext.md new file mode 100644 index 0000000000..ffb1e6e54c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/RetryFlow/withBackoffAndContext.md @@ -0,0 +1,54 @@ +# RetryFlow.withBackoffAndContext + +Wrap the given @apidoc[FlowWithContext] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try. + +@ref[Error handling](../index.md#error-handling) + +## Signature + +Scala +: @@signature [RetryFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala) { #withBackoffAndContext } + +Java +: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #signature } + + +## Description + +When an element is emitted by the wrapped `flow` it is passed to the `decideRetry` function, which may return an element to retry in the `flow`. + +The retry backoff is controlled by the `minBackoff`, `maxBackoff` and `randomFactor` parameters. +At most `maxRetries` will be made after the initial try. + +The wrapped `flow` must have **one-in one-out semantics**. It may not filter, nor duplicate elements. The `RetryFlow` will fail if two elements are emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will be emitted into the `flow` at any time. The `flow` needs to emit an element before the next will be emitted to it. + +Elements are retried as long as `maxRetries` is not reached and the `decideRetry` function returns a new element to be sent to `flow`. The `decideRetry` function gets passed in the original element sent to the `flow` and the element emitted by it together with their contexts as @scala[tuples]@java[`akka.japi.Pair`s]. +When `decideRetry` returns @scala[`None`]@java[`Optional.empty`], no retries will be issued, and the response will be emitted downstream. + +@@@ note + +This API was added in Akka 2.6.0 and @ref:[may be changed](../../../common/may-change.md) in further patch releases. + +@@@ + +This example wraps a `flow` handling @scala[`Int`s]@java[`Integer`s] with `SomeContext` in context, and retries elements unless the result is 0 or negative, or `maxRetries` is hit. + +Scala +: @@snip [RetryFlowSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RetryFlowSpec.scala) { #retry-success } + +Java +: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #retry-success } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the wrapped flow emits, and either `maxRetries` is reached or `decideRetry` returns @scala[`None`]@java[`Optional.empty`] + +**backpressures** during backoff, when the wrapped flow backpressures, or when downstream backpressures + +**completes** when upstream or the wrapped flow completes + +**cancels** when downstream or the wrapped flow cancels + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 69e5df1ed5..67df16009c 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -311,6 +311,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) |RestartSource|@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or complete using an exponential backoff.| |RestartFlow|@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails or complete using an exponential backoff.| |RestartSink|@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it fails or complete using an exponential backoff.| +|RetryFlow|@ref[withBackoff](RetryFlow/withBackoff.md)|Wrap the given @apidoc[Flow] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.| +|RetryFlow|@ref[withBackoffAndContext](RetryFlow/withBackoffAndContext.md)|Wrap the given @apidoc[FlowWithContext] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.| @@@ index @@ -466,6 +468,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [withBackoff](RestartFlow/withBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) * [withBackoff](RestartSink/withBackoff.md) +* [withBackoff](RetryFlow/withBackoff.md) +* [withBackoffAndContext](RetryFlow/withBackoffAndContext.md) * [actorRef](ActorSource/actorRef.md) * [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md) * [ask](ActorFlow/ask.md) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java new file mode 100644 index 0000000000..a0acb97ccc --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java @@ -0,0 +1,277 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.stream.testkit.TestPublisher; +import akka.stream.testkit.TestSubscriber; +import akka.stream.testkit.javadsl.TestSink; +import akka.stream.testkit.javadsl.TestSource; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; +import scala.util.Failure; +import scala.util.Success; +import scala.util.Try; + +import java.time.Duration; +import java.util.Optional; + +import static akka.NotUsed.notUsed; +import static org.junit.Assert.assertEquals; + +public class RetryFlowTest extends StreamTest { + public RetryFlowTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("RetryFlowTest", AkkaSpec.testConf()); + + public static + // #withBackoff-signature + Flow withBackoff( + Duration minBackoff, + Duration maxBackoff, + double randomFactor, + int maxRetries, + Flow flow, + akka.japi.function.Function2> decideRetry) + // #withBackoff-signature + { + return RetryFlow.withBackoff( + minBackoff, maxBackoff, randomFactor, maxRetries, flow, decideRetry); + } + + public static + // #signature + FlowWithContext withBackoffAndContext( + Duration minBackoff, + Duration maxBackoff, + double randomFactor, + int maxRetries, + FlowWithContext flow, + akka.japi.function.Function2, Pair, Optional>> + decideRetry) + // #signature + { + return RetryFlow.withBackoffAndContext( + minBackoff, maxBackoff, randomFactor, maxRetries, flow, decideRetry); + } + + @Test + public void withBackoffShouldRetry() { + final Duration minBackoff = Duration.ofMillis(10); + final Duration maxBackoff = Duration.ofSeconds(5); + final double randomFactor = 0d; + final int maxRetries = 3; + + // #withBackoff-demo + Flow flow = // ... + // the wrapped flow + // #withBackoff-demo + Flow.fromFunction(in -> in / 2); + + // #withBackoff-demo + + Flow retryFlow = + RetryFlow.withBackoff( + minBackoff, + maxBackoff, + randomFactor, + maxRetries, + flow, + (in, out) -> { + if (out > 0) return Optional.of(out); + else return Optional.empty(); + }); + // #withBackoff-demo + + final Pair, TestSubscriber.Probe> probes = + TestSource.probe(system) + .via(retryFlow) + .toMat(TestSink.probe(system), Keep.both()) + .run(system); + + final TestPublisher.Probe source = probes.first(); + final TestSubscriber.Probe sink = probes.second(); + + sink.request(4); + + source.sendNext(5); + assertEquals(0, sink.expectNext().intValue()); + + source.sendComplete(); + sink.expectComplete(); + } + + @Test + public void withBackoffAndContextShouldRetry() { + final Duration minBackoff = Duration.ofMillis(10); + final Duration maxBackoff = Duration.ofSeconds(5); + final double randomFactor = 0d; + final int maxRetries = 3; + + class SomeContext {} + + // #retry-success + FlowWithContext flow = // ... + // the wrapped flow + // #retry-success + FlowWithContext.fromPairs( + Flow.fromFunction( + in -> { + final Integer request = in.first(); + return Pair.create(request / 2, in.second()); + })); + + // #retry-success + + FlowWithContext retryFlow = + RetryFlow.withBackoffAndContext( + minBackoff, + maxBackoff, + randomFactor, + maxRetries, + flow, + (in, out) -> { + Integer value = out.first(); + SomeContext context = out.second(); + if (value > 0) { + return Optional.of(Pair.create(value, context)); + } else { + return Optional.empty(); + } + }); + // #retry-success + + final Pair, TestSubscriber.Probe>> + probes = + TestSource.probe(system) + .map(i -> Pair.create(i, new SomeContext())) + .via(retryFlow) + .toMat(TestSink.probe(system), Keep.both()) + .run(system); + + final TestPublisher.Probe source = probes.first(); + final TestSubscriber.Probe> sink = probes.second(); + + sink.request(4); + + source.sendNext(5); + assertEquals(0, sink.expectNext().first().intValue()); + + source.sendComplete(); + sink.expectComplete(); + } + + @Test + public void retryFailedResponses() { + final Duration minBackoff = Duration.ofMillis(10); + final Duration maxBackoff = Duration.ofSeconds(5); + final double randomFactor = 0d; + final int maxRetries = 3; + final FlowWithContext, Integer, NotUsed> failEvenValuesFlow = + FlowWithContext.fromPairs( + Flow.fromFunction( + in -> { + final Integer request = in.first(); + if (request % 2 == 0) + return Pair.create(Failure.apply(new Error("Failed response")), in.second()); + else return Pair.create(Success.apply(request), in.second()); + })); + + final FlowWithContext, Integer, NotUsed> retryFlow = + RetryFlow.withBackoffAndContext( + minBackoff, + maxBackoff, + randomFactor, + maxRetries, + failEvenValuesFlow, + (in, out) -> { + if (out.first().isFailure()) { + return Optional.of(Pair.create(in.first() + 1, out.second())); + } else { + return Optional.empty(); + } + }); + + final Pair, TestSubscriber.Probe, Integer>>> + probes = + TestSource.probe(system) + .map(i -> Pair.create(i, i)) + .via(retryFlow) + .toMat(TestSink.probe(system), Keep.both()) + .run(system); + + final TestPublisher.Probe source = probes.first(); + final TestSubscriber.Probe, Integer>> sink = probes.second(); + + sink.request(1); + source.sendNext(8); + + Pair, Integer> response = sink.expectNext(); + assertEquals(9, response.first().get().intValue()); + assertEquals(8, response.second().intValue()); + + source.sendComplete(); + sink.expectComplete(); + } + + @Test + public void supportFlowWithContext() { + final Duration minBackoff = Duration.ofMillis(10); + final Duration maxBackoff = Duration.ofSeconds(5); + final double randomFactor = 0d; + final int maxRetries = 5; + final FlowWithContext, Integer, NotUsed> flow = + Flow.create() + .asFlowWithContext((el, ctx) -> el, ctx -> ctx) + .map( + i -> { + if (i > 0) return Failure.apply(new RuntimeException("i is larger than 0")); + else return Success.apply(i); + }); + + final Pair, TestSubscriber.Probe, Integer>>> + probes = + TestSource.probe(system) + .asSourceWithContext(ctx -> ctx) + .via( + RetryFlow.withBackoffAndContext( + minBackoff, + maxBackoff, + randomFactor, + maxRetries, + flow, + (in, out) -> { + if (out.first().isFailure()) { + if (out.second() > 0) { + return Optional.of(Pair.create(out.second() / 2, out.second() / 2)); + } + } + return Optional.empty(); + })) + .toMat(TestSink.probe(system), Keep.both()) + .run(system); + + final TestPublisher.Probe source = probes.first(); + final TestSubscriber.Probe, Integer>> sink = probes.second(); + + sink.request(1); + source.sendNext(8); + + Pair, Integer> response = sink.expectNext(); + assertEquals(0, response.first().get().intValue()); + assertEquals(0, response.second().intValue()); + + source.sendComplete(); + sink.expectComplete(); + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RetryFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RetryFlowSpec.scala new file mode 100644 index 0000000000..b8632ed77d --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RetryFlowSpec.scala @@ -0,0 +1,560 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.OverflowStrategy +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import org.scalatest.matchers.{ MatchResult, Matcher } + +import scala.concurrent.duration._ +import scala.util.{ Failure, Success, Try } + +class RetryFlowSpec extends StreamSpec(""" + akka.stream.materializer.initial-input-buffer-size = 1 + akka.stream.materializer.max-input-buffer-size = 1 + """) with CustomMatchers { + + final val Failed = new Exception("prepared failure") + final val FailedElem: Try[Int] = Failure(Failed) + + val failEvenValuesFlow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] = + FlowWithContext.fromTuples(Flow.fromFunction { + case (i, ctx) if i % 2 == 0 => (FailedElem, ctx) + case (i, ctx) => (Success(i), ctx) + }) + + val failAllValuesFlow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] = + FlowWithContext.fromTuples(Flow.fromFunction { + case (_, j) => (FailedElem, j) + }) + + val alwaysRecoveringFunc: ((Int, Int), (Try[Int], Int)) => Option[(Int, Int)] = { + case (_, (Failure(_), i)) => Some(i -> i) + case _ => None + } + + /** increments the value and the context with every failure */ + def incrementFailedValues[OutData](in: (Int, Int), out: (Try[OutData], Int)): Option[(Int, Int)] = { + (in, out) match { + case ((in, _), (Failure(_), outCtx)) => Some((in + 1, outCtx + 1)) + case _ => None + } + } + + "RetryFlow.withBackoff" should { + + val failEvenValuesFlow: Flow[Int, Try[Int], NotUsed] = + Flow.fromFunction { + case i if i % 2 == 0 => FailedElem + case i => Success(i) + } + + def incrementFailedValues[OutData](in: Int, out: Try[OutData]): Option[Int] = { + (in, out) match { + case (in, Failure(_)) => Some(in + 1) + case _ => None + } + } + + "send elements through" in { + val sink = + Source(List(13, 17, 19, 23, 27)) + .via( + RetryFlow.withBackoff(10.millis, 5.second, 0d, maxRetries = 3, failEvenValuesFlow)(incrementFailedValues)) + .runWith(Sink.seq) + sink.futureValue should contain inOrderElementsOf List( + Success(13), + Success(17), + Success(19), + Success(23), + Success(27)) + } + + "allow retrying a successful element" in { + // #withBackoff-demo + val flow: Flow[Int, Int, NotUsed] = // ??? + // #withBackoff-demo + Flow.fromFunction(i => i / 2) + + // #withBackoff-demo + + val retryFlow: Flow[Int, Int, NotUsed] = + RetryFlow.withBackoff(minBackoff = 10.millis, maxBackoff = 5.seconds, randomFactor = 0d, maxRetries = 3, flow)( + decideRetry = { + case (_, result) if result > 0 => Some(result) + case _ => None + }) + // #withBackoff-demo + + val (source, sink) = TestSource.probe[Int].via(retryFlow).toMat(TestSink.probe)(Keep.both).run() + + sink.request(4) + + source.sendNext(5) + sink.expectNext(0) + + source.sendNext(2) + sink.expectNext(0) + + source.sendComplete() + sink.expectComplete() + } + } + + "RetryFlow.withBackoffAndContext" should { + + "send elements through" in { + val sink = + Source(List(13, 17, 19, 23, 27).map(_ -> 0)) + .via(RetryFlow.withBackoffAndContext(10.millis, 5.second, 0d, maxRetries = 3, failEvenValuesFlow)( + incrementFailedValues)) + .runWith(Sink.seq) + sink.futureValue should contain inOrderElementsOf List( + Success(13) -> 0, + Success(17) -> 0, + Success(19) -> 0, + Success(23) -> 0, + Success(27) -> 0) + } + + "send failures through (after retrying)" in { + val maxRetries = 2 + val sink = + Source(List(13, 17).map(_ -> 0)) + .via(RetryFlow.withBackoffAndContext(1.millis, 5.millis, 0d, maxRetries, failAllValuesFlow)( + incrementFailedValues)) + .runWith(Sink.seq) + sink.futureValue should contain inOrderElementsOf List(FailedElem -> maxRetries, FailedElem -> maxRetries) + } + + "send elements through (with retrying)" in { + val sink = + Source(List(12, 13, 14).map(_ -> 0)) + .via(RetryFlow.withBackoffAndContext(1.millis, 5.millis, 0d, maxRetries = 3, failEvenValuesFlow)( + incrementFailedValues)) + .runWith(Sink.seq) + sink.futureValue should contain inOrderElementsOf List(Success(13) -> 1, Success(13) -> 0, Success(15) -> 1) + } + + "allow retrying a successful element" in { + class SomeContext + + //#retry-success + val flow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] = // ??? + //#retry-success + FlowWithContext.fromTuples[Int, SomeContext, Int, SomeContext, NotUsed](Flow.fromFunction { + case (i, ctx) => i / 2 -> ctx + }) + + //#retry-success + + val retryFlow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] = + RetryFlow.withBackoffAndContext( + minBackoff = 10.millis, + maxBackoff = 5.seconds, + randomFactor = 0d, + maxRetries = 3, + flow)(decideRetry = { + case ((_, _), (result, ctx)) if result > 0 => Some(result -> ctx) + case _ => None + }) + //#retry-success + + val (source, sink) = TestSource.probe[(Int, SomeContext)].via(retryFlow).toMat(TestSink.probe)(Keep.both).run() + + sink.request(4) + + val ctx = new SomeContext + source.sendNext(5 -> ctx) + sink.expectNext(0 -> ctx) + + source.sendNext(2 -> ctx) + sink.expectNext(0 -> ctx) + + source.sendComplete() + sink.expectComplete() + } + + "work with a buffer in the inner flow" in { + val flow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] = + FlowWithContext.fromTuples(Flow[(Int, Int)].buffer(10, OverflowStrategy.backpressure).via(failEvenValuesFlow)) + val (source, sink) = TestSource + .probe[(Int, Int)] + .via(RetryFlow.withBackoffAndContext(10.millis, 5.seconds, 0d, 3, flow)((_, _) => None)) + .toMat(TestSink.probe)(Keep.both) + .run() + + sink.request(99) + + source.sendNext(1 -> 0) + source.sendNext(3 -> 0) + sink.expectNext(Success(1) -> 0) + sink.expectNext(Success(3) -> 0) + } + + } + + "Backing off" should { + "have min backoff" in { + val minBackoff = 200.millis + val (source, sink) = TestSource + .probe[(Int, Int)] + .via(RetryFlow.withBackoffAndContext(minBackoff, 5.second, 0d, 3, failEvenValuesFlow)(incrementFailedValues)) + .toMat(TestSink.probe)(Keep.both) + .run() + + sink.request(99) + + source.sendNext(1 -> 0) + sink.expectNext(80.millis, Success(1) -> 0) + + source.sendNext(2 -> 0) + sink.expectNoMessage(minBackoff) + sink.expectNext(Success(3) -> 1) + } + + "use min backoff for every try" in { + val minBackoff = 50.millis + val maxRetries = 3 + val (source, sink) = TestSource + .probe[(Int, Int)] + .via(RetryFlow.withBackoffAndContext(minBackoff, 5.seconds, 0d, maxRetries, failAllValuesFlow)( + incrementFailedValues)) + .toMat(TestSink.probe)(Keep.both) + .run() + + sink.request(1) + + source.sendNext(10 -> 0) + sink.expectNoMessage(minBackoff * maxRetries) + sink.expectNext(FailedElem -> maxRetries) + } + + "exponentially backoff between retries" in { + val NumRetries = 7 + + val nanoTimeOffset = System.nanoTime() + case class State(retriedAt: List[Long]) + + val flow = FlowWithContext.fromTuples[State, NotUsed, State, NotUsed, NotUsed](Flow.fromFunction { + case (State(retriedAt), _) => State(System.nanoTime - nanoTimeOffset :: retriedAt) -> NotUsed + }) + + val (source, sink) = TestSource + .probe[(State, NotUsed)] + .via(RetryFlow.withBackoffAndContext(10.millis, 5.seconds, 0d, NumRetries, flow) { + case (_, (s, _)) => Some(s -> NotUsed) + case _ => None + }) + .toMat(TestSink.probe)(Keep.both) + .run() + + sink.request(1) + + source.sendNext(State(Nil) -> NotUsed) + val (State(retriedAt), _) = sink.expectNext() + + val timesBetweenRetries = retriedAt + .sliding(2) + .collect { + case before :: after :: Nil => before - after + } + .toIndexedSeq + + timesBetweenRetries.reverse should strictlyIncrease + + source.sendComplete() + sink.expectComplete() + } + + } + + "Aborting" should { + "propagate error from upstream" in { + val retryFlow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] = + RetryFlow.withBackoffAndContext( + minBackoff = 10.millis, + maxBackoff = 5.seconds, + randomFactor = 0d, + maxRetries = 3, + flow = failEvenValuesFlow)(decideRetry = { + case ((in, _), (Failure(_), ctx)) => Some((in + 1, ctx)) + case _ => None + }) + + val (source, sink) = TestSource.probe[(Int, Int)].via(retryFlow).toMat(TestSink.probe)(Keep.both).run() + + sink.request(99) + + source.sendNext(1 -> 1) + sink.expectNext(Success(1) -> 1) + + source.sendNext(2 -> 2) + sink.expectNext(Success(3) -> 2) + + source.sendError(Failed) + sink.expectError(Failed) + } + + "propagate error from upstream on start" in new AllSucceedBench[Int, Int, Int] { + externalOut.request(99) + externalIn.sendError(Failed) + externalOut.expectError(Failed) + } + + "propagate error from upstream before start" in new AllSucceedBench[Int, Int, Int] { + externalIn.sendError(Failed) + externalOut.request(1) + externalOut.expectError(Failed) + } + + "propagate error on the inner flow after start" in new AllSucceedBench[Int, Int, Int] { + externalOut.request(99) + + // send one element through + externalIn.sendNext(1 -> 0) + internalOut.requestNext(1 -> 0) + internalIn.sendNext(1 -> 0) + externalOut.expectNext(1 -> 0) + + // fail inner flow + internalIn.sendError(Failed) + externalOut.expectError(Failed) + externalIn.expectCancellation() + } + + "propagate error on the inner flow on start" in new AllSucceedBench[Int, Int, Int] { + externalOut.request(29) + internalIn.sendError(Failed) + externalOut.expectError(Failed) + externalIn.expectCancellation() + } + + "propagate non-error cancel on the inner flow on start" in new AllSucceedBench[Int, Int, Int] { + externalOut.request(29) + internalOut.cancel() + externalOut.expectComplete() + externalIn.expectCancellation() + } + + "propagate error on the inner flow before start" in new AllSucceedBench[Int, Int, Int] { + internalIn.sendError(Failed) + externalOut.request(13) + externalOut.expectError(Failed) + externalIn.expectCancellation() + } + + "propagate error before the RetryFlow, while on retry spin" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) { + externalOut.request(92) + // spinning message + externalIn.sendNext(1 -> 0) + internalOut.requestNext(1 -> 0) + internalIn.sendNext(1 -> 0) + internalOut.requestNext(1 -> 0) + + externalOut.expectNoMessage() + externalIn.sendError(Failed) + externalOut.expectError(Failed) + } + + "propagate error on the inner flow, while on retry spin" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) { + externalOut.request(35) + // spinning message + externalIn.sendNext(1 -> 0) + internalOut.requestNext(1 -> 0) + internalIn.sendNext(1 -> 0) + internalOut.requestNext(1 -> 0) + + externalOut.expectNoMessage() + internalIn.sendError(Failed) + externalOut.expectError(Failed) + externalIn.expectCancellation() + } + + "allow for downstream cancel while element is in flow" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) { + externalOut.request(78) + // spinning message + externalIn.sendNext(1 -> 0) + internalOut.requestNext(1 -> 0) + + externalOut.cancel() + + internalIn.expectCancellation() + externalIn.expectCancellation() + } + + "allow for downstream cancel while retry is backing off" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) { + externalOut.request(35) + // spinning message + externalIn.sendNext(1 -> 0) + internalOut.requestNext(1 -> 0) + internalIn.sendNext(1 -> 0) + + externalOut.cancel() + + internalIn.expectCancellation() + externalIn.expectCancellation() + } + + "finish only after processing all elements in stream" in new AllSucceedBench[Int, Int, Int] { + externalOut.request(32) + + // send one element and complete + externalIn.sendNext(1 -> 0) + externalIn.sendComplete() + + internalOut.requestNext(1 -> 0) + internalIn.sendNext(1 -> 0) + externalOut.requestNext(1 -> 0) + + externalOut.expectComplete() + } + } + + "Coordinator" should { + + type InData = String + type Ctx2 = Int + type OutData = Try[String] + + "send elements across" in new AllSucceedBench[InData, Ctx2, OutData] { + // push element + val elA = "A" -> 123 + externalIn.sendNext(elA) + + // let element go via retryable flow + internalOut.requestNext(elA) + private val resA = Success("result A") -> 123 + internalIn.sendNext(resA) + + // expect result + externalOut.requestNext(resA) + } + + "retry for a failure" in new ConstructBench[InData, Ctx2, OutData]((_, out) => + out._1 match { + case Success(_) => None + case Failure(_) => Some("A'" -> 2) + }) { + // demand on stream end + externalOut.request(1) + + // push element + val element1 = "A" -> 1 + externalIn.sendNext(element1) + + // let element go via retryable flow + internalOut.requestNext(element1) + internalIn.sendNext(Failure(new RuntimeException("boom")) -> 1) + + // let element go via retryable flow + val try2 = internalOut.requestNext(3.seconds) + try2._1 shouldBe "A'" + try2._2 shouldBe 2 + val res1 = Success("Ares") -> 1 + internalIn.sendNext(res1) + + // expect result + externalOut.requestNext(res1) + } + + "fail if inner flow emits twice" in new AllSucceedBench[InData, Ctx2, OutData] { + externalOut.request(99) + + // push element + val elA = "A" -> 123 + externalIn.sendNext(elA) + + // let element go via retryable flow + internalOut.requestNext(elA) + val resA = Success("result A") -> 123 + internalIn.sendNext(resA) + internalIn.sendNext(Success("result B") -> 222) + + // expect result + externalOut.requestNext(resA) + externalOut.expectError() shouldBe an[IllegalStateException] + } + + "allow more demand in inner flow (but never pass in more than one element into the retrying cycle)" in new AllSucceedBench[ + InData, + Ctx2, + OutData] { + externalOut.request(1) + internalIn.expectRequest() shouldBe 1L + internalOut.request(1) + externalIn.expectRequest() shouldBe 1L + + // push element + val elA = "A" -> 123 + externalIn.sendNext(elA) + + // let element go via retryable flow + internalOut.expectNext(elA) + + // more demand on retryable flow + internalOut.request(1) + internalOut.expectNoMessage(50.millis) + + // emit from retryable flow, push to external + val resA = Success("result A") -> 123 + internalIn.sendNext(resA) + + // element should NOT reach internal before pushed from externalOut + internalOut.expectNoMessage(50.millis) + + // put a new element in external in buffer + val elB = "B" -> 567 + externalIn.sendNext(elB) + // element reaches internal flow before elA has bean fetched from externalOut + internalOut.expectNext(elB) + + externalOut.expectNext(resA) + } + } + + class ConstructBench[In, Ctx, Out](retryWith: ((In, Ctx), (Out, Ctx)) => Option[(In, Ctx)]) { + + val throughDangerousFlow + : FlowWithContext[In, Ctx, Out, Ctx, (TestSubscriber.Probe[(In, Ctx)], TestPublisher.Probe[(Out, Ctx)])] = + FlowWithContext.fromTuples( + Flow.fromSinkAndSourceMat(TestSink.probe[(In, Ctx)], TestSource.probe[(Out, Ctx)])(Keep.both)) + + val ((externalIn, (internalOut, internalIn)), externalOut) = + TestSource + .probe[(In, Ctx)] + .viaMat( + RetryFlow.withBackoffAndContext( + minBackoff = 10.millis, + maxBackoff = 1.second, + randomFactor = 0d, + maxRetries = 3, + throughDangerousFlow)(retryWith))(Keep.both) + .toMat(TestSink.probe[(Out, Ctx)])(Keep.both) + .run() + } + + class AllSucceedBench[In, Ctx, Out] extends ConstructBench[In, Ctx, Out]((_, _) => None) + +} + +trait CustomMatchers { + + class StrictlyIncreasesMatcher() extends Matcher[Seq[Long]] { + + def apply(left: Seq[Long]): MatchResult = { + val result = left.sliding(2).map(pair => pair.head < pair.last).reduceOption(_ && _).getOrElse(false) + + MatchResult( + result, + s"""Collection $left elements are not increasing strictly""", + s"""Collection $left elements are increasing strictly""") + } + } + + def strictlyIncrease = new StrictlyIncreasesMatcher() +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/RetryFlowCoordinator.scala b/akka-stream/src/main/scala/akka/stream/impl/RetryFlowCoordinator.scala new file mode 100644 index 0000000000..9805dc0e1b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/RetryFlowCoordinator.scala @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.pattern.BackoffSupervisor +import akka.stream.SubscriptionWithCancelException.NonFailureCancellation +import akka.stream.stage._ +import akka.stream.{ Attributes, BidiShape, Inlet, Outlet } +import akka.util.OptionVal + +import scala.concurrent.duration._ + +/** + * INTERNAL API. + * + * ``` + * externalIn + * | + * | + * +-> internalOut -->+ + * | | + * | flow + * | | + * | internalIn --+ + * +<-yes- retry? + * | + * no + * | + * externalOut + * ``` + */ +@InternalApi private[akka] final class RetryFlowCoordinator[In, Out]( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRetries: Int, + decideRetry: (In, Out) => Option[In]) + extends GraphStage[BidiShape[In, In, Out, Out]] { + + private val externalIn = Inlet[In]("RetryFlow.externalIn") + private val externalOut = Outlet[Out]("RetryFlow.externalOut") + + private val internalOut = Outlet[In]("RetryFlow.internalOut") + private val internalIn = Inlet[Out]("RetryFlow.internalIn") + + override val shape: BidiShape[In, In, Out, Out] = + BidiShape(externalIn, internalOut, internalIn, externalOut) + + override def createLogic(attributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + + private var elementInProgress: OptionVal[In] = OptionVal.none + private var retryNo = 0 + + setHandler( + externalIn, + new InHandler { + override def onPush(): Unit = { + val element = grab(externalIn) + elementInProgress = OptionVal.Some(element) + retryNo = 0 + pushInternal(element) + } + + override def onUpstreamFinish(): Unit = + if (elementInProgress.isEmpty) { + completeStage() + } + }) + + setHandler( + internalOut, + new OutHandler { + + override def onPull(): Unit = { + if (elementInProgress.isEmpty) { + if (!hasBeenPulled(externalIn) && !isClosed(externalIn)) { + pull(externalIn) + } + } + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + if (elementInProgress.isEmpty || !cause.isInstanceOf[NonFailureCancellation]) { + super.onDownstreamFinish(cause) + } else { + // emit elements before finishing + setKeepGoing(true) + } + } + }) + + setHandler( + internalIn, + new InHandler { + override def onPush(): Unit = { + val result = grab(internalIn) + elementInProgress match { + case OptionVal.None => + failStage( + new IllegalStateException( + s"inner flow emitted unexpected element $result; the flow must be one-in one-out")) + case OptionVal.Some((_, _)) if retryNo == maxRetries => pushExternal(result) + case OptionVal.Some(in) => + decideRetry(in, result) match { + case None => pushExternal(result) + case Some(element) => planRetry(element) + } + } + } + }) + + setHandler(externalOut, new OutHandler { + override def onPull(): Unit = + // external demand + if (!hasBeenPulled(internalIn)) pull(internalIn) + }) + + private def pushInternal(element: In): Unit = { + push(internalOut, element) + } + + private def pushExternal(result: Out): Unit = { + elementInProgress = OptionVal.none + push(externalOut, result) + if (isClosed(externalIn)) { + completeStage() + } else if (isAvailable(internalOut)) { + pull(externalIn) + } + } + + private def planRetry(element: In): Unit = { + val delay = BackoffSupervisor.calculateDelay(retryNo, minBackoff, maxBackoff, randomFactor) + elementInProgress = OptionVal.Some(element) + retryNo += 1 + pull(internalIn) + scheduleOnce(RetryFlowCoordinator.RetryCurrentElement, delay) + } + + override def onTimer(timerKey: Any): Unit = pushInternal(elementInProgress.get) + + } +} + +private object RetryFlowCoordinator { + case object RetryCurrentElement +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala new file mode 100644 index 0000000000..c50ac9463e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.javadsl + +import java.util.Optional + +import akka.annotation.ApiMayChange +import akka.japi.Pair +import akka.stream.scaladsl +import akka.util.JavaDurationConverters._ + +import scala.compat.java8.OptionConverters._ + +object RetryFlow { + + /** + * API may change! + * + * Allows retrying individual elements in the stream with an exponential backoff. + * + * The retry condition is controlled by the `decideRetry` function. It takes the originally emitted + * element and the response emitted by `flow`, and may return a request to be retried. + * + * The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics, + * the [[akka.stream.javadsl.Flow Flow]] may not filter elements, + * nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are + * emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will + * be emitted into the `flow` at any time. The `flow` needs to emit an element before the next + * will be emitted to it. + * + * @param minBackoff minimum duration to backoff between issuing retries + * @param maxBackoff maximum duration to backoff between issuing retries + * @param randomFactor adds jitter to the retry delay. Use 0 for no jitter + * @param flow a flow to retry elements from + * @param decideRetry retry condition decision function + */ + @ApiMayChange(issue = "https://github.com/akka/akka/issues/27960") + def withBackoff[In, Out, Mat]( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRetries: Int, + flow: Flow[In, Out, Mat], + decideRetry: akka.japi.function.Function2[In, Out, Optional[In]]): Flow[In, Out, Mat] = + scaladsl.RetryFlow + .withBackoff[In, Out, Mat](minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRetries, flow.asScala) { + (in, out) => + decideRetry.apply(in, out).asScala + } + .asJava + + /** + * API may change! + * + * Allows retrying individual elements in the stream with an exponential backoff. + * + * The retry condition is controlled by the `decideRetry` function. It takes the originally emitted + * element with its context, and the response emitted by `flow`, and may return a request to be retried. + * + * The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics, + * the [[akka.stream.javadsl.FlowWithContext FlowWithContext]] may not filter elements, + * nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are + * emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will + * be emitted into the `flow` at any time. The `flow` needs to emit an element before the next + * will be emitted to it. + * + * The wrapped `flow` and `decideRetry` take the additional context parameters which can be a context, + * or used to control retrying with other information. + * + * @param minBackoff minimum duration to backoff between issuing retries + * @param maxBackoff maximum duration to backoff between issuing retries + * @param randomFactor adds jitter to the retry delay. Use 0 for no jitter + * @param flow a flow to retry elements from + * @param decideRetry retry condition decision function + */ + @ApiMayChange(issue = "https://github.com/akka/akka/issues/27960") + def withBackoffAndContext[In, InCtx, Out, OutCtx, Mat]( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRetries: Int, + flow: FlowWithContext[In, InCtx, Out, OutCtx, Mat], + decideRetry: akka.japi.function.Function2[Pair[In, InCtx], Pair[Out, OutCtx], Optional[Pair[In, InCtx]]]) + : FlowWithContext[In, InCtx, Out, OutCtx, Mat] = + scaladsl.RetryFlow + .withBackoffAndContext[In, InCtx, Out, OutCtx, Mat]( + minBackoff.asScala, + maxBackoff.asScala, + randomFactor, + maxRetries, + flow.asScala) { (in, out) => + decideRetry.apply(Pair(in._1, in._2), Pair(out._1, out._2)).asScala.map(_.toScala) + } + .asJava[In, InCtx, Out, OutCtx, Mat] + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala new file mode 100644 index 0000000000..76b9bd744f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.annotation.ApiMayChange +import akka.stream.impl.RetryFlowCoordinator + +import scala.concurrent.duration._ + +object RetryFlow { + + /** + * API may change! + * + * Allows retrying individual elements in the stream with an exponential backoff. + * + * The retry condition is controlled by the `decideRetry` function. It takes the originally emitted + * element and the response emitted by `flow`, and may return a request to be retried. + * + * The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics, + * the [[akka.stream.scaladsl.Flow Flow]] may not filter elements, + * nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are + * emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will + * be emitted into the `flow` at any time. The `flow` needs to emit an element before the next + * will be emitted to it. + * + * @param minBackoff minimum duration to backoff between issuing retries + * @param maxBackoff maximum duration to backoff between issuing retries + * @param randomFactor adds jitter to the retry delay. Use 0 for no jitter + * @param maxRetries total number of allowed retries, when reached the last result will be emitted + * even if unsuccessful + * @param flow a flow to retry elements from + * @param decideRetry retry condition decision function + */ + @ApiMayChange(issue = "https://github.com/akka/akka/issues/27960") + def withBackoff[In, Out, Mat]( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRetries: Int, + flow: Flow[In, Out, Mat])(decideRetry: (In, Out) => Option[In]): Flow[In, Out, Mat] = + Flow.fromGraph { + val retryCoordination = BidiFlow.fromGraph( + new RetryFlowCoordinator[In, Out](minBackoff, maxBackoff, randomFactor, maxRetries, decideRetry)) + retryCoordination.joinMat(flow)(Keep.right) + } + + /** + * API may change! + * + * Allows retrying individual elements in the stream with an exponential backoff. + * + * The retry condition is controlled by the `decideRetry` function. It takes the originally emitted + * element with its context, and the response emitted by `flow`, and may return a request to be retried. + * + * The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics, + * the [[akka.stream.scaladsl.FlowWithContext FlowWithContext]] may not filter elements, + * nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are + * emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will + * be emitted into the `flow` at any time. The `flow` needs to emit an element before the next + * will be emitted to it. + * + * The wrapped `flow` and `decideRetry` take the additional context parameters which can be a context, + * or used to control retrying with other information. + * + * @param minBackoff minimum duration to backoff between issuing retries + * @param maxBackoff maximum duration to backoff between issuing retries + * @param randomFactor adds jitter to the retry delay. Use 0 for no jitter + * @param maxRetries total number of allowed retries, when reached the last result will be emitted + * even if unsuccessful + * @param flow a flow with context to retry elements from + * @param decideRetry retry condition decision function + */ + @ApiMayChange(issue = "https://github.com/akka/akka/issues/27960") + def withBackoffAndContext[In, CtxIn, Out, CtxOut, Mat]( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRetries: Int, + flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat])( + decideRetry: ((In, CtxIn), (Out, CtxOut)) => Option[(In, CtxIn)]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + FlowWithContext.fromTuples { + val retryCoordination = BidiFlow.fromGraph( + new RetryFlowCoordinator[(In, CtxIn), (Out, CtxOut)]( + minBackoff, + maxBackoff, + randomFactor, + maxRetries, + decideRetry)) + + retryCoordination.joinMat(flow)(Keep.right) + } + +} diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index a09b04c032..3b8c271b31 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -155,6 +155,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala", "akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala", "akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala", + "akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala", + "akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala", // akka-stream-typed "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",