Doc example of Streams watch operator, #25468 (#28752)

This commit is contained in:
Patrik Nordwall 2020-03-26 18:00:58 +01:00 committed by GitHub
parent 5605f04cb7
commit 37d87811b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 1 deletions

View file

@ -18,6 +18,18 @@ Watch a specific `ActorRef` and signal a failure downstream once the actor termi
The signaled failure will be an @java[@javadoc:[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)] The signaled failure will be an @java[@javadoc:[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)]
@scala[@scaladoc[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)]. @scala[@scaladoc[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)].
## Example
An `ActorRef` can be can be watched and the stream will fail with `WatchedActorTerminatedException` when the
actor terminates.
Scala
: @@snip [Watch.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Watch.scala) { #watch }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #watch }
## Reactive Streams semantics ## Reactive Streams semantics
@@@div { .callout } @@@div { .callout }

View file

@ -4,6 +4,7 @@
package jdocs.stream.operators; package jdocs.stream.operators;
import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.japi.pf.PFBuilder; import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Flow; import akka.stream.javadsl.Flow;
@ -41,7 +42,6 @@ import akka.stream.Attributes;
// #log // #log
import java.time.Duration; import java.time.Duration;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -426,4 +426,18 @@ class SourceOrFlow {
// -1 // -1
// #dropWhile // #dropWhile
} }
void watchExample() {
// #watch
final ActorRef ref = someActor();
Flow<String, String, NotUsed> flow =
Flow.of(String.class)
.watch(ref)
.recover(akka.stream.WatchedActorTerminatedException.class, () -> ref + " terminated");
// #watch
}
private ActorRef someActor() {
return null;
}
} }

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.WatchedActorTerminatedException
import akka.stream.scaladsl.Flow
object Watch {
def someActor(): ActorRef = ???
def watchExample(): Unit = {
//#watch
val ref: ActorRef = someActor()
val flow: Flow[String, String, NotUsed] =
Flow[String].watch(ref).recover {
case _: WatchedActorTerminatedException => s"$ref terminated"
}
//#watch
}
}