Adding docs for KillSwitch #20265

This commit is contained in:
Stefano Bonetti 2016-06-03 13:23:11 +01:00 committed by Johan Andrén
parent 896ea53dd3
commit b84c6c5271
7 changed files with 380 additions and 2 deletions

View file

@ -0,0 +1,140 @@
package docs.stream;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
class KillSwitchDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("GraphDSLDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
@Test
public void compileOnlyTest() {
}
public void uniqueKillSwitchShutdownExample() throws Exception {
//#unique-shutdown
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
.viaMat(KillSwitches.single(), Keep.right())
.toMat(lastSnk, Keep.both()).run(mat);
final UniqueKillSwitch killSwitch = stream.first();
final CompletionStage<Integer> completionStage = stream.second();
doSomethingElse();
killSwitch.shutdown();
final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(2, finalCount);
//#unique-shutdown
}
public static void uniqueKillSwitchAbortExample() throws Exception {
//#unique-abort
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
.viaMat(KillSwitches.single(), Keep.right())
.toMat(lastSnk, Keep.both()).run(mat);
final UniqueKillSwitch killSwitch = stream.first();
final CompletionStage<Integer> completionStage = stream.second();
final Exception error = new Exception("boom!");
killSwitch.abort(error);
final int result = completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
assertEquals(-1, result);
//#unique-abort
}
public void sharedKillSwitchShutdownExample() throws Exception {
//#shared-shutdown
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");
final CompletionStage<Integer> completionStage = countingSrc
.viaMat(killSwitch.flow(), Keep.right())
.toMat(lastSnk, Keep.right()).run(mat);
final CompletionStage<Integer> completionStageDelayed = countingSrc
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure())
.viaMat(killSwitch.flow(), Keep.right())
.toMat(lastSnk, Keep.right()).run(mat);
doSomethingElse();
killSwitch.shutdown();
final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
final int finalCountDelayed = completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(2, finalCount);
assertEquals(1, finalCountDelayed);
//#shared-shutdown
}
public static void sharedKillSwitchAbortExample() throws Exception {
//#shared-abort
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");
final CompletionStage<Integer> completionStage1 = countingSrc
.viaMat(killSwitch.flow(), Keep.right())
.toMat(lastSnk, Keep.right()).run(mat);
final CompletionStage<Integer> completionStage2 = countingSrc
.viaMat(killSwitch.flow(), Keep.right())
.toMat(lastSnk, Keep.right()).run(mat);
final Exception error = new Exception("boom!");
killSwitch.abort(error);
final int result1 = completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
final int result2 = completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
assertEquals(-1, result1);
assertEquals(-1, result2);
//#shared-abort
}
private static void doSomethingElse(){
}
}

View file

@ -13,6 +13,7 @@ Streams
stream-graphs
stream-composition
stream-rate
stream-dynamic
stream-customize
stream-integrations
stream-error

View file

@ -0,0 +1,63 @@
.. _stream-dynamic-scala:
#######################
Dynamic stream handling
#######################
.. _kill-switch-scala:
Controlling graph completion with KillSwitch
--------------------------------------------
A ``KillSwitch`` allows the completion of graphs of ``FlowShape`` from the outside. It consists of a flow element that
can be linked to a graph of ``FlowShape`` needing completion control.
The ``KillSwitch`` trait allows to complete or fail the graph(s).
.. includecode:: ../../../../akka-stream/src/main/scala/akka/stream/KillSwitch.scala
:include: kill-switch
After the first call to either ``shutdown`` and ``abort``, all subsequent calls to any of these methods will be ignored.
Graph completion is performed by both
* completing its downstream
* cancelling (in case of ``shutdown``) or failing (in case of ``abort``) its upstream.
A ``KillSwitch`` can control the completion of one or multiple streams, and therefore comes in two different flavours.
.. _unique-kill-switch-scala:
UniqueKillSwitch
^^^^^^^^^^^^^^^^
``UniqueKillSwitch`` allows to control the completion of **one** materialized ``Graph`` of ``FlowShape``. Refer to the
below for usage examples.
* **Shutdown**
.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#unique-shutdown
* **Abort**
.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#unique-abort
.. _shared-kill-switch-scala:
SharedKillSwitch
^^^^^^^^^^^^^^^^
A ``SharedKillSwitch`` allows to control the completion of an arbitrary number graphs of ``FlowShape``. It can be
materialized multiple times via its ``flow`` method, and all materialized graphs linked to it are controlled by the switch.
Refer to the below for usage examples.
* **Shutdown**
.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#shared-shutdown
* **Abort**
.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#shared-abort
.. note::
A ``UniqueKillSwitch`` is always a result of a materialization, whilst ``SharedKillSwitch`` needs to be constructed
before any materialization takes place.

View file

