diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md index 3f3f3c626c..e453b85141 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md @@ -19,7 +19,7 @@ and get information about the stream. ## Example -The example below uses the `monitorMat` variant of `monitor`. The only difference betwee the two operators is +The example below uses the `monitorMat` variant of `monitor`. The only difference between the two operators is that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample below be `Keep.right` so only the `FlowMonitor[Int]` is returned. diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index cab2594d8b..a831d0b92c 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -1,16 +1,13 @@ package jdocs.stream.operators.sourceorflow; import akka.Done; -import akka.NotUsed; import akka.actor.ActorSystem; import akka.japi.Pair; -import akka.japi.pf.Match; import akka.stream.FlowMonitor; import akka.stream.FlowMonitorState; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import scala.PartialFunction; import java.time.Duration; import java.util.Arrays; @@ -24,36 +21,27 @@ import java.util.concurrent.TimeoutException; */ public class Monitor { - private static final PartialFunction printMonitorState = Match.match( - FlowMonitorState.Initialized$.class, - __ -> { - System.out.println("Stream is initialized but hasn't processed any element"); - return NotUsed.getInstance(); - } - ).match( - FlowMonitorState.Received.class, - (msg) -> { - System.out.println("Last message received: " + msg.msg()); - return NotUsed.getInstance(); - } - ).match( - FlowMonitorState.Failed.class, - (failed) -> { - System.out.println("Stream failed with cause: " + failed.cause().getMessage()); - return NotUsed.getInstance(); - } - ).match( - FlowMonitorState.Finished$.class, - __ -> { - System.out.println("Stream completed already"); - return NotUsed.getInstance(); - } - ).build(); + // #monitor + private static void printMonitorState(FlowMonitorState.StreamState state) { + if (FlowMonitorState.Finished$.class.isInstance(state)) { + System.out.println("Stream is initialized but hasn't processed any element"); + } else if (FlowMonitorState.Received.class.isInstance(state)) { + FlowMonitorState.Received msg = (FlowMonitorState.Received) state; + System.out.println("Last message received: " + msg.msg()); + } else if (FlowMonitorState.Failed.class.isInstance(state)) { + Throwable cause = ((FlowMonitorState.Failed) state).cause(); + System.out.println("Stream failed with cause: " + cause.getMessage()); + } else { + System.out.println("Stream completed already"); + } + } + // #monitor public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException { ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + // #monitor Source> monitoredSource = Source .fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) @@ -67,16 +55,25 @@ public class Monitor { FlowMonitor monitor = run.first(); // if we peek on the stream too early it probably won't have processed any element. - printMonitorState.apply(monitor.state()); - - // wait a few millis and peek in the stream again to see what's the latest element processed + printMonitorState(monitor.state()); + // #monitor + // exclude from rendered snippet Thread.sleep(500); - printMonitorState.apply(monitor.state()); + // #monitor - // wait until the stream completed + // ... + // sometime later, our code has progressed. We can peek in the stream + // again to see what's the latest element processed + printMonitorState(monitor.state()); + + // #monitor + // exclude from rendered snippet run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - printMonitorState.apply(monitor.state()); - + // #monitor + // #monitor + // Eventually, the stream completes and if we check the state it reports the streasm finished. + printMonitorState(monitor.state()); + // #monitor run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index 0234a4af37..48047e7718 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -48,13 +48,25 @@ class Monitor { .run() val flowMonitor = monitoredStream._1 + // if we peek on the stream early enough it probably won't have processed any element. printMonitorState(flowMonitor) - // wait a few millis and peek in the stream again to see what's the latest element processed + // #monitor + // exclude from rendered snippet Thread.sleep(500) + // #monitor + + // ... + // sometime later, our code has progressed. We can peek in the stream + // again to see what's the latest element processed printMonitorState(flowMonitor) - // wait until the stream completed + + // #monitor + // exclude from rendered snippet Await.result(monitoredStream._2, 3.seconds) + // #monitor + // #monitor + // Eventually, the stream completes and if we check the state it reports the streasm finished. printMonitorState(flowMonitor) // #monitor