From 95a000a51bcdcb58172d8627ed8f650750539bfd Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 7 Jul 2020 13:09:54 +0200 Subject: [PATCH] Adds sample code for the monitor operator --- .../operators/Source-or-Flow/monitor.md | 30 ++++++- .../operators/sourceorflow/Monitor.java | 84 +++++++++++++++++++ .../operators/sourceorflow/Monitor.scala | 63 ++++++++++++++ 3 files changed, 176 insertions(+), 1 deletion(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md index e8930f8119..3f3f3c626c 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md @@ -14,7 +14,35 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an -event, and may therefore affect performance. +event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek +and get information about the stream. + +## Example + +The example below uses the `monitorMat` variant of `monitor`. The only difference betwee the two operators is +that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample +below be `Keep.right` so only the `FlowMonitor[Int]` is returned. + +Scala +: @@snip [Monitor.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala) { #monitor } + +Java +: @@snip [Monitor.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java) { #monitor } + +When run, the sample code will produce something similar to: + +``` +Stream is initialized but hasn't processed any element +0 +1 +2 +Last element processed: 2 +3 +4 +5 +Stream completed already +``` + ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java new file mode 100644 index 0000000000..cab2594d8b --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -0,0 +1,84 @@ +package jdocs.stream.operators.sourceorflow; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.japi.pf.Match; +import akka.stream.FlowMonitor; +import akka.stream.FlowMonitorState; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import scala.PartialFunction; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * + */ +public class Monitor { + + private static final PartialFunction printMonitorState = Match.match( + FlowMonitorState.Initialized$.class, + __ -> { + System.out.println("Stream is initialized but hasn't processed any element"); + return NotUsed.getInstance(); + } + ).match( + FlowMonitorState.Received.class, + (msg) -> { + System.out.println("Last message received: " + msg.msg()); + return NotUsed.getInstance(); + } + ).match( + FlowMonitorState.Failed.class, + (failed) -> { + System.out.println("Stream failed with cause: " + failed.cause().getMessage()); + return NotUsed.getInstance(); + } + ).match( + FlowMonitorState.Finished$.class, + __ -> { + System.out.println("Stream completed already"); + return NotUsed.getInstance(); + } + ).build(); + + + public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException { + ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + + Source> monitoredSource = + Source + .fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) + .throttle(5, Duration.ofSeconds(1)) + .monitorMat(Keep.right()); + + + Pair, CompletionStage> run = monitoredSource + .toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem); + + FlowMonitor monitor = run.first(); + + // if we peek on the stream too early it probably won't have processed any element. + printMonitorState.apply(monitor.state()); + + // wait a few millis and peek in the stream again to see what's the latest element processed + Thread.sleep(500); + printMonitorState.apply(monitor.state()); + + // wait until the stream completed + run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); + printMonitorState.apply(monitor.state()); + + + run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); + + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala new file mode 100644 index 0000000000..0234a4af37 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -0,0 +1,63 @@ +package docs.stream.operators.sourceorflow + +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.FlowMonitor +import akka.stream.FlowMonitorState +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +import scala.concurrent.Await +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** + * + */ +class Monitor { + + implicit val system = ActorSystem("monitor-sample-sys2") + implicit val executionContext: ExecutionContextExecutor = system.dispatcher + + // #monitor + val source: Source[Int, NotUsed] = + Source.fromIterator(() => Iterator.from(0)) + + def printMonitorState(flowMonitor: FlowMonitor[Int]) = { + flowMonitor.state match { + case FlowMonitorState.Initialized => + println("Stream is initialized but hasn't processed any element") + case FlowMonitorState.Received(msg) => + println(s"Last element processed: $msg") + case FlowMonitorState.Failed(cause) => + println(s"Stream failed with cause $cause") + case FlowMonitorState.Finished => println(s"Stream completed already") + } + } + + val monitoredSource: Source[Int, FlowMonitor[Int]] = source + .take(6) + .throttle(5, 1.second) + .monitorMat(Keep.right) + val monitoredStream: (FlowMonitor[Int], Future[Done]) = + monitoredSource + .toMat(Sink.foreach(println))(Keep.both) + .run() + + val flowMonitor = monitoredStream._1 + // if we peek on the stream early enough it probably won't have processed any element. + printMonitorState(flowMonitor) + // wait a few millis and peek in the stream again to see what's the latest element processed + Thread.sleep(500) + printMonitorState(flowMonitor) + // wait until the stream completed + Await.result(monitoredStream._2, 3.seconds) + printMonitorState(flowMonitor) + // #monitor + + monitoredStream._2.onComplete(_ => system.terminate()) + +}