From 432d94d5a45c44923516f6309571e6890a976e01 Mon Sep 17 00:00:00 2001 From: Muskan Gupta Date: Tue, 20 Oct 2020 17:13:26 +0530 Subject: [PATCH] Adding example of cancelled operator (#29749) Co-authored-by: Arnout Engelen Co-authored-by: Renato Cavalcanti --- .../stream/operators/Sink/cancelled.md | 12 ++++++++-- .../stream/operators/SinkDocExamples.java | 8 +++++++ .../stream/operators/sink/Cancelled.scala | 22 +++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sink/Cancelled.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/cancelled.md b/akka-docs/src/main/paradox/stream/operators/Sink/cancelled.md index 900fa47e8c..c3efa374a0 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/cancelled.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/cancelled.md @@ -13,6 +13,16 @@ Immediately cancel the stream Immediately cancel the stream +## Example + +In this example, we have a source that generates numbers from 1 to 5 but as we have used cancelled we get `NotUsed` as materialized value and stream cancels. + +Scala +: @@snip [Cancelled.scala](/akka-docs/src/test/scala/docs/stream/operators/sink/Cancelled.scala) { #cancelled } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #cancelled } + ## Reactive Streams semantics @@@div { .callout } @@ -20,5 +30,3 @@ Immediately cancel the stream **cancels** immediately @@@ - - diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java index 1d6f77bc93..2277fd976c 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java @@ -114,6 +114,14 @@ public class SinkDocExamples { // #fold } + static NotUsed cancelledExample() { + // #cancelled + Source source = Source.range(1, 5); + NotUsed sum = source.runWith(Sink.cancelled(), system); + return sum; + // #cancelled + } + static void headOptionExample() { // #headoption Source source = Source.empty(); diff --git a/akka-docs/src/test/scala/docs/stream/operators/sink/Cancelled.scala b/akka-docs/src/test/scala/docs/stream/operators/sink/Cancelled.scala new file mode 100644 index 0000000000..e2b7a64c15 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sink/Cancelled.scala @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sink + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.{ Sink, Source } + +import scala.concurrent.ExecutionContextExecutor + +object Cancelled { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + def cancelledExample(): NotUsed = { + //#cancelled + val source = Source(1 to 5) + source.runWith(Sink.cancelled) + //#cancelled + } +}