@ -0,0 +1,108 @@
package docs.stream
import akka.stream.scaladsl._
import akka.stream.{ ActorMaterializer, DelayOverflowStrategy, KillSwitches }
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
import scala.concurrent.Await
import scala.concurrent.duration._
class KillSwitchDocSpec extends AkkaSpec with CompileOnlySpec {
implicit val materializer = ActorMaterializer()
"Unique kill switch" must {
"control graph completion with shutdown" in compileOnlySpec {
// format: OFF
//#unique-shutdown
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
.viaMat(KillSwitches.single)(Keep.right)
.toMat(lastSnk)(Keep.both)
.run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
//#unique-shutdown
// format: ON
}
"control graph completion with abort" in compileOnlySpec {
// format: OFF
//#unique-abort
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
.viaMat(KillSwitches.single)(Keep.right)
.toMat(lastSnk)(Keep.both).run()
val error = new RuntimeException("boom!")
killSwitch.abort(error)
Await.result(last.failed, 1.second) shouldBe error
//#unique-abort
// format: ON
}
}
"Shared kill switch" must {
"control graph completion with shutdown" in compileOnlySpec {
// format: OFF
//#shared-shutdown
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val last = countingSrc
.via(sharedKillSwitch.flow)
.runWith(lastSnk)
val delayedLast = countingSrc
.delay(1.second, DelayOverflowStrategy.backpressure)
.via(sharedKillSwitch.flow)
.runWith(lastSnk)
doSomethingElse()
sharedKillSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
Await.result(delayedLast, 1.second) shouldBe 1
//#shared-shutdown
// format: ON
}
"control graph completion with abort" in compileOnlySpec {
// format: OFF
//#shared-abort
val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val error = new RuntimeException("boom!")
sharedKillSwitch.abort(error)
Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error
//#shared-abort
// format: ON
}
}
private def doSomethingElse() = ???
}

View file

@ -13,6 +13,7 @@ Streams
stream-graphs
stream-composition
stream-rate
stream-dynamic
stream-customize
stream-integrations
stream-error

View file

@ -0,0 +1,63 @@
.. _stream-dynamic-scala:
#######################
Dynamic stream handling
#######################
.. _kill-switch-scala:
Controlling graph completion with KillSwitch
--------------------------------------------
A ``KillSwitch`` allows the completion of graphs of ``FlowShape`` from the outside. It consists of a flow element that
can be linked to a graph of ``FlowShape`` needing completion control.
The ``KillSwitch`` trait allows to complete or fail the graph(s).
.. includecode:: ../../../../akka-stream/src/main/scala/akka/stream/KillSwitch.scala
:include: kill-switch
After the first call to either ``shutdown`` and ``abort``, all subsequent calls to any of these methods will be ignored.
Graph completion is performed by both
* completing its downstream
* cancelling (in case of ``shutdown``) or failing (in case of ``abort``) its upstream.
A ``KillSwitch`` can control the completion of one or multiple streams, and therefore comes in two different flavours.
.. _unique-kill-switch-scala:
UniqueKillSwitch
^^^^^^^^^^^^^^^^
``UniqueKillSwitch`` allows to control the completion of **one** materialized ``Graph`` of ``FlowShape``. Refer to the
below for usage examples.
* **Shutdown**
.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#unique-shutdown
* **Abort**
.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#unique-abort
.. _shared-kill-switch-scala:
SharedKillSwitch
^^^^^^^^^^^^^^^^
A ``SharedKillSwitch`` allows to control the completion of an arbitrary number graphs of ``FlowShape``. It can be
materialized multiple times via its ``flow`` method, and all materialized graphs linked to it are controlled by the switch.
Refer to the below for usage examples.
* **Shutdown**
.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#shared-shutdown
* **Abort**
.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#shared-abort
.. note::
A ``UniqueKillSwitch`` is always a result of a materialization, whilst ``SharedKillSwitch`` needs to be constructed
before any materialization takes place.

View file

@ -132,6 +132,7 @@ object KillSwitches {
* multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of
* this interface.
*/
//#kill-switch
trait KillSwitch {
/**
* After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
@ -142,6 +143,7 @@ trait KillSwitch {
*/
def abort(ex: Throwable): Unit
}
//#kill-switch
/**
* A [[UniqueKillSwitch]] is always a result of a materialization (unlike [[SharedKillSwitch]] which is constructed
@ -182,7 +184,7 @@ final class UniqueKillSwitch private[stream] (private val promise: Promise[Done]
/**
* A [[SharedKillSwitch]] is a provider for [[Graph]]s of [[FlowShape]] that can be completed or failed from the outside.
* A [[Graph]] returned by the switch can be materialized arbitrary amount of times: every newly materialized [[Graph]]
* belongs to the switch from which it was aquired. Multiple [[SharedKillSwitch]] instances are isolated from each other,
* belongs to the switch from which it was acquired. Multiple [[SharedKillSwitch]] instances are isolated from each other,
* shutting down or aborting on instance does not affect the [[Graph]]s provided by another instance.
*
* After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the
@ -226,7 +228,7 @@ final class SharedKillSwitch private[stream] (val name: String) extends KillSwit
def abort(reason: Throwable): Unit = shutdownPromise.tryFailure(reason)
/**
* Retrurns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking
* Returns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking
* [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this
* switch will be stopped normally or failed.
*