Merge pull request #29356 from ignasi35/monitor-operator-sample-code
This commit is contained in:
commit
68fa87c58c
3 changed files with 152 additions and 5 deletions
|
|
@ -6,15 +6,43 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" }
|
@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" }
|
||||||
@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" }
|
@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" }
|
||||||
|
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise
|
Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stream. Elements
|
||||||
passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an
|
pass through unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an
|
||||||
event, and may therefore affect performance.
|
event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek
|
||||||
|
and get information about the stream.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
The example below uses the `monitorMat` variant of `monitor`. The only difference between the two operators is
|
||||||
|
that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample
|
||||||
|
below be `Keep.right` so only the `FlowMonitor[Int]` is returned.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [Monitor.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala) { #monitor }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [Monitor.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java) { #monitor }
|
||||||
|
|
||||||
|
When run, the sample code will produce something similar to:
|
||||||
|
|
||||||
|
```
|
||||||
|
Stream is initialized but hasn't processed any element
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
Last element processed: 2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
5
|
||||||
|
Stream completed already
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
## Reactive Streams semantics
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,68 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package jdocs.stream.operators.sourceorflow;
|
||||||
|
|
||||||
|
import akka.Done;
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.japi.Pair;
|
||||||
|
import akka.stream.FlowMonitor;
|
||||||
|
import akka.stream.FlowMonitorState;
|
||||||
|
import akka.stream.javadsl.Keep;
|
||||||
|
import akka.stream.javadsl.Sink;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
/** */
|
||||||
|
public class Monitor {
|
||||||
|
|
||||||
|
// #monitor
|
||||||
|
private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
|
||||||
|
if (state == FlowMonitorState.finished()) {
|
||||||
|
System.out.println("Stream is initialized but hasn't processed any element");
|
||||||
|
} else if (state instanceof FlowMonitorState.Received) {
|
||||||
|
FlowMonitorState.Received msg = (FlowMonitorState.Received) state;
|
||||||
|
System.out.println("Last message received: " + msg.msg());
|
||||||
|
} else if (state instanceof FlowMonitorState.Failed) {
|
||||||
|
Throwable cause = ((FlowMonitorState.Failed) state).cause();
|
||||||
|
System.out.println("Stream failed with cause: " + cause.getMessage());
|
||||||
|
} else {
|
||||||
|
System.out.println("Stream completed already");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// #monitor
|
||||||
|
|
||||||
|
public static void main(String[] args)
|
||||||
|
throws InterruptedException, TimeoutException, ExecutionException {
|
||||||
|
ActorSystem actorSystem = ActorSystem.create("25fps-stream");
|
||||||
|
|
||||||
|
// #monitor
|
||||||
|
Source<Integer, FlowMonitor<Integer>> monitoredSource =
|
||||||
|
Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
|
||||||
|
.throttle(5, Duration.ofSeconds(1))
|
||||||
|
.monitorMat(Keep.right());
|
||||||
|
|
||||||
|
Pair<FlowMonitor<Integer>, CompletionStage<Done>> run =
|
||||||
|
monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);
|
||||||
|
|
||||||
|
FlowMonitor<Integer> monitor = run.first();
|
||||||
|
|
||||||
|
// If we peek in the monitor too early, it's possible it was not initialized yet.
|
||||||
|
printMonitorState(monitor.state());
|
||||||
|
|
||||||
|
// Periodically check the monitor
|
||||||
|
Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
|
||||||
|
.runForeach(__ -> printMonitorState(monitor.state()), actorSystem);
|
||||||
|
// #monitor
|
||||||
|
|
||||||
|
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.stream.operators.sourceorflow
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.FlowMonitor
|
||||||
|
import akka.stream.FlowMonitorState
|
||||||
|
import akka.stream.scaladsl.Keep
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
|
import akka.stream.scaladsl.Source
|
||||||
|
|
||||||
|
import scala.concurrent.ExecutionContextExecutor
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
class Monitor {
|
||||||
|
|
||||||
|
implicit val system = ActorSystem("monitor-sample-sys2")
|
||||||
|
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
|
||||||
|
|
||||||
|
// #monitor
|
||||||
|
val source: Source[Int, NotUsed] =
|
||||||
|
Source.fromIterator(() => Iterator.from(0))
|
||||||
|
|
||||||
|
def printMonitorState(flowMonitor: FlowMonitor[Int]) =
|
||||||
|
flowMonitor.state match {
|
||||||
|
case FlowMonitorState.Initialized =>
|
||||||
|
println("Stream is initialized but hasn't processed any element")
|
||||||
|
case FlowMonitorState.Received(msg) =>
|
||||||
|
println(s"Last element processed: $msg")
|
||||||
|
case FlowMonitorState.Failed(cause) =>
|
||||||
|
println(s"Stream failed with cause $cause")
|
||||||
|
case FlowMonitorState.Finished => println(s"Stream completed already")
|
||||||
|
}
|
||||||
|
|
||||||
|
val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right)
|
||||||
|
val (flowMonitor, futureDone) =
|
||||||
|
monitoredSource.toMat(Sink.foreach(println))(Keep.both).run()
|
||||||
|
|
||||||
|
// If we peek in the monitor too early, it's possible it was not initialized yet.
|
||||||
|
printMonitorState(flowMonitor)
|
||||||
|
|
||||||
|
// Periodically check the monitor
|
||||||
|
Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor))
|
||||||
|
|
||||||
|
// #monitor
|
||||||
|
futureDone.onComplete(_ => system.terminate())
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue