diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watchTermination.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watchTermination.md index ec863081f4..1ce0835b40 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watchTermination.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watchTermination.md @@ -15,6 +15,16 @@ Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be complet Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be completed with Done or failed depending whether the upstream of the operators has been completed or failed. The operators otherwise passes through elements unchanged. +## Examples + +Scala +: @@snip [WatchTermination.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/WatchTermination.scala) { #watchTermination } + +Java +: @@snip [WatchTermination.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #watchTermination } + +You can also use the lambda function expected by `watchTermination` to map the materialized value of the stream. Additionally, the completion of the @scala[`Future`]@java[`CompletionStage`] provided as a second parameter of the lambda can be used to perform cleanup operations of the resources used by the stream itself. + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 5a03a5adab..60d0235ae8 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -23,9 +23,11 @@ import akka.japi.function.Function2; // #interleave // #merge // #merge-sorted +import akka.stream.javadsl.Keep; import akka.stream.javadsl.Source; import akka.stream.javadsl.Sink; -import java.util.Arrays; + +import java.util.*; // #merge-sorted // #merge @@ -44,10 +46,9 @@ import akka.stream.Attributes; // #log import java.time.Duration; -import java.util.Collections; -import java.util.Comparator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.IntSupplier; class SourceOrFlow { private static ActorSystem system = null; @@ -489,6 +490,60 @@ class SourceOrFlow { // #groupBy } + void watchTerminationExample() { + // #watchTermination + Source.range(1, 5) + .watchTermination( + (prevMatValue, completionStage) -> { + completionStage.whenComplete( + (done, exc) -> { + if (done != null) + System.out.println("The stream materialized " + prevMatValue.toString()); + else System.out.println(exc.getMessage()); + }); + return prevMatValue; + }) + .runForeach(System.out::println, system); + + /* + Prints: + 1 + 2 + 3 + 4 + 5 + The stream materialized NotUsed + */ + + Source.range(1, 5) + .watchTermination( + (prevMatValue, completionStage) -> { + // this function will be run when the stream terminates + // the CompletionStage provided as a second parameter indicates whether + // the stream completed successfully or failed + completionStage.whenComplete( + (done, exc) -> { + if (done != null) + System.out.println("The stream materialized " + prevMatValue.toString()); + else System.out.println(exc.getMessage()); + }); + return prevMatValue; + }) + .runForeach( + element -> { + if (element == 3) throw new Exception("Boom"); + else System.out.println(element); + }, + system); + /* + Prints: + 1 + 2 + Boom + */ + // #watchTermination + } + static CompletionStage completionTimeoutExample() { // #completionTimeout Source source = Source.range(1, 100000).map(number -> number * number); diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/WatchTermination.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/WatchTermination.scala new file mode 100644 index 0000000000..ea36fee803 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/WatchTermination.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +import scala.concurrent.ExecutionContext +import scala.util.{ Failure, Success } + +object WatchTermination { + + def watchTerminationExample(): Unit = { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContext = ??? + + //#watchTermination + Source(1 to 5) + .watchTermination()( + (prevMatValue, future) => + // this function will be run when the stream terminates + // the Future provided as a second parameter indicates whether the stream completed successfully or failed + future.onComplete { + case Failure(exception) => println(exception.getMessage) + case Success(_) => println(s"The stream materialized $prevMatValue") + }) + .runForeach(println) + /* + Prints: + 1 + 2 + 3 + 4 + 5 + The stream materialized NotUsed + */ + + Source(1 to 5) + .watchTermination()((prevMatValue, future) => + future.onComplete { + case Failure(exception) => println(exception.getMessage) + case Success(_) => println(s"The stream materialized $prevMatValue") + }) + .runForeach(e => if (e == 3) throw new Exception("Boom") else println(e)) + /* + Prints: + 1 + 2 + Boom + */ + //#watchTermination + } +}