diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watch.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watch.md index e43ef076d3..6004b5d23b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watch.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/watch.md @@ -18,6 +18,18 @@ Watch a specific `ActorRef` and signal a failure downstream once the actor termi The signaled failure will be an @java[@javadoc:[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)] @scala[@scaladoc[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)]. +## Example + +An `ActorRef` can be can be watched and the stream will fail with `WatchedActorTerminatedException` when the +actor terminates. + +Scala +: @@snip [Watch.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Watch.scala) { #watch } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #watch } + + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index df3711f469..91e5d605d1 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -4,6 +4,7 @@ package jdocs.stream.operators; +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.japi.pf.PFBuilder; import akka.stream.javadsl.Flow; @@ -41,7 +42,6 @@ import akka.stream.Attributes; // #log import java.time.Duration; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.concurrent.CompletableFuture; @@ -426,4 +426,18 @@ class SourceOrFlow { // -1 // #dropWhile } + + void watchExample() { + // #watch + final ActorRef ref = someActor(); + Flow flow = + Flow.of(String.class) + .watch(ref) + .recover(akka.stream.WatchedActorTerminatedException.class, () -> ref + " terminated"); + // #watch + } + + private ActorRef someActor() { + return null; + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Watch.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Watch.scala new file mode 100644 index 0000000000..0779df5509 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Watch.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorRef +import akka.stream.WatchedActorTerminatedException +import akka.stream.scaladsl.Flow + +object Watch { + + def someActor(): ActorRef = ??? + + def watchExample(): Unit = { + //#watch + val ref: ActorRef = someActor() + val flow: Flow[String, String, NotUsed] = + Flow[String].watch(ref).recover { + case _: WatchedActorTerminatedException => s"$ref terminated" + } + //#watch + } + +}