RetryFlow: retries of individual stream elements with backouff (#27742)

RetryFlow wraps a flow with context and allows to individually retry elements. A decider function decides if another try should be made and gives the element to try with.
Retries are backed off exponentially.
Retries are limited by maxRetries.

Inspired by the work of Gilad Hoch <gilad.hoch@thomsonreuters.com> and Martynas Mickevičius <self@2m.lt>
This commit is contained in:
Enno 2019-10-15 18:41:30 +02:00 committed by GitHub
parent 5e61c4123d
commit 70c3cdfa97
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 1295 additions and 0 deletions

View file

@ -0,0 +1,54 @@
# RetryFlow.withBackoff
Wrap the given @apidoc[Flow] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.
@ref[Error handling](../index.md#error-handling)
## Signature
Scala
: @@signature [RetryFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala) { #withBackoff }
Java
: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #withBackoff-signature }
## Description
When an element is emitted by the wrapped `flow` it is passed to the `decideRetry` function, which may return an element to retry in the `flow`.
The retry backoff is controlled by the `minBackoff`, `maxBackoff` and `randomFactor` parameters.
At most `maxRetries` will be made after the initial try.
The wrapped `flow` must have **one-in one-out semantics**. It may not filter, nor duplicate elements. The `RetryFlow` will fail if two elements are emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will be emitted into the `flow` at any time. The `flow` needs to emit an element before the next will be emitted to it.
Elements are retried as long as `maxRetries` is not reached and the `decideRetry` function returns a new element to be sent to `flow`. The `decideRetry` function gets passed in the original element sent to the `flow` and the element emitted by it.
When `decideRetry` returns @scala[`None`]@java[`Optional.empty`], no retries will be issued, and the response will be emitted downstream.
@@@ note
This API was added in Akka 2.6.0 and @ref:[may be changed](../../../common/may-change.md) in further patch releases.
@@@
This example wraps a `flow` handling @scala[`Int`s]@java[`Integer`s], and retries elements unless the result is 0 or negative, or `maxRetries` is hit.
Scala
: @@snip [RetryFlowSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RetryFlowSpec.scala) { #withBackoff-demo }
Java
: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #withBackoff-demo }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the wrapped flow emits, and either `maxRetries` is reached or `decideRetry` returns @scala[`None`]@java[`Optional.empty`]
**backpressures** during backoff, when the wrapped flow backpressures, or when downstream backpressures
**completes** when upstream or the wrapped flow completes
**cancels** when downstream or the wrapped flow cancels
@@@

View file

@ -0,0 +1,54 @@
# RetryFlow.withBackoffAndContext
Wrap the given @apidoc[FlowWithContext] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.
@ref[Error handling](../index.md#error-handling)
## Signature
Scala
: @@signature [RetryFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala) { #withBackoffAndContext }
Java
: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #signature }
## Description
When an element is emitted by the wrapped `flow` it is passed to the `decideRetry` function, which may return an element to retry in the `flow`.
The retry backoff is controlled by the `minBackoff`, `maxBackoff` and `randomFactor` parameters.
At most `maxRetries` will be made after the initial try.
The wrapped `flow` must have **one-in one-out semantics**. It may not filter, nor duplicate elements. The `RetryFlow` will fail if two elements are emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will be emitted into the `flow` at any time. The `flow` needs to emit an element before the next will be emitted to it.
Elements are retried as long as `maxRetries` is not reached and the `decideRetry` function returns a new element to be sent to `flow`. The `decideRetry` function gets passed in the original element sent to the `flow` and the element emitted by it together with their contexts as @scala[tuples]@java[`akka.japi.Pair`s].
When `decideRetry` returns @scala[`None`]@java[`Optional.empty`], no retries will be issued, and the response will be emitted downstream.
@@@ note
This API was added in Akka 2.6.0 and @ref:[may be changed](../../../common/may-change.md) in further patch releases.
@@@
This example wraps a `flow` handling @scala[`Int`s]@java[`Integer`s] with `SomeContext` in context, and retries elements unless the result is 0 or negative, or `maxRetries` is hit.
Scala
: @@snip [RetryFlowSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RetryFlowSpec.scala) { #retry-success }
Java
: @@snip [RetryFlowTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/RetryFlowTest.java) { #retry-success }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the wrapped flow emits, and either `maxRetries` is reached or `decideRetry` returns @scala[`None`]@java[`Optional.empty`]
**backpressures** during backoff, when the wrapped flow backpressures, or when downstream backpressures
**completes** when upstream or the wrapped flow completes
**cancels** when downstream or the wrapped flow cancels
@@@

View file

@ -311,6 +311,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|RestartSource|<a name="withbackoff"></a>@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails or complete using an exponential backoff.|
|RestartFlow|<a name="withbackoff"></a>@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails or complete using an exponential backoff.|
|RestartSink|<a name="withbackoff"></a>@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @apidoc[Sink] with a @apidoc[Sink] that will restart it when it fails or complete using an exponential backoff.|
|RetryFlow|<a name="withbackoff"></a>@ref[withBackoff](RetryFlow/withBackoff.md)|Wrap the given @apidoc[Flow] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.|
|RetryFlow|<a name="withbackoffandcontext"></a>@ref[withBackoffAndContext](RetryFlow/withBackoffAndContext.md)|Wrap the given @apidoc[FlowWithContext] and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.|
@@@ index
@ -466,6 +468,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [withBackoff](RestartFlow/withBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
* [withBackoff](RestartSink/withBackoff.md)
* [withBackoff](RetryFlow/withBackoff.md)
* [withBackoffAndContext](RetryFlow/withBackoffAndContext.md)
* [actorRef](ActorSource/actorRef.md)
* [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)
* [ask](ActorFlow/ask.md)

View file

@ -0,0 +1,277 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.javadsl;
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.StreamTest;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Test;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import java.time.Duration;
import java.util.Optional;
import static akka.NotUsed.notUsed;
import static org.junit.Assert.assertEquals;
public class RetryFlowTest extends StreamTest {
public RetryFlowTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("RetryFlowTest", AkkaSpec.testConf());
public static
// #withBackoff-signature
<In, InCtx, Out, OutCtx, Mat> Flow<In, Out, Mat> withBackoff(
Duration minBackoff,
Duration maxBackoff,
double randomFactor,
int maxRetries,
Flow<In, Out, Mat> flow,
akka.japi.function.Function2<In, Out, Optional<In>> decideRetry)
// #withBackoff-signature
{
return RetryFlow.<In, Out, Mat>withBackoff(
minBackoff, maxBackoff, randomFactor, maxRetries, flow, decideRetry);
}
public static
// #signature
<In, InCtx, Out, OutCtx, Mat> FlowWithContext<In, InCtx, Out, OutCtx, Mat> withBackoffAndContext(
Duration minBackoff,
Duration maxBackoff,
double randomFactor,
int maxRetries,
FlowWithContext<In, InCtx, Out, OutCtx, Mat> flow,
akka.japi.function.Function2<Pair<In, InCtx>, Pair<Out, OutCtx>, Optional<Pair<In, InCtx>>>
decideRetry)
// #signature
{
return RetryFlow.<In, InCtx, Out, OutCtx, Mat>withBackoffAndContext(
minBackoff, maxBackoff, randomFactor, maxRetries, flow, decideRetry);
}
@Test
public void withBackoffShouldRetry() {
final Duration minBackoff = Duration.ofMillis(10);
final Duration maxBackoff = Duration.ofSeconds(5);
final double randomFactor = 0d;
final int maxRetries = 3;
// #withBackoff-demo
Flow<Integer, Integer, NotUsed> flow = // ...
// the wrapped flow
// #withBackoff-demo
Flow.fromFunction(in -> in / 2);
// #withBackoff-demo
Flow<Integer, Integer, NotUsed> retryFlow =
RetryFlow.withBackoff(
minBackoff,
maxBackoff,
randomFactor,
maxRetries,
flow,
(in, out) -> {
if (out > 0) return Optional.of(out);
else return Optional.empty();
});
// #withBackoff-demo
final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> probes =
TestSource.<Integer>probe(system)
.via(retryFlow)
.toMat(TestSink.probe(system), Keep.both())
.run(system);
final TestPublisher.Probe<Integer> source = probes.first();
final TestSubscriber.Probe<Integer> sink = probes.second();
sink.request(4);
source.sendNext(5);
assertEquals(0, sink.expectNext().intValue());
source.sendComplete();
sink.expectComplete();
}
@Test
public void withBackoffAndContextShouldRetry() {
final Duration minBackoff = Duration.ofMillis(10);
final Duration maxBackoff = Duration.ofSeconds(5);
final double randomFactor = 0d;
final int maxRetries = 3;
class SomeContext {}
// #retry-success
FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> flow = // ...
// the wrapped flow
// #retry-success
FlowWithContext.fromPairs(
Flow.fromFunction(
in -> {
final Integer request = in.first();
return Pair.create(request / 2, in.second());
}));
// #retry-success
FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> retryFlow =
RetryFlow.withBackoffAndContext(
minBackoff,
maxBackoff,
randomFactor,
maxRetries,
flow,
(in, out) -> {
Integer value = out.first();
SomeContext context = out.second();
if (value > 0) {
return Optional.of(Pair.create(value, context));
} else {
return Optional.empty();
}
});
// #retry-success
final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Pair<Integer, SomeContext>>>
probes =
TestSource.<Integer>probe(system)
.map(i -> Pair.create(i, new SomeContext()))
.via(retryFlow)
.toMat(TestSink.probe(system), Keep.both())
.run(system);
final TestPublisher.Probe<Integer> source = probes.first();
final TestSubscriber.Probe<Pair<Integer, SomeContext>> sink = probes.second();
sink.request(4);
source.sendNext(5);
assertEquals(0, sink.expectNext().first().intValue());
source.sendComplete();
sink.expectComplete();
}
@Test
public void retryFailedResponses() {
final Duration minBackoff = Duration.ofMillis(10);
final Duration maxBackoff = Duration.ofSeconds(5);
final double randomFactor = 0d;
final int maxRetries = 3;
final FlowWithContext<Integer, Integer, Try<Integer>, Integer, NotUsed> failEvenValuesFlow =
FlowWithContext.fromPairs(
Flow.fromFunction(
in -> {
final Integer request = in.first();
if (request % 2 == 0)
return Pair.create(Failure.apply(new Error("Failed response")), in.second());
else return Pair.create(Success.apply(request), in.second());
}));
final FlowWithContext<Integer, Integer, Try<Integer>, Integer, NotUsed> retryFlow =
RetryFlow.withBackoffAndContext(
minBackoff,
maxBackoff,
randomFactor,
maxRetries,
failEvenValuesFlow,
(in, out) -> {
if (out.first().isFailure()) {
return Optional.of(Pair.create(in.first() + 1, out.second()));
} else {
return Optional.empty();
}
});
final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Pair<Try<Integer>, Integer>>>
probes =
TestSource.<Integer>probe(system)
.map(i -> Pair.create(i, i))
.via(retryFlow)
.toMat(TestSink.probe(system), Keep.both())
.run(system);
final TestPublisher.Probe<Integer> source = probes.first();
final TestSubscriber.Probe<Pair<Try<Integer>, Integer>> sink = probes.second();
sink.request(1);
source.sendNext(8);
Pair<Try<Integer>, Integer> response = sink.expectNext();
assertEquals(9, response.first().get().intValue());
assertEquals(8, response.second().intValue());
source.sendComplete();
sink.expectComplete();
}
@Test
public void supportFlowWithContext() {
final Duration minBackoff = Duration.ofMillis(10);
final Duration maxBackoff = Duration.ofSeconds(5);
final double randomFactor = 0d;
final int maxRetries = 5;
final FlowWithContext<Integer, Integer, Try<Integer>, Integer, NotUsed> flow =
Flow.<Integer>create()
.<Integer, Integer, Integer>asFlowWithContext((el, ctx) -> el, ctx -> ctx)
.map(
i -> {
if (i > 0) return Failure.apply(new RuntimeException("i is larger than 0"));
else return Success.apply(i);
});
final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Pair<Try<Integer>, Integer>>>
probes =
TestSource.<Integer>probe(system)
.asSourceWithContext(ctx -> ctx)
.via(
RetryFlow.withBackoffAndContext(
minBackoff,
maxBackoff,
randomFactor,
maxRetries,
flow,
(in, out) -> {
if (out.first().isFailure()) {
if (out.second() > 0) {
return Optional.of(Pair.create(out.second() / 2, out.second() / 2));
}
}
return Optional.empty();
}))
.toMat(TestSink.probe(system), Keep.both())
.run(system);
final TestPublisher.Probe<Integer> source = probes.first();
final TestSubscriber.Probe<Pair<Try<Integer>, Integer>> sink = probes.second();
sink.request(1);
source.sendNext(8);
Pair<Try<Integer>, Integer> response = sink.expectNext();
assertEquals(0, response.first().get().intValue());
assertEquals(0, response.second().intValue());
source.sendComplete();
sink.expectComplete();
}
}

View file

@ -0,0 +1,560 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import org.scalatest.matchers.{ MatchResult, Matcher }
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
class RetryFlowSpec extends StreamSpec("""
akka.stream.materializer.initial-input-buffer-size = 1
akka.stream.materializer.max-input-buffer-size = 1
""") with CustomMatchers {
final val Failed = new Exception("prepared failure")
final val FailedElem: Try[Int] = Failure(Failed)
val failEvenValuesFlow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] =
FlowWithContext.fromTuples(Flow.fromFunction {
case (i, ctx) if i % 2 == 0 => (FailedElem, ctx)
case (i, ctx) => (Success(i), ctx)
})
val failAllValuesFlow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] =
FlowWithContext.fromTuples(Flow.fromFunction {
case (_, j) => (FailedElem, j)
})
val alwaysRecoveringFunc: ((Int, Int), (Try[Int], Int)) => Option[(Int, Int)] = {
case (_, (Failure(_), i)) => Some(i -> i)
case _ => None
}
/** increments the value and the context with every failure */
def incrementFailedValues[OutData](in: (Int, Int), out: (Try[OutData], Int)): Option[(Int, Int)] = {
(in, out) match {
case ((in, _), (Failure(_), outCtx)) => Some((in + 1, outCtx + 1))
case _ => None
}
}
"RetryFlow.withBackoff" should {
val failEvenValuesFlow: Flow[Int, Try[Int], NotUsed] =
Flow.fromFunction {
case i if i % 2 == 0 => FailedElem
case i => Success(i)
}
def incrementFailedValues[OutData](in: Int, out: Try[OutData]): Option[Int] = {
(in, out) match {
case (in, Failure(_)) => Some(in + 1)
case _ => None
}
}
"send elements through" in {
val sink =
Source(List(13, 17, 19, 23, 27))
.via(
RetryFlow.withBackoff(10.millis, 5.second, 0d, maxRetries = 3, failEvenValuesFlow)(incrementFailedValues))
.runWith(Sink.seq)
sink.futureValue should contain inOrderElementsOf List(
Success(13),
Success(17),
Success(19),
Success(23),
Success(27))
}
"allow retrying a successful element" in {
// #withBackoff-demo
val flow: Flow[Int, Int, NotUsed] = // ???
// #withBackoff-demo
Flow.fromFunction(i => i / 2)
// #withBackoff-demo
val retryFlow: Flow[Int, Int, NotUsed] =
RetryFlow.withBackoff(minBackoff = 10.millis, maxBackoff = 5.seconds, randomFactor = 0d, maxRetries = 3, flow)(
decideRetry = {
case (_, result) if result > 0 => Some(result)
case _ => None
})
// #withBackoff-demo
val (source, sink) = TestSource.probe[Int].via(retryFlow).toMat(TestSink.probe)(Keep.both).run()
sink.request(4)
source.sendNext(5)
sink.expectNext(0)
source.sendNext(2)
sink.expectNext(0)
source.sendComplete()
sink.expectComplete()
}
}
"RetryFlow.withBackoffAndContext" should {
"send elements through" in {
val sink =
Source(List(13, 17, 19, 23, 27).map(_ -> 0))
.via(RetryFlow.withBackoffAndContext(10.millis, 5.second, 0d, maxRetries = 3, failEvenValuesFlow)(
incrementFailedValues))
.runWith(Sink.seq)
sink.futureValue should contain inOrderElementsOf List(
Success(13) -> 0,
Success(17) -> 0,
Success(19) -> 0,
Success(23) -> 0,
Success(27) -> 0)
}
"send failures through (after retrying)" in {
val maxRetries = 2
val sink =
Source(List(13, 17).map(_ -> 0))
.via(RetryFlow.withBackoffAndContext(1.millis, 5.millis, 0d, maxRetries, failAllValuesFlow)(
incrementFailedValues))
.runWith(Sink.seq)
sink.futureValue should contain inOrderElementsOf List(FailedElem -> maxRetries, FailedElem -> maxRetries)
}
"send elements through (with retrying)" in {
val sink =
Source(List(12, 13, 14).map(_ -> 0))
.via(RetryFlow.withBackoffAndContext(1.millis, 5.millis, 0d, maxRetries = 3, failEvenValuesFlow)(
incrementFailedValues))
.runWith(Sink.seq)
sink.futureValue should contain inOrderElementsOf List(Success(13) -> 1, Success(13) -> 0, Success(15) -> 1)
}
"allow retrying a successful element" in {
class SomeContext
//#retry-success
val flow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] = // ???
//#retry-success
FlowWithContext.fromTuples[Int, SomeContext, Int, SomeContext, NotUsed](Flow.fromFunction {
case (i, ctx) => i / 2 -> ctx
})
//#retry-success
val retryFlow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] =
RetryFlow.withBackoffAndContext(
minBackoff = 10.millis,
maxBackoff = 5.seconds,
randomFactor = 0d,
maxRetries = 3,
flow)(decideRetry = {
case ((_, _), (result, ctx)) if result > 0 => Some(result -> ctx)
case _ => None
})
//#retry-success
val (source, sink) = TestSource.probe[(Int, SomeContext)].via(retryFlow).toMat(TestSink.probe)(Keep.both).run()
sink.request(4)
val ctx = new SomeContext
source.sendNext(5 -> ctx)
sink.expectNext(0 -> ctx)
source.sendNext(2 -> ctx)
sink.expectNext(0 -> ctx)
source.sendComplete()
sink.expectComplete()
}
"work with a buffer in the inner flow" in {
val flow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] =
FlowWithContext.fromTuples(Flow[(Int, Int)].buffer(10, OverflowStrategy.backpressure).via(failEvenValuesFlow))
val (source, sink) = TestSource
.probe[(Int, Int)]
.via(RetryFlow.withBackoffAndContext(10.millis, 5.seconds, 0d, 3, flow)((_, _) => None))
.toMat(TestSink.probe)(Keep.both)
.run()
sink.request(99)
source.sendNext(1 -> 0)
source.sendNext(3 -> 0)
sink.expectNext(Success(1) -> 0)
sink.expectNext(Success(3) -> 0)
}
}
"Backing off" should {
"have min backoff" in {
val minBackoff = 200.millis
val (source, sink) = TestSource
.probe[(Int, Int)]
.via(RetryFlow.withBackoffAndContext(minBackoff, 5.second, 0d, 3, failEvenValuesFlow)(incrementFailedValues))
.toMat(TestSink.probe)(Keep.both)
.run()
sink.request(99)
source.sendNext(1 -> 0)
sink.expectNext(80.millis, Success(1) -> 0)
source.sendNext(2 -> 0)
sink.expectNoMessage(minBackoff)
sink.expectNext(Success(3) -> 1)
}
"use min backoff for every try" in {
val minBackoff = 50.millis
val maxRetries = 3
val (source, sink) = TestSource
.probe[(Int, Int)]
.via(RetryFlow.withBackoffAndContext(minBackoff, 5.seconds, 0d, maxRetries, failAllValuesFlow)(
incrementFailedValues))
.toMat(TestSink.probe)(Keep.both)
.run()
sink.request(1)
source.sendNext(10 -> 0)
sink.expectNoMessage(minBackoff * maxRetries)
sink.expectNext(FailedElem -> maxRetries)
}
"exponentially backoff between retries" in {
val NumRetries = 7
val nanoTimeOffset = System.nanoTime()
case class State(retriedAt: List[Long])
val flow = FlowWithContext.fromTuples[State, NotUsed, State, NotUsed, NotUsed](Flow.fromFunction {
case (State(retriedAt), _) => State(System.nanoTime - nanoTimeOffset :: retriedAt) -> NotUsed
})
val (source, sink) = TestSource
.probe[(State, NotUsed)]
.via(RetryFlow.withBackoffAndContext(10.millis, 5.seconds, 0d, NumRetries, flow) {
case (_, (s, _)) => Some(s -> NotUsed)
case _ => None
})
.toMat(TestSink.probe)(Keep.both)
.run()
sink.request(1)
source.sendNext(State(Nil) -> NotUsed)
val (State(retriedAt), _) = sink.expectNext()
val timesBetweenRetries = retriedAt
.sliding(2)
.collect {
case before :: after :: Nil => before - after
}
.toIndexedSeq
timesBetweenRetries.reverse should strictlyIncrease
source.sendComplete()
sink.expectComplete()
}
}
"Aborting" should {
"propagate error from upstream" in {
val retryFlow: FlowWithContext[Int, Int, Try[Int], Int, NotUsed] =
RetryFlow.withBackoffAndContext(
minBackoff = 10.millis,
maxBackoff = 5.seconds,
randomFactor = 0d,
maxRetries = 3,
flow = failEvenValuesFlow)(decideRetry = {
case ((in, _), (Failure(_), ctx)) => Some((in + 1, ctx))
case _ => None
})
val (source, sink) = TestSource.probe[(Int, Int)].via(retryFlow).toMat(TestSink.probe)(Keep.both).run()
sink.request(99)
source.sendNext(1 -> 1)
sink.expectNext(Success(1) -> 1)
source.sendNext(2 -> 2)
sink.expectNext(Success(3) -> 2)
source.sendError(Failed)
sink.expectError(Failed)
}
"propagate error from upstream on start" in new AllSucceedBench[Int, Int, Int] {
externalOut.request(99)
externalIn.sendError(Failed)
externalOut.expectError(Failed)
}
"propagate error from upstream before start" in new AllSucceedBench[Int, Int, Int] {
externalIn.sendError(Failed)
externalOut.request(1)
externalOut.expectError(Failed)
}
"propagate error on the inner flow after start" in new AllSucceedBench[Int, Int, Int] {
externalOut.request(99)
// send one element through
externalIn.sendNext(1 -> 0)
internalOut.requestNext(1 -> 0)
internalIn.sendNext(1 -> 0)
externalOut.expectNext(1 -> 0)
// fail inner flow
internalIn.sendError(Failed)
externalOut.expectError(Failed)
externalIn.expectCancellation()
}
"propagate error on the inner flow on start" in new AllSucceedBench[Int, Int, Int] {
externalOut.request(29)
internalIn.sendError(Failed)
externalOut.expectError(Failed)
externalIn.expectCancellation()
}
"propagate non-error cancel on the inner flow on start" in new AllSucceedBench[Int, Int, Int] {
externalOut.request(29)
internalOut.cancel()
externalOut.expectComplete()
externalIn.expectCancellation()
}
"propagate error on the inner flow before start" in new AllSucceedBench[Int, Int, Int] {
internalIn.sendError(Failed)
externalOut.request(13)
externalOut.expectError(Failed)
externalIn.expectCancellation()
}
"propagate error before the RetryFlow, while on retry spin" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) {
externalOut.request(92)
// spinning message
externalIn.sendNext(1 -> 0)
internalOut.requestNext(1 -> 0)
internalIn.sendNext(1 -> 0)
internalOut.requestNext(1 -> 0)
externalOut.expectNoMessage()
externalIn.sendError(Failed)
externalOut.expectError(Failed)
}
"propagate error on the inner flow, while on retry spin" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) {
externalOut.request(35)
// spinning message
externalIn.sendNext(1 -> 0)
internalOut.requestNext(1 -> 0)
internalIn.sendNext(1 -> 0)
internalOut.requestNext(1 -> 0)
externalOut.expectNoMessage()
internalIn.sendError(Failed)
externalOut.expectError(Failed)
externalIn.expectCancellation()
}
"allow for downstream cancel while element is in flow" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) {
externalOut.request(78)
// spinning message
externalIn.sendNext(1 -> 0)
internalOut.requestNext(1 -> 0)
externalOut.cancel()
internalIn.expectCancellation()
externalIn.expectCancellation()
}
"allow for downstream cancel while retry is backing off" in new ConstructBench[Int, Int, Int]((v, _) => Some(v)) {
externalOut.request(35)
// spinning message
externalIn.sendNext(1 -> 0)
internalOut.requestNext(1 -> 0)
internalIn.sendNext(1 -> 0)
externalOut.cancel()
internalIn.expectCancellation()
externalIn.expectCancellation()
}
"finish only after processing all elements in stream" in new AllSucceedBench[Int, Int, Int] {
externalOut.request(32)
// send one element and complete
externalIn.sendNext(1 -> 0)
externalIn.sendComplete()
internalOut.requestNext(1 -> 0)
internalIn.sendNext(1 -> 0)
externalOut.requestNext(1 -> 0)
externalOut.expectComplete()
}
}
"Coordinator" should {
type InData = String
type Ctx2 = Int
type OutData = Try[String]
"send elements across" in new AllSucceedBench[InData, Ctx2, OutData] {
// push element
val elA = "A" -> 123
externalIn.sendNext(elA)
// let element go via retryable flow
internalOut.requestNext(elA)
private val resA = Success("result A") -> 123
internalIn.sendNext(resA)
// expect result
externalOut.requestNext(resA)
}
"retry for a failure" in new ConstructBench[InData, Ctx2, OutData]((_, out) =>
out._1 match {
case Success(_) => None
case Failure(_) => Some("A'" -> 2)
}) {
// demand on stream end
externalOut.request(1)
// push element
val element1 = "A" -> 1
externalIn.sendNext(element1)
// let element go via retryable flow
internalOut.requestNext(element1)
internalIn.sendNext(Failure(new RuntimeException("boom")) -> 1)
// let element go via retryable flow
val try2 = internalOut.requestNext(3.seconds)
try2._1 shouldBe "A'"
try2._2 shouldBe 2
val res1 = Success("Ares") -> 1
internalIn.sendNext(res1)
// expect result
externalOut.requestNext(res1)
}
"fail if inner flow emits twice" in new AllSucceedBench[InData, Ctx2, OutData] {
externalOut.request(99)
// push element
val elA = "A" -> 123
externalIn.sendNext(elA)
// let element go via retryable flow
internalOut.requestNext(elA)
val resA = Success("result A") -> 123
internalIn.sendNext(resA)
internalIn.sendNext(Success("result B") -> 222)
// expect result
externalOut.requestNext(resA)
externalOut.expectError() shouldBe an[IllegalStateException]
}
"allow more demand in inner flow (but never pass in more than one element into the retrying cycle)" in new AllSucceedBench[
InData,
Ctx2,
OutData] {
externalOut.request(1)
internalIn.expectRequest() shouldBe 1L
internalOut.request(1)
externalIn.expectRequest() shouldBe 1L
// push element
val elA = "A" -> 123
externalIn.sendNext(elA)
// let element go via retryable flow
internalOut.expectNext(elA)
// more demand on retryable flow
internalOut.request(1)
internalOut.expectNoMessage(50.millis)
// emit from retryable flow, push to external
val resA = Success("result A") -> 123
internalIn.sendNext(resA)
// element should NOT reach internal before pushed from externalOut
internalOut.expectNoMessage(50.millis)
// put a new element in external in buffer
val elB = "B" -> 567
externalIn.sendNext(elB)
// element reaches internal flow before elA has bean fetched from externalOut
internalOut.expectNext(elB)
externalOut.expectNext(resA)
}
}
class ConstructBench[In, Ctx, Out](retryWith: ((In, Ctx), (Out, Ctx)) => Option[(In, Ctx)]) {
val throughDangerousFlow
: FlowWithContext[In, Ctx, Out, Ctx, (TestSubscriber.Probe[(In, Ctx)], TestPublisher.Probe[(Out, Ctx)])] =
FlowWithContext.fromTuples(
Flow.fromSinkAndSourceMat(TestSink.probe[(In, Ctx)], TestSource.probe[(Out, Ctx)])(Keep.both))
val ((externalIn, (internalOut, internalIn)), externalOut) =
TestSource
.probe[(In, Ctx)]
.viaMat(
RetryFlow.withBackoffAndContext(
minBackoff = 10.millis,
maxBackoff = 1.second,
randomFactor = 0d,
maxRetries = 3,
throughDangerousFlow)(retryWith))(Keep.both)
.toMat(TestSink.probe[(Out, Ctx)])(Keep.both)
.run()
}
class AllSucceedBench[In, Ctx, Out] extends ConstructBench[In, Ctx, Out]((_, _) => None)
}
trait CustomMatchers {
class StrictlyIncreasesMatcher() extends Matcher[Seq[Long]] {
def apply(left: Seq[Long]): MatchResult = {
val result = left.sliding(2).map(pair => pair.head < pair.last).reduceOption(_ && _).getOrElse(false)
MatchResult(
result,
s"""Collection $left elements are not increasing strictly""",
s"""Collection $left elements are increasing strictly""")
}
}
def strictlyIncrease = new StrictlyIncreasesMatcher()
}

View file

@ -0,0 +1,150 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.pattern.BackoffSupervisor
import akka.stream.SubscriptionWithCancelException.NonFailureCancellation
import akka.stream.stage._
import akka.stream.{ Attributes, BidiShape, Inlet, Outlet }
import akka.util.OptionVal
import scala.concurrent.duration._
/**
* INTERNAL API.
*
* ```
* externalIn
* |
* |
* +-> internalOut -->+
* | |
* | flow
* | |
* | internalIn --+
* +<-yes- retry?
* |
* no
* |
* externalOut
* ```
*/
@InternalApi private[akka] final class RetryFlowCoordinator[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRetries: Int,
decideRetry: (In, Out) => Option[In])
extends GraphStage[BidiShape[In, In, Out, Out]] {
private val externalIn = Inlet[In]("RetryFlow.externalIn")
private val externalOut = Outlet[Out]("RetryFlow.externalOut")
private val internalOut = Outlet[In]("RetryFlow.internalOut")
private val internalIn = Inlet[Out]("RetryFlow.internalIn")
override val shape: BidiShape[In, In, Out, Out] =
BidiShape(externalIn, internalOut, internalIn, externalOut)
override def createLogic(attributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var elementInProgress: OptionVal[In] = OptionVal.none
private var retryNo = 0
setHandler(
externalIn,
new InHandler {
override def onPush(): Unit = {
val element = grab(externalIn)
elementInProgress = OptionVal.Some(element)
retryNo = 0
pushInternal(element)
}
override def onUpstreamFinish(): Unit =
if (elementInProgress.isEmpty) {
completeStage()
}
})
setHandler(
internalOut,
new OutHandler {
override def onPull(): Unit = {
if (elementInProgress.isEmpty) {
if (!hasBeenPulled(externalIn) && !isClosed(externalIn)) {
pull(externalIn)
}
}
}
override def onDownstreamFinish(cause: Throwable): Unit = {
if (elementInProgress.isEmpty || !cause.isInstanceOf[NonFailureCancellation]) {
super.onDownstreamFinish(cause)
} else {
// emit elements before finishing
setKeepGoing(true)
}
}
})
setHandler(
internalIn,
new InHandler {
override def onPush(): Unit = {
val result = grab(internalIn)
elementInProgress match {
case OptionVal.None =>
failStage(
new IllegalStateException(
s"inner flow emitted unexpected element $result; the flow must be one-in one-out"))
case OptionVal.Some((_, _)) if retryNo == maxRetries => pushExternal(result)
case OptionVal.Some(in) =>
decideRetry(in, result) match {
case None => pushExternal(result)
case Some(element) => planRetry(element)
}
}
}
})
setHandler(externalOut, new OutHandler {
override def onPull(): Unit =
// external demand
if (!hasBeenPulled(internalIn)) pull(internalIn)
})
private def pushInternal(element: In): Unit = {
push(internalOut, element)
}
private def pushExternal(result: Out): Unit = {
elementInProgress = OptionVal.none
push(externalOut, result)
if (isClosed(externalIn)) {
completeStage()
} else if (isAvailable(internalOut)) {
pull(externalIn)
}
}
private def planRetry(element: In): Unit = {
val delay = BackoffSupervisor.calculateDelay(retryNo, minBackoff, maxBackoff, randomFactor)
elementInProgress = OptionVal.Some(element)
retryNo += 1
pull(internalIn)
scheduleOnce(RetryFlowCoordinator.RetryCurrentElement, delay)
}
override def onTimer(timerKey: Any): Unit = pushInternal(elementInProgress.get)
}
}
private object RetryFlowCoordinator {
case object RetryCurrentElement
}

View file

@ -0,0 +1,98 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.javadsl
import java.util.Optional
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.stream.scaladsl
import akka.util.JavaDurationConverters._
import scala.compat.java8.OptionConverters._
object RetryFlow {
/**
* API may change!
*
* Allows retrying individual elements in the stream with an exponential backoff.
*
* The retry condition is controlled by the `decideRetry` function. It takes the originally emitted
* element and the response emitted by `flow`, and may return a request to be retried.
*
* The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics,
* the [[akka.stream.javadsl.Flow Flow]] may not filter elements,
* nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are
* emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will
* be emitted into the `flow` at any time. The `flow` needs to emit an element before the next
* will be emitted to it.
*
* @param minBackoff minimum duration to backoff between issuing retries
* @param maxBackoff maximum duration to backoff between issuing retries
* @param randomFactor adds jitter to the retry delay. Use 0 for no jitter
* @param flow a flow to retry elements from
* @param decideRetry retry condition decision function
*/
@ApiMayChange(issue = "https://github.com/akka/akka/issues/27960")
def withBackoff[In, Out, Mat](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRetries: Int,
flow: Flow[In, Out, Mat],
decideRetry: akka.japi.function.Function2[In, Out, Optional[In]]): Flow[In, Out, Mat] =
scaladsl.RetryFlow
.withBackoff[In, Out, Mat](minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRetries, flow.asScala) {
(in, out) =>
decideRetry.apply(in, out).asScala
}
.asJava
/**
* API may change!
*
* Allows retrying individual elements in the stream with an exponential backoff.
*
* The retry condition is controlled by the `decideRetry` function. It takes the originally emitted
* element with its context, and the response emitted by `flow`, and may return a request to be retried.
*
* The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics,
* the [[akka.stream.javadsl.FlowWithContext FlowWithContext]] may not filter elements,
* nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are
* emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will
* be emitted into the `flow` at any time. The `flow` needs to emit an element before the next
* will be emitted to it.
*
* The wrapped `flow` and `decideRetry` take the additional context parameters which can be a context,
* or used to control retrying with other information.
*
* @param minBackoff minimum duration to backoff between issuing retries
* @param maxBackoff maximum duration to backoff between issuing retries
* @param randomFactor adds jitter to the retry delay. Use 0 for no jitter
* @param flow a flow to retry elements from
* @param decideRetry retry condition decision function
*/
@ApiMayChange(issue = "https://github.com/akka/akka/issues/27960")
def withBackoffAndContext[In, InCtx, Out, OutCtx, Mat](
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRetries: Int,
flow: FlowWithContext[In, InCtx, Out, OutCtx, Mat],
decideRetry: akka.japi.function.Function2[Pair[In, InCtx], Pair[Out, OutCtx], Optional[Pair[In, InCtx]]])
: FlowWithContext[In, InCtx, Out, OutCtx, Mat] =
scaladsl.RetryFlow
.withBackoffAndContext[In, InCtx, Out, OutCtx, Mat](
minBackoff.asScala,
maxBackoff.asScala,
randomFactor,
maxRetries,
flow.asScala) { (in, out) =>
decideRetry.apply(Pair(in._1, in._2), Pair(out._1, out._2)).asScala.map(_.toScala)
}
.asJava[In, InCtx, Out, OutCtx, Mat]
}

View file

@ -0,0 +1,96 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.annotation.ApiMayChange
import akka.stream.impl.RetryFlowCoordinator
import scala.concurrent.duration._
object RetryFlow {
/**
* API may change!
*
* Allows retrying individual elements in the stream with an exponential backoff.
*
* The retry condition is controlled by the `decideRetry` function. It takes the originally emitted
* element and the response emitted by `flow`, and may return a request to be retried.
*
* The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics,
* the [[akka.stream.scaladsl.Flow Flow]] may not filter elements,
* nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are
* emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will
* be emitted into the `flow` at any time. The `flow` needs to emit an element before the next
* will be emitted to it.
*
* @param minBackoff minimum duration to backoff between issuing retries
* @param maxBackoff maximum duration to backoff between issuing retries
* @param randomFactor adds jitter to the retry delay. Use 0 for no jitter
* @param maxRetries total number of allowed retries, when reached the last result will be emitted
* even if unsuccessful
* @param flow a flow to retry elements from
* @param decideRetry retry condition decision function
*/
@ApiMayChange(issue = "https://github.com/akka/akka/issues/27960")
def withBackoff[In, Out, Mat](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRetries: Int,
flow: Flow[In, Out, Mat])(decideRetry: (In, Out) => Option[In]): Flow[In, Out, Mat] =
Flow.fromGraph {
val retryCoordination = BidiFlow.fromGraph(
new RetryFlowCoordinator[In, Out](minBackoff, maxBackoff, randomFactor, maxRetries, decideRetry))
retryCoordination.joinMat(flow)(Keep.right)
}
/**
* API may change!
*
* Allows retrying individual elements in the stream with an exponential backoff.
*
* The retry condition is controlled by the `decideRetry` function. It takes the originally emitted
* element with its context, and the response emitted by `flow`, and may return a request to be retried.
*
* The implementation of the `RetryFlow` requires that `flow` follows one-in-one-out semantics,
* the [[akka.stream.scaladsl.FlowWithContext FlowWithContext]] may not filter elements,
* nor emit more than one element per incoming element. The `RetryFlow` will fail if two elements are
* emitted from the `flow`, it will be stuck "forever" if nothing is emitted. Just one element will
* be emitted into the `flow` at any time. The `flow` needs to emit an element before the next
* will be emitted to it.
*
* The wrapped `flow` and `decideRetry` take the additional context parameters which can be a context,
* or used to control retrying with other information.
*
* @param minBackoff minimum duration to backoff between issuing retries
* @param maxBackoff maximum duration to backoff between issuing retries
* @param randomFactor adds jitter to the retry delay. Use 0 for no jitter
* @param maxRetries total number of allowed retries, when reached the last result will be emitted
* even if unsuccessful
* @param flow a flow with context to retry elements from
* @param decideRetry retry condition decision function
*/
@ApiMayChange(issue = "https://github.com/akka/akka/issues/27960")
def withBackoffAndContext[In, CtxIn, Out, CtxOut, Mat](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRetries: Int,
flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat])(
decideRetry: ((In, CtxIn), (Out, CtxOut)) => Option[(In, CtxIn)]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
FlowWithContext.fromTuples {
val retryCoordination = BidiFlow.fromGraph(
new RetryFlowCoordinator[(In, CtxIn), (Out, CtxOut)](
minBackoff,
maxBackoff,
randomFactor,
maxRetries,
decideRetry))
retryCoordination.joinMat(flow)(Keep.right)
}
}

View file

@ -155,6 +155,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala",
"akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala",
"akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala",
"akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala",
"akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala",
// akka-stream-typed
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",