diff --git a/akka-docs/rst/java/code/docs/stream/KillSwitchDocTest.java b/akka-docs/rst/java/code/docs/stream/KillSwitchDocTest.java new file mode 100644 index 0000000000..e70e7e48d3 --- /dev/null +++ b/akka-docs/rst/java/code/docs/stream/KillSwitchDocTest.java @@ -0,0 +1,140 @@ +package docs.stream; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.stream.*; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.JavaTestKit; +import docs.AbstractJavaTest; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +class KillSwitchDocTest extends AbstractJavaTest { + + static ActorSystem system; + static Materializer mat; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("GraphDSLDocTest"); + mat = ActorMaterializer.create(system); + } + + @AfterClass + public static void tearDown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + mat = null; + } + + @Test + public void compileOnlyTest() { + } + + public void uniqueKillSwitchShutdownExample() throws Exception { + //#unique-shutdown + final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Sink> lastSnk = Sink.last(); + + final Pair> stream = countingSrc + .viaMat(KillSwitches.single(), Keep.right()) + .toMat(lastSnk, Keep.both()).run(mat); + + final UniqueKillSwitch killSwitch = stream.first(); + final CompletionStage completionStage = stream.second(); + + doSomethingElse(); + killSwitch.shutdown(); + + final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertEquals(2, finalCount); + //#unique-shutdown + } + + public static void uniqueKillSwitchAbortExample() throws Exception { + //#unique-abort + final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Sink> lastSnk = Sink.last(); + + final Pair> stream = countingSrc + .viaMat(KillSwitches.single(), Keep.right()) + .toMat(lastSnk, Keep.both()).run(mat); + + final UniqueKillSwitch killSwitch = stream.first(); + final CompletionStage completionStage = stream.second(); + + final Exception error = new Exception("boom!"); + killSwitch.abort(error); + + final int result = completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); + assertEquals(-1, result); + //#unique-abort + } + + public void sharedKillSwitchShutdownExample() throws Exception { + //#shared-shutdown + final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Sink> lastSnk = Sink.last(); + final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); + + final CompletionStage completionStage = countingSrc + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); + final CompletionStage completionStageDelayed = countingSrc + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()) + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); + + doSomethingElse(); + killSwitch.shutdown(); + + final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); + final int finalCountDelayed = completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertEquals(2, finalCount); + assertEquals(1, finalCountDelayed); + //#shared-shutdown + } + + public static void sharedKillSwitchAbortExample() throws Exception { + //#shared-abort + final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Sink> lastSnk = Sink.last(); + final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); + + final CompletionStage completionStage1 = countingSrc + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); + final CompletionStage completionStage2 = countingSrc + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); + + final Exception error = new Exception("boom!"); + killSwitch.abort(error); + + final int result1 = completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); + final int result2 = completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); + assertEquals(-1, result1); + assertEquals(-1, result2); + //#shared-abort + } + + private static void doSomethingElse(){ + } +} diff --git a/akka-docs/rst/java/stream/index.rst b/akka-docs/rst/java/stream/index.rst index e1deaf2748..3bc5c32c61 100644 --- a/akka-docs/rst/java/stream/index.rst +++ b/akka-docs/rst/java/stream/index.rst @@ -13,6 +13,7 @@ Streams stream-graphs stream-composition stream-rate + stream-dynamic stream-customize stream-integrations stream-error diff --git a/akka-docs/rst/java/stream/stream-dynamic.rst b/akka-docs/rst/java/stream/stream-dynamic.rst new file mode 100644 index 0000000000..77a0e0a436 --- /dev/null +++ b/akka-docs/rst/java/stream/stream-dynamic.rst @@ -0,0 +1,63 @@ +.. _stream-dynamic-scala: + +####################### +Dynamic stream handling +####################### + +.. _kill-switch-scala: + +Controlling graph completion with KillSwitch +-------------------------------------------- + +A ``KillSwitch`` allows the completion of graphs of ``FlowShape`` from the outside. It consists of a flow element that +can be linked to a graph of ``FlowShape`` needing completion control. +The ``KillSwitch`` trait allows to complete or fail the graph(s). + +.. includecode:: ../../../../akka-stream/src/main/scala/akka/stream/KillSwitch.scala + :include: kill-switch + +After the first call to either ``shutdown`` and ``abort``, all subsequent calls to any of these methods will be ignored. +Graph completion is performed by both + +* completing its downstream +* cancelling (in case of ``shutdown``) or failing (in case of ``abort``) its upstream. + +A ``KillSwitch`` can control the completion of one or multiple streams, and therefore comes in two different flavours. + +.. _unique-kill-switch-scala: + +UniqueKillSwitch +^^^^^^^^^^^^^^^^ + +``UniqueKillSwitch`` allows to control the completion of **one** materialized ``Graph`` of ``FlowShape``. Refer to the +below for usage examples. + +* **Shutdown** + +.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#unique-shutdown + +* **Abort** + +.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#unique-abort + +.. _shared-kill-switch-scala: + +SharedKillSwitch +^^^^^^^^^^^^^^^^ + +A ``SharedKillSwitch`` allows to control the completion of an arbitrary number graphs of ``FlowShape``. It can be +materialized multiple times via its ``flow`` method, and all materialized graphs linked to it are controlled by the switch. +Refer to the below for usage examples. + +* **Shutdown** + +.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#shared-shutdown + +* **Abort** + +.. includecode:: ../code/docs/stream/KillSwitchDocTest.java#shared-abort + +.. note:: + A ``UniqueKillSwitch`` is always a result of a materialization, whilst ``SharedKillSwitch`` needs to be constructed + before any materialization takes place. + diff --git a/akka-docs/rst/scala/code/docs/stream/KillSwitchDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/KillSwitchDocSpec.scala new file mode 100644 index 0000000000..b7a8b04201 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/stream/KillSwitchDocSpec.scala @@ -0,0 +1,108 @@ +package docs.stream + +import akka.stream.scaladsl._ +import akka.stream.{ ActorMaterializer, DelayOverflowStrategy, KillSwitches } +import akka.testkit.AkkaSpec +import docs.CompileOnlySpec + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class KillSwitchDocSpec extends AkkaSpec with CompileOnlySpec { + + implicit val materializer = ActorMaterializer() + + "Unique kill switch" must { + + "control graph completion with shutdown" in compileOnlySpec { + + // format: OFF + //#unique-shutdown + val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure) + val lastSnk = Sink.last[Int] + + val (killSwitch, last) = countingSrc + .viaMat(KillSwitches.single)(Keep.right) + .toMat(lastSnk)(Keep.both) + .run() + + doSomethingElse() + + killSwitch.shutdown() + + Await.result(last, 1.second) shouldBe 2 + //#unique-shutdown + // format: ON + } + + "control graph completion with abort" in compileOnlySpec { + + // format: OFF + //#unique-abort + val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure) + val lastSnk = Sink.last[Int] + + val (killSwitch, last) = countingSrc + .viaMat(KillSwitches.single)(Keep.right) + .toMat(lastSnk)(Keep.both).run() + + val error = new RuntimeException("boom!") + killSwitch.abort(error) + + Await.result(last.failed, 1.second) shouldBe error + //#unique-abort + // format: ON + } + } + + "Shared kill switch" must { + + "control graph completion with shutdown" in compileOnlySpec { + // format: OFF + //#shared-shutdown + val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure) + val lastSnk = Sink.last[Int] + val sharedKillSwitch = KillSwitches.shared("my-kill-switch") + + val last = countingSrc + .via(sharedKillSwitch.flow) + .runWith(lastSnk) + + val delayedLast = countingSrc + .delay(1.second, DelayOverflowStrategy.backpressure) + .via(sharedKillSwitch.flow) + .runWith(lastSnk) + + doSomethingElse() + + sharedKillSwitch.shutdown() + + Await.result(last, 1.second) shouldBe 2 + Await.result(delayedLast, 1.second) shouldBe 1 + //#shared-shutdown + // format: ON + } + + "control graph completion with abort" in compileOnlySpec { + + // format: OFF + //#shared-abort + val countingSrc = Source(Stream.from(1)).delay(1.second) + val lastSnk = Sink.last[Int] + val sharedKillSwitch = KillSwitches.shared("my-kill-switch") + + val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk) + val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk) + + val error = new RuntimeException("boom!") + sharedKillSwitch.abort(error) + + Await.result(last1.failed, 1.second) shouldBe error + Await.result(last2.failed, 1.second) shouldBe error + //#shared-abort + // format: ON + } + } + + private def doSomethingElse() = ??? +} diff --git a/akka-docs/rst/scala/stream/index.rst b/akka-docs/rst/scala/stream/index.rst index 485f4d00a3..a337529ac0 100644 --- a/akka-docs/rst/scala/stream/index.rst +++ b/akka-docs/rst/scala/stream/index.rst @@ -13,6 +13,7 @@ Streams stream-graphs stream-composition stream-rate + stream-dynamic stream-customize stream-integrations stream-error diff --git a/akka-docs/rst/scala/stream/stream-dynamic.rst b/akka-docs/rst/scala/stream/stream-dynamic.rst new file mode 100644 index 0000000000..8716f934a6 --- /dev/null +++ b/akka-docs/rst/scala/stream/stream-dynamic.rst @@ -0,0 +1,63 @@ +.. _stream-dynamic-scala: + +####################### +Dynamic stream handling +####################### + +.. _kill-switch-scala: + +Controlling graph completion with KillSwitch +-------------------------------------------- + +A ``KillSwitch`` allows the completion of graphs of ``FlowShape`` from the outside. It consists of a flow element that +can be linked to a graph of ``FlowShape`` needing completion control. +The ``KillSwitch`` trait allows to complete or fail the graph(s). + +.. includecode:: ../../../../akka-stream/src/main/scala/akka/stream/KillSwitch.scala + :include: kill-switch + +After the first call to either ``shutdown`` and ``abort``, all subsequent calls to any of these methods will be ignored. +Graph completion is performed by both + +* completing its downstream +* cancelling (in case of ``shutdown``) or failing (in case of ``abort``) its upstream. + +A ``KillSwitch`` can control the completion of one or multiple streams, and therefore comes in two different flavours. + +.. _unique-kill-switch-scala: + +UniqueKillSwitch +^^^^^^^^^^^^^^^^ + +``UniqueKillSwitch`` allows to control the completion of **one** materialized ``Graph`` of ``FlowShape``. Refer to the +below for usage examples. + +* **Shutdown** + +.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#unique-shutdown + +* **Abort** + +.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#unique-abort + +.. _shared-kill-switch-scala: + +SharedKillSwitch +^^^^^^^^^^^^^^^^ + +A ``SharedKillSwitch`` allows to control the completion of an arbitrary number graphs of ``FlowShape``. It can be +materialized multiple times via its ``flow`` method, and all materialized graphs linked to it are controlled by the switch. +Refer to the below for usage examples. + +* **Shutdown** + +.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#shared-shutdown + +* **Abort** + +.. includecode:: ../code/docs/stream/KillSwitchDocSpec.scala#shared-abort + +.. note:: + A ``UniqueKillSwitch`` is always a result of a materialization, whilst ``SharedKillSwitch`` needs to be constructed + before any materialization takes place. + diff --git a/akka-stream/src/main/scala/akka/stream/KillSwitch.scala b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala index c0bb6800ab..0aaa7de794 100644 --- a/akka-stream/src/main/scala/akka/stream/KillSwitch.scala +++ b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala @@ -132,6 +132,7 @@ object KillSwitches { * multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of * this interface. */ +//#kill-switch trait KillSwitch { /** * After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally. @@ -142,6 +143,7 @@ trait KillSwitch { */ def abort(ex: Throwable): Unit } +//#kill-switch /** * A [[UniqueKillSwitch]] is always a result of a materialization (unlike [[SharedKillSwitch]] which is constructed @@ -182,7 +184,7 @@ final class UniqueKillSwitch private[stream] (private val promise: Promise[Done] /** * A [[SharedKillSwitch]] is a provider for [[Graph]]s of [[FlowShape]] that can be completed or failed from the outside. * A [[Graph]] returned by the switch can be materialized arbitrary amount of times: every newly materialized [[Graph]] - * belongs to the switch from which it was aquired. Multiple [[SharedKillSwitch]] instances are isolated from each other, + * belongs to the switch from which it was acquired. Multiple [[SharedKillSwitch]] instances are isolated from each other, * shutting down or aborting on instance does not affect the [[Graph]]s provided by another instance. * * After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the @@ -226,7 +228,7 @@ final class SharedKillSwitch private[stream] (val name: String) extends KillSwit def abort(reason: Throwable): Unit = shutdownPromise.tryFailure(reason) /** - * Retrurns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking + * Returns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking * [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this * switch will be stopped normally or failed. *