Restart Flow/Source/Sink #19950

James Roper 2017-07-20 23:02:34 +10:00 committed by Johan Andrén
parent b43e6b519a
commit c60d20af32
8 changed files with 1174 additions and 4 deletions


@@ -101,4 +101,49 @@ Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision }
If we would not use `Resume` the default stopping strategy would complete the stream
with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally].

## Delayed restarts with a backoff stage

Just as Akka provides the @ref:[backoff supervision pattern for actors](../general/supervision.md#backoff-supervisor), Akka streams
also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff
supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.

This pattern is useful when the stage fails or completes because some external resource is not available
and we need to give it some time to start up again. One of the prime examples of when this is useful is
when a WebSocket connection fails due to the HTTP server it's running on going down, perhaps because it is overloaded.
By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some
time to recover and avoids using needless resources on the client side.

The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`]
@java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a
stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will
be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due
to the `maxBackoff` parameter):

Scala
: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #restart-with-backoff-source }

Java
: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #restart-with-backoff-source }

Using a `randomFactor` to add a little bit of additional variance to the backoff intervals
is highly recommended, in order to avoid multiple streams restarting at the exact same point in time,
for example because they were stopped due to a shared resource such as the same server going down
and restarting after the same configured interval. By adding additional randomness to the
restart intervals the streams will start at slightly different points in time, thus avoiding
large spikes of traffic hitting the recovering server or other resource that they all need to contact.
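
To get a feel for how these parameters interact, the sketch below shows roughly how a restart delay could be
computed for a given restart count. The helper name is made up for illustration; the real calculation happens
inside the restart stages (reusing the actor backoff supervisor's algorithm), so details such as exactly where
the random factor is applied may differ:

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._

// Illustrative only: exponential growth from minBackoff, some random "noise"
// controlled by randomFactor, capped at maxBackoff.
def illustrativeDelay(restartCount: Int, minBackoff: FiniteDuration,
                      maxBackoff: FiniteDuration, randomFactor: Double): FiniteDuration = {
  val jitter = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
  val millis = (minBackoff.toMillis * math.pow(2, restartCount) * jitter).toLong
  math.min(maxBackoff.toMillis, millis).millis
}

// With minBackoff = 3.seconds, maxBackoff = 30.seconds and randomFactor = 0.2,
// successive restarts wait roughly 3s, 6s, 12s, 24s and then stay around 30s.
```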
The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use
it in combination with a @ref:[`KillSwitch`](stream-dynamic.md#kill-switch), so that you can terminate it when needed:

Scala
: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #with-kill-switch }

Java
: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch }

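For orientation, the essence of the Scala snippet (added in `RestartDocSpec.scala` further down in this commit)
is the following wiring, assuming an implicit materializer is in scope and with `doSomethingElse()` standing in
for whatever the application does while the stream is running:

```scala
import akka.stream.KillSwitches
import akka.stream.scaladsl.{ Keep, Sink }

val killSwitch = restartSource
  .viaMat(KillSwitches.single)(Keep.right)       // materialize a KillSwitch for the whole stream
  .toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
  .run()

doSomethingElse()
killSwitch.shutdown()                            // cancels the restarting source when we are done
```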
Sinks and flows can also be supervised, using @scala[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]
@java[`akka.stream.javadsl.RestartSink` and `akka.stream.javadsl.RestartFlow`]. The `RestartSink` is restarted when
it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out
port sends an error.
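
This commit does not include a snippet for these, but with the scaladsl API added below, wrapping a sink or flow
looks much like wrapping a source. The sink and flow bodies here are placeholders standing in for whatever
failure-prone processing the application really does:

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, RestartFlow, RestartSink, Sink }

// Recreated with backoff whenever it cancels.
val restartingSink: Sink[String, NotUsed] =
  RestartSink.withBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
    Sink.foreach[String](line => println(s"Wrote: $line")) // stand-in for the real sink
  }

// Recreated with backoff whenever either side terminates prematurely.
val restartingFlow: Flow[String, String, NotUsed] =
  RestartFlow.withBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
    Flow[String].map(_.toUpperCase) // stand-in for the real, failure-prone flow
  }
```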


@@ -0,0 +1,86 @@
/**
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package jdocs.stream;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.javadsl.*;
import scala.concurrent.duration.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class RestartDocTest {
static ActorSystem system;
static Materializer materializer;
// Mocking akka-http
public static class Http {
public static Http get(ActorSystem system) {
return new Http();
}
public CompletionStage<Http> singleRequest(String uri) {
return new CompletableFuture<>();
}
public NotUsed entity() {
return NotUsed.getInstance();
}
}
public static class HttpRequest {
public static String create(String uri) {
return uri;
}
}
public static class ServerSentEvent {}
public static class EventStreamUnmarshalling {
public static EventStreamUnmarshalling fromEventStream() {
return new EventStreamUnmarshalling();
}
public CompletionStage<Source<ServerSentEvent, NotUsed>> unmarshall(Http http, Materializer mat) {
return new CompletableFuture<>();
}
}
public void doSomethingElse() {
}
public void recoverWithBackoffSource() {
//#restart-with-backoff-source
Source<ServerSentEvent, NotUsed> eventStream = RestartSource.withBackoff(
Duration.apply(3, TimeUnit.SECONDS), // min backoff
Duration.apply(30, TimeUnit.SECONDS), // max backoff
0.2, // adds 20% "noise" to vary the intervals slightly
() ->
// Create a source from a future of a source
Source.fromSourceCompletionStage(
// Issue a GET request on the event stream
Http.get(system).singleRequest(HttpRequest.create("http://example.com/eventstream"))
.thenCompose(response ->
// Unmarshall it to a stream of ServerSentEvents
EventStreamUnmarshalling.fromEventStream()
.unmarshall(response, materializer)
)
)
);
//#restart-with-backoff-source
//#with-kill-switch
KillSwitch killSwitch = eventStream
.viaMat(KillSwitches.single(), Keep.right())
.toMat(Sink.foreach(event -> System.out.println("Got event: " + event)), Keep.left())
.run(materializer);
doSomethingElse();
killSwitch.shutdown();
//#with-kill-switch
}
}


@@ -0,0 +1,67 @@
/**
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.stream
import akka.NotUsed
import akka.stream.{ ActorMaterializer, KillSwitches }
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
import scala.concurrent.duration._
import scala.concurrent._
class RestartDocSpec extends AkkaSpec with CompileOnlySpec {
implicit val materializer = ActorMaterializer()
import system.dispatcher
// Mock akka-http interfaces
object Http {
def apply() = this
def singleRequest(req: HttpRequest) = Future.successful(())
}
case class HttpRequest(uri: String)
case class Unmarshal(b: Any) {
def to[T]: Future[T] = Promise[T]().future
}
case class ServerSentEvent()
def doSomethingElse(): Unit = ()
"Restart stages" should {
"demonstrate a restart with backoff source" in compileOnlySpec {
//#restart-with-backoff-source
val restartSource = RestartSource.withBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
) { () =>
// Create a source from a future of a source
Source.fromFutureSource {
// Make a single request with akka-http
Http().singleRequest(HttpRequest(
uri = "http://example.com/eventstream"
))
// Unmarshall it as a source of server sent events
.flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
}
}
//#restart-with-backoff-source
//#with-kill-switch
val killSwitch = restartSource
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
.run()
doSomethingElse()
killSwitch.shutdown()
//#with-kill-switch
}
}
}


@@ -0,0 +1,491 @@
/**
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.stream.ActorMaterializer
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.DefaultTimeout
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
class RestartSpec extends StreamSpec with DefaultTimeout {
implicit val mat = ActorMaterializer()
import system.dispatcher
"A restart with backoff source" should {
"run normally" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒
created.incrementAndGet()
Source.repeat("a")
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("a")
probe.requestNext("a")
probe.requestNext("a")
probe.requestNext("a")
created.get() should ===(1)
probe.cancel()
}
"restart on completion" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒
created.incrementAndGet()
Source(List("a", "b"))
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
created.get() should ===(3)
probe.cancel()
}
"restart on failure" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(10.millis, 20.millis, 0) { () ⇒
created.incrementAndGet()
Source(List("a", "b", "c"))
.map {
case "c" throw TE("failed")
case other other
}
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
created.get() should ===(3)
probe.cancel()
}
"backoff before restart" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(200.millis, 1.second, 0) { () ⇒
created.incrementAndGet()
Source(List("a", "b"))
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
probe.request(1)
// There should be a delay of at least 200ms before we receive the element, wait for 100ms.
val deadline = 100.millis.fromNow
// But the delay shouldn't be more than 300ms.
probe.expectNext(300.milliseconds, "a")
deadline.isOverdue() should be(true)
created.get() should ===(2)
probe.cancel()
}
"reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒
created.incrementAndGet()
Source(List("a", "b"))
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
// There should be a 200ms delay
probe.requestNext("a")
probe.requestNext("b")
probe.request(1)
// The probe should now be backing off for 400ms
// Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the
// subsequent 200ms min backoff to pass, so it resets the restart count
Thread.sleep(700)
probe.expectNext("a")
probe.requestNext("b")
// We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the
// next element within 300ms
probe.requestNext(300.milliseconds) should ===("a")
created.get() should ===(4)
probe.cancel()
}
"cancel the currently running source when cancelled" in assertAllStagesStopped {
val created = new AtomicInteger()
val promise = Promise[Done]()
val probe = RestartSource.withBackoff(10.millis, 2.seconds, 0) { () ⇒
created.incrementAndGet()
Source.repeat("a").watchTermination() { (_, term)
promise.completeWith(term)
}
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.cancel()
promise.future.futureValue should ===(Done)
// Wait to ensure it isn't restarted
Thread.sleep(200)
created.get() should ===(1)
}
"not restart the source when cancelled while backing off" in assertAllStagesStopped {
val created = new AtomicInteger()
val probe = RestartSource.withBackoff(200.millis, 2.seconds, 0) { () ⇒
created.incrementAndGet()
Source.single("a")
}.runWith(TestSink.probe)
probe.requestNext("a")
probe.request(1)
// Should be backing off now
probe.cancel()
// Wait to ensure it isn't restarted
Thread.sleep(300)
created.get() should ===(1)
}
}
"A restart with backoff sink" should {
"run normally" in assertAllStagesStopped {
val created = new AtomicInteger()
val result = Promise[Seq[String]]()
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒
created.incrementAndGet()
Sink.seq.mapMaterializedValue(result.completeWith)
})(Keep.left).run()
probe.sendNext("a")
probe.sendNext("b")
probe.sendNext("c")
probe.sendComplete()
result.future.futureValue should contain inOrderOnly ("a", "b", "c")
created.get() should ===(1)
}
"restart on cancellation" in assertAllStagesStopped {
val created = new AtomicInteger()
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(10.millis, 20.millis, 0) { () ⇒
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true)
.to(Sink.foreach(queue.sendNext))
})(Keep.left).run()
probe.sendNext("a")
sinkProbe.requestNext("a")
probe.sendNext("b")
sinkProbe.requestNext("b")
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
probe.sendNext("c")
sinkProbe.requestNext("c")
created.get() should ===(2)
sinkProbe.cancel()
probe.sendComplete()
}
"backoff before restart" in assertAllStagesStopped {
val created = new AtomicInteger()
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true)
.to(Sink.foreach(queue.sendNext))
})(Keep.left).run()
probe.sendNext("a")
sinkProbe.requestNext("a")
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
probe.sendNext("b")
sinkProbe.request(1)
val deadline = 100.millis.fromNow
sinkProbe.expectNext(300.millis, "b")
deadline.isOverdue() should be(true)
created.get() should ===(2)
sinkProbe.cancel()
probe.sendComplete()
}
"reset exponential backoff back to minimum when sink runs for at least minimum backoff without completing" in assertAllStagesStopped {
val created = new AtomicInteger()
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true)
.to(Sink.foreach(queue.sendNext))
})(Keep.left).run()
probe.sendNext("a")
sinkProbe.requestNext("a")
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// There should be a 200ms delay
probe.sendNext("b")
sinkProbe.requestNext("b")
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
sinkProbe.request(1)
// The probe should now be backing off for 400ms
// Now wait for the 400ms delay to pass, then it will start the new sink, we also want to wait for the
// subsequent 200ms min backoff to pass, so it resets the restart count
Thread.sleep(700)
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the
// next element within 300ms
probe.sendNext("c")
sinkProbe.request(1)
sinkProbe.expectNext(300.milliseconds, "c")
created.get() should ===(4)
sinkProbe.cancel()
probe.sendComplete()
}
"not restart the sink when completed while backing off" in assertAllStagesStopped {
val created = new AtomicInteger()
val (queue, sinkProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val probe = TestSource.probe[String].toMat(RestartSink.withBackoff(200.millis, 2.seconds, 0) { () ⇒
created.incrementAndGet()
Flow[String].takeWhile(_ != "cancel", inclusive = true)
.to(Sink.foreach(queue.sendNext))
})(Keep.left).run()
probe.sendNext("a")
sinkProbe.requestNext("a")
probe.sendNext("cancel")
sinkProbe.requestNext("cancel")
// Should be backing off now
probe.sendComplete()
// Wait to ensure it isn't restarted
Thread.sleep(300)
created.get() should ===(1)
sinkProbe.cancel()
}
}
"A restart with backoff flow" should {
def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration) = {
val created = new AtomicInteger()
val (flowInSource, flowInProbe) = TestSource.probe[String].toMat(TestSink.probe)(Keep.both).run()
val (flowOutProbe, flowOutSource) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run()
// We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we
// simply use the probes as a message bus for feeding and capturing events.
val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0) { () ⇒
created.incrementAndGet()
Flow.fromSinkAndSource(
Flow[String].takeWhile(_ != "cancel").to(Sink.foreach(flowInSource.sendNext).mapMaterializedValue(_.onComplete {
case Success(_) ⇒ flowInSource.sendNext("in complete")
case Failure(_) ⇒ flowInSource.sendNext("in error")
})),
flowOutSource.takeWhile(_ != "complete").map {
case "error" throw TE("error")
case other other
}.watchTermination()((_, term)
term.foreach(_ {
flowInSource.sendNext("out complete")
})
)
)
})(Keep.left).toMat(TestSink.probe[String])(Keep.both).run()
(created, source, flowInProbe, flowOutProbe, sink)
}
"run normally" in assertAllStagesStopped {
val created = new AtomicInteger()
val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(10.millis, 20.millis, 0) { () ⇒
created.incrementAndGet()
Flow[String]
})(Keep.left).toMat(TestSink.probe[String])(Keep.both).run()
source.sendNext("a")
sink.requestNext("a")
source.sendNext("b")
sink.requestNext("b")
created.get() should ===(1)
source.sendComplete()
}
"restart on cancellation" in {
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis)
source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")
source.sendNext("cancel")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete")
// and it should restart
source.sendNext("c")
flowInProbe.requestNext("c")
flowOutProbe.sendNext("d")
sink.requestNext("d")
created.get() should ===(2)
}
"restart on completion" in {
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis)
source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")
sink.request(1)
flowOutProbe.sendNext("complete")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete")
// and it should restart
source.sendNext("c")
flowInProbe.requestNext("c")
flowOutProbe.sendNext("d")
sink.requestNext("d")
created.get() should ===(2)
}
"restart on failure" in {
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(10.millis, 20.millis)
source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")
sink.request(1)
flowOutProbe.sendNext("error")
// This should complete the in probe
flowInProbe.requestNext("in complete")
// and it should restart
source.sendNext("c")
flowInProbe.requestNext("c")
flowOutProbe.sendNext("d")
sink.requestNext("d")
created.get() should ===(2)
}
"backoff before restart" in {
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(200.millis, 2.seconds)
source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")
source.sendNext("cancel")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain only ("in complete", "out complete")
source.sendNext("c")
flowInProbe.request(1)
val deadline = 100.millis.fromNow
flowInProbe.expectNext(300.millis, "c")
deadline.isOverdue() should be(true)
created.get() should ===(2)
}
"continue running flow out port after in has been sent completion" in {
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds)
source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")
source.sendComplete()
flowInProbe.requestNext("in complete")
flowOutProbe.sendNext("c")
sink.requestNext("c")
flowOutProbe.sendNext("d")
sink.requestNext("d")
sink.request(1)
flowOutProbe.sendComplete()
flowInProbe.requestNext("out complete")
sink.expectComplete()
created.get() should ===(1)
}
"continue running flow in port after out has been cancelled" in {
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(20.millis, 40.seconds)
source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")
sink.cancel()
flowInProbe.requestNext("out complete")
source.sendNext("c")
flowInProbe.requestNext("c")
source.sendNext("d")
flowInProbe.requestNext("d")
source.sendNext("cancel")
flowInProbe.requestNext("in complete")
source.expectCancellation()
created.get() should ===(1)
}
}
}


@@ -0,0 +1,126 @@
/**
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.javadsl
import akka.NotUsed
import akka.japi.function.Creator
import akka.stream.KillSwitch
import scala.concurrent.duration.FiniteDuration
/**
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
* example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
* RestartSource ensures that the graph can continue running while the [[Source]] restarts.
*/
object RestartSource {
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running, will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = {
akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
sourceFactory.create().asScala
}.asJava
}
}
/**
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
* example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
* RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
*/
object RestartSink {
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
* graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = {
akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
sinkFactory.create().asScala
}.asJava
}
}
/**
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for
* example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The
* RestartFlow ensures that the graph can continue running while the [[Flow]] restarts.
*/
object RestartFlow {
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
* will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒
flowFactory.create().asScala
}.asJava
}
}


@@ -184,13 +184,14 @@ object Source {
* Streams the elements of the given future source once it successfully completes.
* If the future fails the stream is failed.
*/
def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
/**
* Streams the elements of an asynchronous source once its given `completion` stage completes.
* If the `completion` fails the stream is failed with that exception.
*/
def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = new Source(scaladsl.Source.fromSourceCompletionStage(completion))
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] =
new Source(scaladsl.Source.fromSourceCompletionStage(completion))
/**
* Elements are emitted periodically with the specified interval.


@@ -0,0 +1,354 @@
/**
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.NotUsed
import akka.pattern.BackoffSupervisor
import akka.stream._
import akka.stream.stage.{ GraphStage, InHandler, OutHandler, TimerGraphStageLogicWithLogging }
import scala.concurrent.duration.FiniteDuration
/**
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
* example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
* RestartSource ensures that the graph can continue running while the [[Source]] restarts.
*/
object RestartSource {
/**
* Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running, will be cancelled, and it will not be restarted.
* This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
* after this [[Source]] in the graph.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sourceFactory A factory for producing the [[Source]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor))
}
}
private final class RestartWithBackoffSource[T](
sourceFactory: () ⇒ Source[T, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
) extends GraphStage[SourceShape[T]] { self ⇒
val out = Outlet[T]("RestartWithBackoffSource.out")
override def shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Source", shape, minBackoff, maxBackoff, randomFactor
) {
override protected def logSource = self.getClass
override protected def startGraph() = {
val sinkIn = createSubInlet(out)
sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer)
if (isAvailable(out)) {
sinkIn.pull()
}
}
override protected def backoff() = {
setHandler(out, new OutHandler {
override def onPull() = ()
})
}
backoff()
}
}
/**
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
* example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
* RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
*/
object RestartSink {
/**
* Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
* The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
* happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
* simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
* graph.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
* sent may have been lost.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param sinkFactory A factory for producing the [[Sink]] to wrap.
*/
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor))
}
}
private final class RestartWithBackoffSink[T](
sinkFactory: () ⇒ Sink[T, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
) extends GraphStage[SinkShape[T]] { self ⇒
val in = Inlet[T]("RestartWithBackoffSink.in")
override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Sink", shape, minBackoff, maxBackoff, randomFactor
) {
override protected def logSource = self.getClass
override protected def startGraph() = {
val sourceOut = createSubOutlet(in)
Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer)
}
override protected def backoff() = {
setHandler(in, new InHandler {
override def onPush() = ()
})
}
backoff()
}
}
/**
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for
* example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The
* RestartFlow ensures that the graph can continue running while the [[Flow]] restarts.
*/
object RestartFlow {
/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or completes using an exponential
* backoff.
*
* This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
* completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
* signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
* will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* be started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor))
}
}
private final class RestartWithBackoffFlow[In, Out](
flowFactory: () ⇒ Flow[In, Out, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
) extends GraphStage[FlowShape[In, Out]] { self ⇒
val in = Inlet[In]("RestartWithBackoffFlow.in")
val out = Outlet[Out]("RestartWithBackoffFlow.out")
override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Flow", shape, minBackoff, maxBackoff, randomFactor
) {
var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
override protected def logSource = self.getClass
override protected def startGraph() = {
val sourceOut = createSubOutlet(in)
val sinkIn = createSubInlet(out)
Source.fromGraph(sourceOut.source).via(flowFactory()).runWith(sinkIn.sink)(subFusingMaterializer)
if (isAvailable(out)) {
sinkIn.pull()
}
activeOutIn = Some((sourceOut, sinkIn))
}
override protected def backoff() = {
setHandler(in, new InHandler {
override def onPush() = ()
})
setHandler(out, new OutHandler {
override def onPull() = ()
})
// We need to ensure that the other end of the sub flow is also completed, so that we don't
// receive any callbacks from it.
activeOutIn.foreach {
case (sourceOut, sinkIn) ⇒
if (!sourceOut.isClosed) {
sourceOut.complete()
}
if (!sinkIn.isClosed) {
sinkIn.cancel()
}
activeOutIn = None
}
}
backoff()
}
}
/**
* Shared logic for all restart with backoff logics.
*/
private abstract class RestartWithBackoffLogic[S <: Shape](
name: String,
shape: S,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
) extends TimerGraphStageLogicWithLogging(shape) {
var restartCount = 0
var resetDeadline = minBackoff.fromNow
// This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
// don't want to restart the sub inlet when it finishes, we just finish normally.
var finishing = false
protected def startGraph(): Unit
protected def backoff(): Unit
protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = {
val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subIn")
sinkIn.setHandler(new InHandler {
override def onPush() = push(out, sinkIn.grab())
override def onUpstreamFinish() = {
if (finishing) {
complete(out)
} else {
log.debug("Graph out finished")
onCompleteOrFailure()
}
}
override def onUpstreamFailure(ex: Throwable) = {
if (finishing) {
fail(out, ex)
} else {
log.error(ex, "Restarting graph due to failure")
onCompleteOrFailure()
}
}
})
setHandler(out, new OutHandler {
override def onPull() = sinkIn.pull()
override def onDownstreamFinish() = {
finishing = true
sinkIn.cancel()
}
})
sinkIn
}
protected final def createSubOutlet[T](in: Inlet[T]): SubSourceOutlet[T] = {
val sourceOut = new SubSourceOutlet[T](s"RestartWithBackoff$name.subOut")
sourceOut.setHandler(new OutHandler {
override def onPull() = if (isAvailable(in)) {
sourceOut.push(grab(in))
} else {
pull(in)
}
override def onDownstreamFinish() = {
if (finishing) {
cancel(in)
} else {
log.debug("Graph in finished")
onCompleteOrFailure()
}
}
})
setHandler(in, new InHandler {
override def onPush() = if (sourceOut.isAvailable) {
sourceOut.push(grab(in))
}
override def onUpstreamFinish() = {
finishing = true
sourceOut.complete()
}
override def onUpstreamFailure(ex: Throwable) = {
finishing = true
sourceOut.fail(ex)
}
})
sourceOut
}
// Set a timer to restart after the calculated delay
protected final def onCompleteOrFailure() = {
// Check if the last start attempt was more than the minimum backoff
if (resetDeadline.isOverdue()) {
log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)
restartCount = 0
}
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
log.debug("Restarting graph in {}", restartDelay)
scheduleOnce("RestartTimer", restartDelay)
restartCount += 1
// And while we wait, we go into backoff mode
backoff()
}
// Invoked when the backoff timer ticks
override protected def onTimer(timerKey: Any) = {
startGraph()
resetDeadline = minBackoff.fromNow
}
// When the stage starts, start the source
override def preStart() = startGraph()
}


@@ -275,7 +275,7 @@ object Source {
* Streams the elements of an asynchronous source once its given `completion` stage completes.
* If the `completion` fails the stream is failed with that exception.
*/
def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
/**
* Elements are emitted periodically with the specified interval.