Operators: Add example for watchTermination (#29888)

This commit is contained in:
Matteo Di Pirro 2020-12-15 13:47:07 +01:00 committed by GitHub
parent 8c0ffe3b2a
commit 01e4b4de27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 3 deletions

View file

@ -15,6 +15,16 @@ Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be complet
Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be completed with Done or failed depending whether the upstream of the operators has been completed or failed.
The operators otherwise passes through elements unchanged.
## Examples
Scala
: @@snip [WatchTermination.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/WatchTermination.scala) { #watchTermination }
Java
: @@snip [WatchTermination.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #watchTermination }
You can also use the lambda function expected by `watchTermination` to map the materialized value of the stream. Additionally, the completion of the @scala[`Future`]@java[`CompletionStage`] provided as a second parameter of the lambda can be used to perform cleanup operations of the resources used by the stream itself.
## Reactive Streams semantics
@@@div { .callout }

View file

@ -23,9 +23,11 @@ import akka.japi.function.Function2;
// #interleave
// #merge
// #merge-sorted
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import java.util.Arrays;
import java.util.*;
// #merge-sorted
// #merge
@ -44,10 +46,9 @@ import akka.stream.Attributes;
// #log
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.IntSupplier;
class SourceOrFlow {
private static ActorSystem system = null;
@ -489,6 +490,60 @@ class SourceOrFlow {
// #groupBy
}
void watchTerminationExample() {
// #watchTermination
Source.range(1, 5)
.watchTermination(
(prevMatValue, completionStage) -> {
completionStage.whenComplete(
(done, exc) -> {
if (done != null)
System.out.println("The stream materialized " + prevMatValue.toString());
else System.out.println(exc.getMessage());
});
return prevMatValue;
})
.runForeach(System.out::println, system);
/*
Prints:
1
2
3
4
5
The stream materialized NotUsed
*/
Source.range(1, 5)
.watchTermination(
(prevMatValue, completionStage) -> {
// this function will be run when the stream terminates
// the CompletionStage provided as a second parameter indicates whether
// the stream completed successfully or failed
completionStage.whenComplete(
(done, exc) -> {
if (done != null)
System.out.println("The stream materialized " + prevMatValue.toString());
else System.out.println(exc.getMessage());
});
return prevMatValue;
})
.runForeach(
element -> {
if (element == 3) throw new Exception("Boom");
else System.out.println(element);
},
system);
/*
Prints:
1
2
Boom
*/
// #watchTermination
}
static CompletionStage<Done> completionTimeoutExample() {
// #completionTimeout
Source<Integer, NotUsed> source = Source.range(1, 100000).map(number -> number * number);

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success }
object WatchTermination {
def watchTerminationExample(): Unit = {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContext = ???
//#watchTermination
Source(1 to 5)
.watchTermination()(
(prevMatValue, future) =>
// this function will be run when the stream terminates
// the Future provided as a second parameter indicates whether the stream completed successfully or failed
future.onComplete {
case Failure(exception) => println(exception.getMessage)
case Success(_) => println(s"The stream materialized $prevMatValue")
})
.runForeach(println)
/*
Prints:
1
2
3
4
5
The stream materialized NotUsed
*/
Source(1 to 5)
.watchTermination()((prevMatValue, future) =>
future.onComplete {
case Failure(exception) => println(exception.getMessage)
case Success(_) => println(s"The stream materialized $prevMatValue")
})
.runForeach(e => if (e == 3) throw new Exception("Boom") else println(e))
/*
Prints:
1
2
Boom
*/
//#watchTermination
}
}