diff --git a/akka-docs/src/main/paradox/scala/stream/stream-error.md b/akka-docs/src/main/paradox/scala/stream/stream-error.md index 73655c4e55..51557f81e3 100644 --- a/akka-docs/src/main/paradox/scala/stream/stream-error.md +++ b/akka-docs/src/main/paradox/scala/stream/stream-error.md @@ -101,4 +101,49 @@ Java : @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision } If we would not use `Resume` the default stopping strategy would complete the stream -with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally]. \ No newline at end of file +with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally]. + +## Delayed restarts with a backoff stage + +Just as Akka provides the @ref:[backoff supervision pattern for actors](../general/supervision.md#backoff-supervisor), Akka streams +also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff +supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts. + +This pattern is useful when the stage fails or completes because some external resource is not available +and we need to give it some time to start-up again. One of the prime examples when this is useful is +when a WebSocket connection fails due to the HTTP server it's running on going down, perhaps because it is overloaded. +By using an exponential backoff, we avoid going into a tight reconnect look, which both gives the HTTP server some time +to recover, and it avoids using needless resources on the client side. + +The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`] +@java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a +stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will +be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due +to the `maxBackoff` parameter): + +Scala +: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #restart-with-backoff-source } + +Java +: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #restart-with-backoff-source } + +Using a `randomFactor` to add a little bit of additional variance to the backoff intervals +is highly recommended, in order to avoid multiple streams re-start at the exact same point in time, +for example because they were stopped due to a shared resource such as the same server going down +and re-starting after the same configured interval. By adding additional randomness to the +re-start intervals the streams will start in slightly different points in time, thus avoiding +large spikes of traffic hitting the recovering server or other resource that they all need to contact. + +The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use +it in combination with a @ref:[`KillSwitch`](stream-dynamic.md#kill-switch), so that you can terminate it when needed: + +Scala +: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #with-kill-switch } + +Java +: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch } + +Sinks and flows can also be supervised, using @scala[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`] +@java[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]. The `RestartSink` is restarted when +it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out + port sends an error. \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java new file mode 100644 index 0000000000..bd909620c2 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package jdocs.stream; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.KillSwitch; +import akka.stream.KillSwitches; +import akka.stream.Materializer; +import akka.stream.javadsl.*; +import scala.concurrent.duration.Duration; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +public class RestartDocTest { + + static ActorSystem system; + static Materializer materializer; + + // Mocking akka-http + public static class Http { + public static Http get(ActorSystem system) { + return new Http(); + } + public CompletionStage singleRequest(String uri) { + return new CompletableFuture<>(); + } + public NotUsed entity() { + return NotUsed.getInstance(); + } + } + public static class HttpRequest { + public static String create(String uri) { + return uri; + } + } + public static class ServerSentEvent {} + public static class EventStreamUnmarshalling { + public static EventStreamUnmarshalling fromEventStream() { + return new EventStreamUnmarshalling(); + } + public CompletionStage> unmarshall(Http http, Materializer mat) { + return new CompletableFuture<>(); + } + } + public void doSomethingElse() { + + } + + public void recoverWithBackoffSource() { + //#restart-with-backoff-source + Source eventStream = RestartSource.withBackoff( + Duration.apply(3, TimeUnit.SECONDS), // min backoff + Duration.apply(30, TimeUnit.SECONDS), // max backoff + 0.2, // adds 20% "noise" to vary the intervals slightly + + () -> + // Create a source from a future of a source + Source.fromSourceCompletionStage( + // Issue a GET request on the event stream + Http.get(system).singleRequest(HttpRequest.create("http://example.com/eventstream")) + .thenCompose(response -> + // Unmarshall it to a stream of ServerSentEvents + EventStreamUnmarshalling.fromEventStream() + .unmarshall(response, materializer) + ) + ) + ); + //#restart-with-backoff-source + + //#with-kill-switch + KillSwitch killSwitch = eventStream + .viaMat(KillSwitches.single(), Keep.right()) + .toMat(Sink.foreach(event -> System.out.println("Got event: " + event)), Keep.left()) + .run(materializer); + + doSomethingElse(); + + killSwitch.shutdown(); + //#with-kill-switch + + } +} \ No newline at end of file diff --git a/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala b/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala new file mode 100644 index 0000000000..9c75af6117 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package docs.stream + +import akka.NotUsed +import akka.stream.{ ActorMaterializer, KillSwitches } +import akka.stream.scaladsl._ +import akka.testkit.AkkaSpec +import docs.CompileOnlySpec + +import scala.concurrent.duration._ +import scala.concurrent._ + +class RestartDocSpec extends AkkaSpec with CompileOnlySpec { + implicit val materializer = ActorMaterializer() + import system.dispatcher + + // Mock akka-http interfaces + object Http { + def apply() = this + def singleRequest(req: HttpRequest) = Future.successful(()) + } + case class HttpRequest(uri: String) + case class Unmarshal(b: Any) { + def to[T]: Future[T] = Promise[T]().future + } + case class ServerSentEvent() + + def doSomethingElse(): Unit = () + + "Restart stages" should { + + "demonstrate a restart with backoff source" in compileOnlySpec { + + //#restart-with-backoff-source + val restartSource = RestartSource.withBackoff( + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + ) { () => + // Create a source from a future of a source + Source.fromFutureSource { + // Make a single request with akka-http + Http().singleRequest(HttpRequest( + uri = "http://example.com/eventstream" + )) + // Unmarshall it as a source of server sent events + .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]) + } + } + //#restart-with-backoff-source + + //#with-kill-switch + val killSwitch = restartSource + .viaMat(KillSwitches.single)(Keep.right) + .toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left) + .run() + + doSomethingElse() + + killSwitch.shutdown() + //#with-kill-switch + } + + } +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala new file mode 100644 index 0000000000..ccbfddc9b2 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -0,0 +1,491 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import akka.stream.ActorMaterializer +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.testkit.DefaultTimeout + +import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } + +class RestartSpec extends StreamSpec with DefaultTimeout { + + implicit val mat = ActorMaterializer() + import system.dispatcher + + "A restart with backoff source" should { + "run normally" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒ + created.incrementAndGet() + Source.repeat("a") + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("a") + probe.requestNext("a") + probe.requestNext("a") + probe.requestNext("a") + + created.get() should ===(1) + + probe.cancel() + } + + "restart on completion" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒ + created.incrementAndGet() + Source(List("a", "b")) + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + + created.get() should ===(3) + + probe.cancel() + } + + "restart on failure" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒ + created.incrementAndGet() + Source(List("a", "b", "c")) + .map { + case "c" ⇒ throw TE("failed") + case other ⇒ other + } + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + + created.get() should ===(3) + + probe.cancel() + } + + "backoff before restart" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(200.millis, 1.second, 0) { () ⇒ + created.incrementAndGet() + Source(List("a", "b")) + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("b") + probe.request(1) + // There should be a delay of at least 200ms before we receive the element, wait for 100ms. + val deadline = 100.millis.fromNow + // But the delay shouldn't be more than 300ms. + probe.expectNext(300.milliseconds, "a") + deadline.isOverdue() should be(true) + + created.get() should ===(2) + + probe.cancel() + } + + "reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + created.incrementAndGet() + Source(List("a", "b")) + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.requestNext("b") + // There should be a 200ms delay + probe.requestNext("a") + probe.requestNext("b") + probe.request(1) + // The probe should now be backing off for 400ms + + // Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the + // subsequent 200ms min backoff to pass, so it resets the restart count + Thread.sleep(700) + + probe.expectNext("a") + probe.requestNext("b") + + // We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the + // next element within 300ms + probe.requestNext(300.milliseconds) should ===("a") + + created.get() should ===(4) + + probe.cancel() + } + + "cancel the currently running source when cancelled" in assertAllStagesStopped { + val created = new AtomicInteger() + val promise = Promise[Done]() + val probe = RestartSource.withBackoff(10.millis, 2.seconds, 0) { () ⇒ + created.incrementAndGet() + Source.repeat("a").watchTermination() { (_, term) ⇒ + promise.completeWith(term) + } + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.cancel() + + promise.future.futureValue should ===(Done) + + // Wait to ensure it isn't restarted + Thread.sleep(200) + created.get() should ===(1) + } + + "not restart the source when cancelled while backing off" in assertAllStagesStopped { + val created = new AtomicInteger() + val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + created.incrementAndGet() + Source.single("a") + }.runWith(TestSink.probe) + + probe.requestNext("a") + probe.request(1) + // Should be backing off now + probe.cancel() + + // Wait to ensure it isn't restarted + Thread.sleep(300) + created.get() should ===(1) + } + } + + "A restart with backoff sink" should { + "run normally" in assertAllStagesStopped { + val created = new AtomicInteger() + val result = Promise[Seq[String]]() + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒ + created.incrementAndGet() + Sink.seq.mapMaterializedValue(result.completeWith) + })(Keep.left).run() + + probe.sendNext("a") + probe.sendNext("b") + probe.sendNext("c") + probe.sendComplete() + + result.future.futureValue should contain inOrderOnly ("a", "b", "c") + created.get() should ===(1) + } + + "restart on cancellation" in assertAllStagesStopped { + val created = new AtomicInteger() + val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒ + created.incrementAndGet() + Flow[String].takeWhile(_ != "cancel", inclusive = true) + .to(Sink.foreach(queue.sendNext)) + })(Keep.left).run() + + probe.sendNext("a") + sinkProbe.requestNext("a") + probe.sendNext("b") + sinkProbe.requestNext("b") + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + probe.sendNext("c") + sinkProbe.requestNext("c") + + created.get() should ===(2) + + sinkProbe.cancel() + probe.sendComplete() + } + + "backoff before restart" in assertAllStagesStopped { + val created = new AtomicInteger() + val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + created.incrementAndGet() + Flow[String].takeWhile(_ != "cancel", inclusive = true) + .to(Sink.foreach(queue.sendNext)) + })(Keep.left).run() + + probe.sendNext("a") + sinkProbe.requestNext("a") + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + probe.sendNext("b") + sinkProbe.request(1) + val deadline = 100.millis.fromNow + sinkProbe.expectNext(300.millis, "b") + deadline.isOverdue() should be(true) + + created.get() should ===(2) + + sinkProbe.cancel() + probe.sendComplete() + } + + "reset exponential backoff back to minimum when sink runs for at least minimum backoff without completing" in assertAllStagesStopped { + val created = new AtomicInteger() + val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + created.incrementAndGet() + Flow[String].takeWhile(_ != "cancel", inclusive = true) + .to(Sink.foreach(queue.sendNext)) + })(Keep.left).run() + + probe.sendNext("a") + sinkProbe.requestNext("a") + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + // There should be a 200ms delay + probe.sendNext("b") + sinkProbe.requestNext("b") + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + sinkProbe.request(1) + // The probe should now be backing off for 400ms + + // Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the + // subsequent 200ms min backoff to pass, so it resets the restart count + Thread.sleep(700) + + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + + // We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the + // next element within 300ms + probe.sendNext("c") + sinkProbe.request(1) + sinkProbe.expectNext(300.milliseconds, "c") + + created.get() should ===(4) + + sinkProbe.cancel() + probe.sendComplete() + } + + "not restart the sink when completed while backing off" in assertAllStagesStopped { + val created = new AtomicInteger() + val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() + val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒ + created.incrementAndGet() + Flow[String].takeWhile(_ != "cancel", inclusive = true) + .to(Sink.foreach(queue.sendNext)) + })(Keep.left).run() + + probe.sendNext("a") + sinkProbe.requestNext("a") + probe.sendNext("cancel") + sinkProbe.requestNext("cancel") + // Should be backing off now + probe.sendComplete() + + // Wait to ensure it isn't restarted + Thread.sleep(300) + created.get() should ===(1) + + sinkProbe.cancel() + } + } + + "A restart with backoff flow" should { + + def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration) = { + val created = new AtomicInteger() + val (flowInSource, flowInProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run() + val (flowOutProbe, flowOutSource) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run() + + // We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we + // simply use the probes as a message bus for feeding and capturing events. + val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0) { () ⇒ + created.incrementAndGet() + Flow.fromSinkAndSource( + Flow[String].takeWhile(_ != "cancel").to(Sink.foreach(flowInSource.sendNext).mapMaterializedValue(_.onComplete { + case Success(_) ⇒ flowInSource.sendNext("in complete") + case Failure(_) ⇒ flowInSource.sendNext("in error") + })), + flowOutSource.takeWhile(_ != "complete").map { + case "error" ⇒ throw TE("error") + case other ⇒ other + }.watchTermination()((_, term) ⇒ + term.foreach(_ ⇒ { + flowInSource.sendNext("out complete") + }) + ) + ) + })(Keep.left).toMat(TestSink.probe[String])(Keep.both).run() + + (created, source, flowInProbe, flowOutProbe, sink) + } + + "run normally" in assertAllStagesStopped { + val created = new AtomicInteger() + val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(10.millis, 20.millis, 0) { () ⇒ + created.incrementAndGet() + Flow[String] + })(Keep.left).toMat(TestSink.probe[String])(Keep.both).run() + + source.sendNext("a") + sink.requestNext("a") + source.sendNext("b") + sink.requestNext("b") + + created.get() should ===(1) + + source.sendComplete() + } + + "restart on cancellation" in { + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis) + + source.sendNext("a") + flowInProbe.requestNext("a") + flowOutProbe.sendNext("b") + sink.requestNext("b") + + source.sendNext("cancel") + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.request(2) + Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete") + + // and it should restart + source.sendNext("c") + flowInProbe.requestNext("c") + flowOutProbe.sendNext("d") + sink.requestNext("d") + + created.get() should ===(2) + } + + "restart on completion" in { + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis) + + source.sendNext("a") + flowInProbe.requestNext("a") + flowOutProbe.sendNext("b") + sink.requestNext("b") + + sink.request(1) + flowOutProbe.sendNext("complete") + + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.request(2) + Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete") + + // and it should restart + source.sendNext("c") + flowInProbe.requestNext("c") + flowOutProbe.sendNext("d") + sink.requestNext("d") + + created.get() should ===(2) + } + + "restart on failure" in { + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis) + + source.sendNext("a") + flowInProbe.requestNext("a") + flowOutProbe.sendNext("b") + sink.requestNext("b") + + sink.request(1) + flowOutProbe.sendNext("error") + + // This should complete the in probe + flowInProbe.requestNext("in complete") + + // and it should restart + source.sendNext("c") + flowInProbe.requestNext("c") + flowOutProbe.sendNext("d") + sink.requestNext("d") + + created.get() should ===(2) + } + + "backoff before restart" in { + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(200.millis, 2.seconds) + + source.sendNext("a") + flowInProbe.requestNext("a") + flowOutProbe.sendNext("b") + sink.requestNext("b") + + source.sendNext("cancel") + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.request(2) + Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete") + + source.sendNext("c") + flowInProbe.request(1) + val deadline = 100.millis.fromNow + flowInProbe.expectNext(300.millis, "c") + deadline.isOverdue() should be(true) + + created.get() should ===(2) + } + + "continue running flow out port after in has been sent completion" in { + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds) + + source.sendNext("a") + flowInProbe.requestNext("a") + flowOutProbe.sendNext("b") + sink.requestNext("b") + + source.sendComplete() + flowInProbe.requestNext("in complete") + + flowOutProbe.sendNext("c") + sink.requestNext("c") + flowOutProbe.sendNext("d") + sink.requestNext("d") + + sink.request(1) + flowOutProbe.sendComplete() + flowInProbe.requestNext("out complete") + sink.expectComplete() + + created.get() should ===(1) + } + + "continue running flow in port after out has been cancelled" in { + val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds) + + source.sendNext("a") + flowInProbe.requestNext("a") + flowOutProbe.sendNext("b") + sink.requestNext("b") + + sink.cancel() + flowInProbe.requestNext("out complete") + + source.sendNext("c") + flowInProbe.requestNext("c") + source.sendNext("d") + flowInProbe.requestNext("d") + + source.sendNext("cancel") + flowInProbe.requestNext("in complete") + source.expectCancellation() + + created.get() should ===(1) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala new file mode 100644 index 0000000000..84ca1daf8c --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala @@ -0,0 +1,126 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.javadsl + +import akka.NotUsed +import akka.japi.function.Creator +import akka.stream.KillSwitch +import akka.stream.scaladsl.{ Sink, Source } + +import scala.concurrent.duration.FiniteDuration + +/** + * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for + * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSource ensures that the graph can continue running while the [[Source]] restarts. + */ +object RestartSource { + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] + * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + sourceFactory.create().asScala + }.asJava + } +} + +/** + * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for + * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSink ensures that the graph can continue running while the [[Sink]] restarts. + */ +object RestartSink { + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. + * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that + * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered + * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the + * graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + sinkFactory.create().asScala + }.asJava + } +} + +/** + * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for + * example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The + * RestartFlow ensures that the graph can continue running while the [[Flow]] restarts. + */ +object RestartFlow { + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination + * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] + * will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + flowFactory.create().asScala + }.asJava + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1f4c1168e9..0bb52f85aa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -184,13 +184,14 @@ object Source { * Streams the elements of the given future source once it successfully completes. * If the future fails the stream is failed. */ - def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future)) + def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future)) /** * Streams the elements of an asynchronous source once its given `completion` stage completes. * If the `completion` fails the stream is failed with that exception. */ - def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = new Source(scaladsl.Source.fromSourceCompletionStage(completion)) + def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = + new Source(scaladsl.Source.fromSourceCompletionStage(completion)) /** * Elements are emitted periodically with the specified interval. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala new file mode 100644 index 0000000000..0038e2fbb5 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala @@ -0,0 +1,354 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.NotUsed +import akka.pattern.BackoffSupervisor +import akka.stream._ +import akka.stream.stage.{ GraphStage, InHandler, OutHandler, TimerGraphStageLogicWithLogging } + +import scala.concurrent.duration.FiniteDuration + +/** + * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for + * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSource ensures that the graph can continue running while the [[Source]] restarts. + */ +object RestartSource { + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] + * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = { + Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor)) + } +} + +private final class RestartWithBackoffSource[T]( + sourceFactory: () ⇒ Source[T, _], + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double +) extends GraphStage[SourceShape[T]] { self ⇒ + + val out = Outlet[T]("RestartWithBackoffSource.out") + + override def shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( + "Source", shape, minBackoff, maxBackoff, randomFactor + ) { + + override protected def logSource = self.getClass + + override protected def startGraph() = { + val sinkIn = createSubInlet(out) + sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer) + if (isAvailable(out)) { + sinkIn.pull() + } + } + + override protected def backoff() = { + setHandler(out, new OutHandler { + override def onPull() = () + }) + } + + backoff() + } +} + +/** + * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for + * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The + * RestartSink ensures that the graph can continue running while the [[Sink]] restarts. + */ +object RestartSink { + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. + * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that + * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered + * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the + * graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = { + Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor)) + } +} + +private final class RestartWithBackoffSink[T]( + sinkFactory: () ⇒ Sink[T, _], + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double +) extends GraphStage[SinkShape[T]] { self ⇒ + + val in = Inlet[T]("RestartWithBackoffSink.in") + + override def shape = SinkShape(in) + override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( + "Sink", shape, minBackoff, maxBackoff, randomFactor + ) { + override protected def logSource = self.getClass + + override protected def startGraph() = { + val sourceOut = createSubOutlet(in) + Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer) + } + + override protected def backoff() = { + setHandler(in, new InHandler { + override def onPush() = () + }) + } + + backoff() + } +} + +/** + * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for + * example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The + * RestartFlow ensures that the graph can continue running while the [[Flow]] restarts. + */ +object RestartFlow { + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination + * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] + * will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = { + Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor)) + } +} + +private final class RestartWithBackoffFlow[In, Out]( + flowFactory: () ⇒ Flow[In, Out, _], + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double +) extends GraphStage[FlowShape[In, Out]] { self ⇒ + + val in = Inlet[In]("RestartWithBackoffFlow.in") + val out = Outlet[Out]("RestartWithBackoffFlow.out") + + override def shape = FlowShape(in, out) + override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( + "Flow", shape, minBackoff, maxBackoff, randomFactor + ) { + + var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None + + override protected def logSource = self.getClass + + override protected def startGraph() = { + val sourceOut = createSubOutlet(in) + val sinkIn = createSubInlet(out) + Source.fromGraph(sourceOut.source).via(flowFactory()).runWith(sinkIn.sink)(subFusingMaterializer) + if (isAvailable(out)) { + sinkIn.pull() + } + activeOutIn = Some((sourceOut, sinkIn)) + } + + override protected def backoff() = { + setHandler(in, new InHandler { + override def onPush() = () + }) + setHandler(out, new OutHandler { + override def onPull() = () + }) + + // We need to ensure that the other end of the sub flow is also completed, so that we don't + // receive any callbacks from it. + activeOutIn.foreach { + case (sourceOut, sinkIn) ⇒ + if (!sourceOut.isClosed) { + sourceOut.complete() + } + if (!sinkIn.isClosed) { + sinkIn.cancel() + } + activeOutIn = None + } + } + + backoff() + } +} + +/** + * Shared logic for all restart with backoff logics. + */ +private abstract class RestartWithBackoffLogic[S <: Shape]( + name: String, + shape: S, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double +) extends TimerGraphStageLogicWithLogging(shape) { + var restartCount = 0 + var resetDeadline = minBackoff.fromNow + // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we + // don't want to restart the sub inlet when it finishes, we just finish normally. + var finishing = false + + protected def startGraph(): Unit + protected def backoff(): Unit + + protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = { + val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subIn") + + sinkIn.setHandler(new InHandler { + override def onPush() = push(out, sinkIn.grab()) + override def onUpstreamFinish() = { + if (finishing) { + complete(out) + } else { + log.debug("Graph out finished") + onCompleteOrFailure() + } + } + override def onUpstreamFailure(ex: Throwable) = { + if (finishing) { + fail(out, ex) + } else { + log.error(ex, "Restarting graph due to failure") + onCompleteOrFailure() + } + } + }) + + setHandler(out, new OutHandler { + override def onPull() = sinkIn.pull() + override def onDownstreamFinish() = { + finishing = true + sinkIn.cancel() + } + }) + + sinkIn + } + + protected final def createSubOutlet[T](in: Inlet[T]): SubSourceOutlet[T] = { + val sourceOut = new SubSourceOutlet[T](s"RestartWithBackoff$name.subOut") + + sourceOut.setHandler(new OutHandler { + override def onPull() = if (isAvailable(in)) { + sourceOut.push(grab(in)) + } else { + pull(in) + } + override def onDownstreamFinish() = { + if (finishing) { + cancel(in) + } else { + log.debug("Graph in finished") + onCompleteOrFailure() + } + } + }) + + setHandler(in, new InHandler { + override def onPush() = if (sourceOut.isAvailable) { + sourceOut.push(grab(in)) + } + override def onUpstreamFinish() = { + finishing = true + sourceOut.complete() + } + override def onUpstreamFailure(ex: Throwable) = { + finishing = true + sourceOut.fail(ex) + } + }) + + sourceOut + } + + // Set a timer to restart after the calculated delay + protected final def onCompleteOrFailure() = { + // Check if the last start attempt was more than the minimum backoff + if (resetDeadline.isOverdue()) { + log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff) + restartCount = 0 + } + + val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) + log.debug("Restarting graph in {}", restartDelay) + scheduleOnce("RestartTimer", restartDelay) + restartCount += 1 + // And while we wait, we go into backoff mode + backoff() + } + + // Invoked when the backoff timer ticks + override protected def onTimer(timerKey: Any) = { + startGraph() + resetDeadline = minBackoff.fromNow + } + + // When the stage starts, start the source + override def preStart() = startGraph() +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index e884e47962..1e1f4358ca 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -275,7 +275,7 @@ object Source { * Streams the elements of an asynchronous source once its given `completion` stage completes. * If the `completion` fails the stream is failed with that exception. */ - def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava) + def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava) /** * Elements are emitted periodically with the specified interval.