diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md index e8930f8119..f116989cf6 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md @@ -6,15 +6,43 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple ## Signature -@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" } -@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" } +@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" } +@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" } ## Description -Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise -passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an -event, and may therefore affect performance. +Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stream. Elements +pass through unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an +event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek +and get information about the stream. + +## Example + +The example below uses the `monitorMat` variant of `monitor`. The only difference between the two operators is +that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample +below be `Keep.right` so only the `FlowMonitor[Int]` is returned. + +Scala +: @@snip [Monitor.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala) { #monitor } + +Java +: @@snip [Monitor.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java) { #monitor } + +When run, the sample code will produce something similar to: + +``` +Stream is initialized but hasn't processed any element +0 +1 +2 +Last element processed: 2 +3 +4 +5 +Stream completed already +``` + ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java new file mode 100644 index 0000000000..bd2a77ea7d --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.stream.FlowMonitor; +import akka.stream.FlowMonitorState; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** */ +public class Monitor { + + // #monitor + private static void printMonitorState(FlowMonitorState.StreamState state) { + if (state == FlowMonitorState.finished()) { + System.out.println("Stream is initialized but hasn't processed any element"); + } else if (state instanceof FlowMonitorState.Received) { + FlowMonitorState.Received msg = (FlowMonitorState.Received) state; + System.out.println("Last message received: " + msg.msg()); + } else if (state instanceof FlowMonitorState.Failed) { + Throwable cause = ((FlowMonitorState.Failed) state).cause(); + System.out.println("Stream failed with cause: " + cause.getMessage()); + } else { + System.out.println("Stream completed already"); + } + } + // #monitor + + public static void main(String[] args) + throws InterruptedException, TimeoutException, ExecutionException { + ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + + // #monitor + Source> monitoredSource = + Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) + .throttle(5, Duration.ofSeconds(1)) + .monitorMat(Keep.right()); + + Pair, CompletionStage> run = + monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem); + + FlowMonitor monitor = run.first(); + + // If we peek in the monitor too early, it's possible it was not initialized yet. + printMonitorState(monitor.state()); + + // Periodically check the monitor + Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "") + .runForeach(__ -> printMonitorState(monitor.state()), actorSystem); + // #monitor + + run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala new file mode 100644 index 0000000000..eb2ad1a6de --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.FlowMonitor +import akka.stream.FlowMonitorState +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration._ + +class Monitor { + + implicit val system = ActorSystem("monitor-sample-sys2") + implicit val executionContext: ExecutionContextExecutor = system.dispatcher + + // #monitor + val source: Source[Int, NotUsed] = + Source.fromIterator(() => Iterator.from(0)) + + def printMonitorState(flowMonitor: FlowMonitor[Int]) = + flowMonitor.state match { + case FlowMonitorState.Initialized => + println("Stream is initialized but hasn't processed any element") + case FlowMonitorState.Received(msg) => + println(s"Last element processed: $msg") + case FlowMonitorState.Failed(cause) => + println(s"Stream failed with cause $cause") + case FlowMonitorState.Finished => println(s"Stream completed already") + } + + val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right) + val (flowMonitor, futureDone) = + monitoredSource.toMat(Sink.foreach(println))(Keep.both).run() + + // If we peek in the monitor too early, it's possible it was not initialized yet. + printMonitorState(flowMonitor) + + // Periodically check the monitor + Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor)) + + // #monitor + futureDone.onComplete(_ => system.terminate()) + +}