From 95a000a51bcdcb58172d8627ed8f650750539bfd Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 7 Jul 2020 13:09:54 +0200 Subject: [PATCH 1/7] Adds sample code for the monitor operator --- .../operators/Source-or-Flow/monitor.md | 30 ++++++- .../operators/sourceorflow/Monitor.java | 84 +++++++++++++++++++ .../operators/sourceorflow/Monitor.scala | 63 ++++++++++++++ 3 files changed, 176 insertions(+), 1 deletion(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md index e8930f8119..3f3f3c626c 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md @@ -14,7 +14,35 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an -event, and may therefore affect performance. +event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek +and get information about the stream. + +## Example + +The example below uses the `monitorMat` variant of `monitor`. The only difference betwee the two operators is +that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample +below be `Keep.right` so only the `FlowMonitor[Int]` is returned. + +Scala +: @@snip [Monitor.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala) { #monitor } + +Java +: @@snip [Monitor.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java) { #monitor } + +When run, the sample code will produce something similar to: + +``` +Stream is initialized but hasn't processed any element +0 +1 +2 +Last element processed: 2 +3 +4 +5 +Stream completed already +``` + ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java new file mode 100644 index 0000000000..cab2594d8b --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -0,0 +1,84 @@ +package jdocs.stream.operators.sourceorflow; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.japi.pf.Match; +import akka.stream.FlowMonitor; +import akka.stream.FlowMonitorState; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import scala.PartialFunction; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * + */ +public class Monitor { + + private static final PartialFunction printMonitorState = Match.match( + FlowMonitorState.Initialized$.class, + __ -> { + System.out.println("Stream is initialized but hasn't processed any element"); + return NotUsed.getInstance(); + } + ).match( + FlowMonitorState.Received.class, + (msg) -> { + System.out.println("Last message received: " + msg.msg()); + return NotUsed.getInstance(); + } + ).match( + FlowMonitorState.Failed.class, + (failed) -> { + System.out.println("Stream failed with cause: " + failed.cause().getMessage()); + return NotUsed.getInstance(); + } + ).match( + FlowMonitorState.Finished$.class, + __ -> { + System.out.println("Stream completed already"); + return NotUsed.getInstance(); + } + ).build(); + + + public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException { + ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + + Source> monitoredSource = + Source + .fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) + .throttle(5, Duration.ofSeconds(1)) + .monitorMat(Keep.right()); + + + Pair, CompletionStage> run = monitoredSource + .toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem); + + FlowMonitor monitor = run.first(); + + // if we peek on the stream too early it probably won't have processed any element. + printMonitorState.apply(monitor.state()); + + // wait a few millis and peek in the stream again to see what's the latest element processed + Thread.sleep(500); + printMonitorState.apply(monitor.state()); + + // wait until the stream completed + run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); + printMonitorState.apply(monitor.state()); + + + run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); + + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala new file mode 100644 index 0000000000..0234a4af37 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -0,0 +1,63 @@ +package docs.stream.operators.sourceorflow + +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.FlowMonitor +import akka.stream.FlowMonitorState +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +import scala.concurrent.Await +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** + * + */ +class Monitor { + + implicit val system = ActorSystem("monitor-sample-sys2") + implicit val executionContext: ExecutionContextExecutor = system.dispatcher + + // #monitor + val source: Source[Int, NotUsed] = + Source.fromIterator(() => Iterator.from(0)) + + def printMonitorState(flowMonitor: FlowMonitor[Int]) = { + flowMonitor.state match { + case FlowMonitorState.Initialized => + println("Stream is initialized but hasn't processed any element") + case FlowMonitorState.Received(msg) => + println(s"Last element processed: $msg") + case FlowMonitorState.Failed(cause) => + println(s"Stream failed with cause $cause") + case FlowMonitorState.Finished => println(s"Stream completed already") + } + } + + val monitoredSource: Source[Int, FlowMonitor[Int]] = source + .take(6) + .throttle(5, 1.second) + .monitorMat(Keep.right) + val monitoredStream: (FlowMonitor[Int], Future[Done]) = + monitoredSource + .toMat(Sink.foreach(println))(Keep.both) + .run() + + val flowMonitor = monitoredStream._1 + // if we peek on the stream early enough it probably won't have processed any element. + printMonitorState(flowMonitor) + // wait a few millis and peek in the stream again to see what's the latest element processed + Thread.sleep(500) + printMonitorState(flowMonitor) + // wait until the stream completed + Await.result(monitoredStream._2, 3.seconds) + printMonitorState(flowMonitor) + // #monitor + + monitoredStream._2.onComplete(_ => system.terminate()) + +} From 0e3b4605ad6e7468f2ca200c33dabe76cd1c3b7d Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 7 Jul 2020 14:25:27 +0200 Subject: [PATCH 2/7] Simplify code: instanceof, hide sleeps,... --- .../operators/Source-or-Flow/monitor.md | 2 +- .../operators/sourceorflow/Monitor.java | 67 +++++++++---------- .../operators/sourceorflow/Monitor.scala | 16 ++++- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md index 3f3f3c626c..e453b85141 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md @@ -19,7 +19,7 @@ and get information about the stream. ## Example -The example below uses the `monitorMat` variant of `monitor`. The only difference betwee the two operators is +The example below uses the `monitorMat` variant of `monitor`. The only difference between the two operators is that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample below be `Keep.right` so only the `FlowMonitor[Int]` is returned. diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index cab2594d8b..a831d0b92c 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -1,16 +1,13 @@ package jdocs.stream.operators.sourceorflow; import akka.Done; -import akka.NotUsed; import akka.actor.ActorSystem; import akka.japi.Pair; -import akka.japi.pf.Match; import akka.stream.FlowMonitor; import akka.stream.FlowMonitorState; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import scala.PartialFunction; import java.time.Duration; import java.util.Arrays; @@ -24,36 +21,27 @@ import java.util.concurrent.TimeoutException; */ public class Monitor { - private static final PartialFunction printMonitorState = Match.match( - FlowMonitorState.Initialized$.class, - __ -> { - System.out.println("Stream is initialized but hasn't processed any element"); - return NotUsed.getInstance(); - } - ).match( - FlowMonitorState.Received.class, - (msg) -> { - System.out.println("Last message received: " + msg.msg()); - return NotUsed.getInstance(); - } - ).match( - FlowMonitorState.Failed.class, - (failed) -> { - System.out.println("Stream failed with cause: " + failed.cause().getMessage()); - return NotUsed.getInstance(); - } - ).match( - FlowMonitorState.Finished$.class, - __ -> { - System.out.println("Stream completed already"); - return NotUsed.getInstance(); - } - ).build(); + // #monitor + private static void printMonitorState(FlowMonitorState.StreamState state) { + if (FlowMonitorState.Finished$.class.isInstance(state)) { + System.out.println("Stream is initialized but hasn't processed any element"); + } else if (FlowMonitorState.Received.class.isInstance(state)) { + FlowMonitorState.Received msg = (FlowMonitorState.Received) state; + System.out.println("Last message received: " + msg.msg()); + } else if (FlowMonitorState.Failed.class.isInstance(state)) { + Throwable cause = ((FlowMonitorState.Failed) state).cause(); + System.out.println("Stream failed with cause: " + cause.getMessage()); + } else { + System.out.println("Stream completed already"); + } + } + // #monitor public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException { ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + // #monitor Source> monitoredSource = Source .fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) @@ -67,16 +55,25 @@ public class Monitor { FlowMonitor monitor = run.first(); // if we peek on the stream too early it probably won't have processed any element. - printMonitorState.apply(monitor.state()); - - // wait a few millis and peek in the stream again to see what's the latest element processed + printMonitorState(monitor.state()); + // #monitor + // exclude from rendered snippet Thread.sleep(500); - printMonitorState.apply(monitor.state()); + // #monitor - // wait until the stream completed + // ... + // sometime later, our code has progressed. We can peek in the stream + // again to see what's the latest element processed + printMonitorState(monitor.state()); + + // #monitor + // exclude from rendered snippet run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - printMonitorState.apply(monitor.state()); - + // #monitor + // #monitor + // Eventually, the stream completes and if we check the state it reports the streasm finished. + printMonitorState(monitor.state()); + // #monitor run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index 0234a4af37..48047e7718 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -48,13 +48,25 @@ class Monitor { .run() val flowMonitor = monitoredStream._1 + // if we peek on the stream early enough it probably won't have processed any element. printMonitorState(flowMonitor) - // wait a few millis and peek in the stream again to see what's the latest element processed + // #monitor + // exclude from rendered snippet Thread.sleep(500) + // #monitor + + // ... + // sometime later, our code has progressed. We can peek in the stream + // again to see what's the latest element processed printMonitorState(flowMonitor) - // wait until the stream completed + + // #monitor + // exclude from rendered snippet Await.result(monitoredStream._2, 3.seconds) + // #monitor + // #monitor + // Eventually, the stream completes and if we check the state it reports the streasm finished. printMonitorState(flowMonitor) // #monitor From 024710a6e0861585ae8591358c9c503f65f473d1 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 7 Jul 2020 14:58:56 +0200 Subject: [PATCH 3/7] Headers and formats --- .../operators/sourceorflow/Monitor.java | 109 +++++++++--------- .../operators/sourceorflow/Monitor.scala | 15 ++- 2 files changed, 61 insertions(+), 63 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index a831d0b92c..4a08e1023f 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + package jdocs.stream.operators.sourceorflow; import akka.Done; @@ -16,66 +20,61 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -/** - * - */ +/** */ public class Monitor { - // #monitor - private static void printMonitorState(FlowMonitorState.StreamState state) { - if (FlowMonitorState.Finished$.class.isInstance(state)) { - System.out.println("Stream is initialized but hasn't processed any element"); - } else if (FlowMonitorState.Received.class.isInstance(state)) { - FlowMonitorState.Received msg = (FlowMonitorState.Received) state; - System.out.println("Last message received: " + msg.msg()); - } else if (FlowMonitorState.Failed.class.isInstance(state)) { - Throwable cause = ((FlowMonitorState.Failed) state).cause(); - System.out.println("Stream failed with cause: " + cause.getMessage()); - } else { - System.out.println("Stream completed already"); - } + // #monitor + private static void printMonitorState(FlowMonitorState.StreamState state) { + if (FlowMonitorState.Finished$.class.isInstance(state)) { + System.out.println("Stream is initialized but hasn't processed any element"); + } else if (FlowMonitorState.Received.class.isInstance(state)) { + FlowMonitorState.Received msg = (FlowMonitorState.Received) state; + System.out.println("Last message received: " + msg.msg()); + } else if (FlowMonitorState.Failed.class.isInstance(state)) { + Throwable cause = ((FlowMonitorState.Failed) state).cause(); + System.out.println("Stream failed with cause: " + cause.getMessage()); + } else { + System.out.println("Stream completed already"); } + } + // #monitor + + public static void main(String[] args) + throws InterruptedException, TimeoutException, ExecutionException { + ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + + // #monitor + Source> monitoredSource = + Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) + .throttle(5, Duration.ofSeconds(1)) + .monitorMat(Keep.right()); + + Pair, CompletionStage> run = + monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem); + + FlowMonitor monitor = run.first(); + + // if we peek on the stream too early it probably won't have processed any element. + printMonitorState(monitor.state()); + // #monitor + // exclude from rendered snippet + Thread.sleep(500); // #monitor + // ... + // sometime later, our code has progressed. We can peek in the stream + // again to see what's the latest element processed + printMonitorState(monitor.state()); - public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException { - ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + // #monitor + // exclude from rendered snippet + run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); + // #monitor + // #monitor + // Eventually, the stream completes and if we check the state it reports the streasm finished. + printMonitorState(monitor.state()); + // #monitor - // #monitor - Source> monitoredSource = - Source - .fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) - .throttle(5, Duration.ofSeconds(1)) - .monitorMat(Keep.right()); - - - Pair, CompletionStage> run = monitoredSource - .toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem); - - FlowMonitor monitor = run.first(); - - // if we peek on the stream too early it probably won't have processed any element. - printMonitorState(monitor.state()); - // #monitor - // exclude from rendered snippet - Thread.sleep(500); - // #monitor - - // ... - // sometime later, our code has progressed. We can peek in the stream - // again to see what's the latest element processed - printMonitorState(monitor.state()); - - // #monitor - // exclude from rendered snippet - run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - // #monitor - // #monitor - // Eventually, the stream completes and if we check the state it reports the streasm finished. - printMonitorState(monitor.state()); - // #monitor - - run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); - - } + run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index 48047e7718..e304ef581e 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + package docs.stream.operators.sourceorflow import akka.Done @@ -15,7 +19,7 @@ import scala.concurrent.Future import scala.concurrent.duration._ /** - * + * */ class Monitor { @@ -38,14 +42,9 @@ class Monitor { } } - val monitoredSource: Source[Int, FlowMonitor[Int]] = source - .take(6) - .throttle(5, 1.second) - .monitorMat(Keep.right) + val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right) val monitoredStream: (FlowMonitor[Int], Future[Done]) = - monitoredSource - .toMat(Sink.foreach(println))(Keep.both) - .run() + monitoredSource.toMat(Sink.foreach(println))(Keep.both).run() val flowMonitor = monitoredStream._1 From 2b71abe78e091467e901b9e7a1b9bf667a4dc8d0 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 7 Jul 2020 15:27:25 +0200 Subject: [PATCH 4/7] Show less code. More informative comments (still hides blocking calls --- .../operators/sourceorflow/Monitor.java | 19 ++++++++++--------- .../operators/sourceorflow/Monitor.scala | 19 ++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index 4a08e1023f..dbce608196 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -56,24 +56,25 @@ public class Monitor { // if we peek on the stream too early it probably won't have processed any element. printMonitorState(monitor.state()); - // #monitor - // exclude from rendered snippet - Thread.sleep(500); + + // At this point, the application will continue to run and future + // invocations to `printMonitorState(flowMonitor)` will continue to show + // the progress in the stream // #monitor - // ... + // Don't use `Thread#sleep` in your code. It's a blocking call + // that can starve the thread-pool. + Thread.sleep(500); + // sometime later, our code has progressed. We can peek in the stream // again to see what's the latest element processed printMonitorState(monitor.state()); - // #monitor - // exclude from rendered snippet + // Don't use `CompletableFuture#get` in your code. It's a blocking call + // that can starve the thread-pool. run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - // #monitor - // #monitor // Eventually, the stream completes and if we check the state it reports the streasm finished. printMonitorState(monitor.state()); - // #monitor run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index e304ef581e..b90836b02b 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -50,24 +50,25 @@ class Monitor { // if we peek on the stream early enough it probably won't have processed any element. printMonitorState(flowMonitor) - // #monitor - // exclude from rendered snippet - Thread.sleep(500) + + // At this point, the application will continue to run and future + // invocations to `printMonitorState(flowMonitor)` will continue to show + // the progress in the stream // #monitor - // ... + // Don't use `Thread#sleep` in your code. It's a blocking call + // that can starve the thread-pool. + Thread.sleep(500) + // sometime later, our code has progressed. We can peek in the stream // again to see what's the latest element processed printMonitorState(flowMonitor) - // #monitor - // exclude from rendered snippet + // Don't use `Await#result` in your code. It's a blocking call + // that can starve the thread-pool. Await.result(monitoredStream._2, 3.seconds) - // #monitor - // #monitor // Eventually, the stream completes and if we check the state it reports the streasm finished. printMonitorState(flowMonitor) - // #monitor monitoredStream._2.onComplete(_ => system.terminate()) From f187abddd17abfdf149e7e9dc5fa62b3809646f4 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 7 Jul 2020 17:08:11 +0200 Subject: [PATCH 5/7] Prefer monitoring from a separate stream (without blocking calls) --- .../operators/sourceorflow/Monitor.java | 29 +++++++------------ .../operators/sourceorflow/Monitor.scala | 24 +++------------ 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index dbce608196..a304b73d8b 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -5,6 +5,7 @@ package jdocs.stream.operators.sourceorflow; import akka.Done; +import akka.NotUsed; import akka.actor.ActorSystem; import akka.japi.Pair; import akka.stream.FlowMonitor; @@ -54,28 +55,20 @@ public class Monitor { FlowMonitor monitor = run.first(); - // if we peek on the stream too early it probably won't have processed any element. + // If we peek in the monitor too early, it's possible it was not initialized yet. printMonitorState(monitor.state()); - // At this point, the application will continue to run and future - // invocations to `printMonitorState(flowMonitor)` will continue to show - // the progress in the stream + // Periodically check the monitor + Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "") + .map( + __ -> { + printMonitorState(monitor.state()); + return NotUsed.getInstance(); + }) + .to(Sink.ignore()) + .run(actorSystem); // #monitor - // Don't use `Thread#sleep` in your code. It's a blocking call - // that can starve the thread-pool. - Thread.sleep(500); - - // sometime later, our code has progressed. We can peek in the stream - // again to see what's the latest element processed - printMonitorState(monitor.state()); - - // Don't use `CompletableFuture#get` in your code. It's a blocking call - // that can starve the thread-pool. - run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - // Eventually, the stream completes and if we check the state it reports the streasm finished. - printMonitorState(monitor.state()); - run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index b90836b02b..26f8e1df95 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -13,7 +13,6 @@ import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import scala.concurrent.Await import scala.concurrent.ExecutionContextExecutor import scala.concurrent.Future import scala.concurrent.duration._ @@ -48,28 +47,13 @@ class Monitor { val flowMonitor = monitoredStream._1 - // if we peek on the stream early enough it probably won't have processed any element. + // If we peek in the monitor too early, it's possible it was not initialized yet. printMonitorState(flowMonitor) - // At this point, the application will continue to run and future - // invocations to `printMonitorState(flowMonitor)` will continue to show - // the progress in the stream + // Periodically check the monitor + Source.tick(200.millis, 400.millis, "").map(_ => printMonitorState(flowMonitor)).to(Sink.ignore).run + // #monitor - - // Don't use `Thread#sleep` in your code. It's a blocking call - // that can starve the thread-pool. - Thread.sleep(500) - - // sometime later, our code has progressed. We can peek in the stream - // again to see what's the latest element processed - printMonitorState(flowMonitor) - - // Don't use `Await#result` in your code. It's a blocking call - // that can starve the thread-pool. - Await.result(monitoredStream._2, 3.seconds) - // Eventually, the stream completes and if we check the state it reports the streasm finished. - printMonitorState(flowMonitor) - monitoredStream._2.onComplete(_ => system.terminate()) } From 6c12ae117acc3a65d01eda24ce887a9d7693e73c Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Mon, 13 Jul 2020 10:46:16 +0200 Subject: [PATCH 6/7] PR comments --- .../stream/operators/Source-or-Flow/monitor.md | 8 ++++---- .../stream/operators/sourceorflow/Monitor.java | 16 +++++----------- .../stream/operators/sourceorflow/Monitor.scala | 16 ++++------------ 3 files changed, 13 insertions(+), 27 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md index e453b85141..f116989cf6 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md @@ -6,14 +6,14 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple ## Signature -@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" } -@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" } +@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" } +@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" } ## Description -Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise -passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an +Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stream. Elements +pass through unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek and get information about the stream. diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index a304b73d8b..21188e19be 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -26,12 +26,12 @@ public class Monitor { // #monitor private static void printMonitorState(FlowMonitorState.StreamState state) { - if (FlowMonitorState.Finished$.class.isInstance(state)) { + if (state == FlowMonitorState.finished()) { System.out.println("Stream is initialized but hasn't processed any element"); - } else if (FlowMonitorState.Received.class.isInstance(state)) { + } else if (state instanceof FlowMonitorState.Received) { FlowMonitorState.Received msg = (FlowMonitorState.Received) state; System.out.println("Last message received: " + msg.msg()); - } else if (FlowMonitorState.Failed.class.isInstance(state)) { + } else if (state instanceof FlowMonitorState.Failed) { Throwable cause = ((FlowMonitorState.Failed) state).cause(); System.out.println("Stream failed with cause: " + cause.getMessage()); } else { @@ -60,14 +60,8 @@ public class Monitor { // Periodically check the monitor Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "") - .map( - __ -> { - printMonitorState(monitor.state()); - return NotUsed.getInstance(); - }) - .to(Sink.ignore()) - .run(actorSystem); - // #monitor + .runForeach(__ -> printMonitorState(monitor.state()), actorSystem); + // #monitor run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index 26f8e1df95..eb2ad1a6de 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -4,7 +4,6 @@ package docs.stream.operators.sourceorflow -import akka.Done import akka.NotUsed import akka.actor.ActorSystem import akka.stream.FlowMonitor @@ -14,12 +13,8 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.Future import scala.concurrent.duration._ -/** - * - */ class Monitor { implicit val system = ActorSystem("monitor-sample-sys2") @@ -29,7 +24,7 @@ class Monitor { val source: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(0)) - def printMonitorState(flowMonitor: FlowMonitor[Int]) = { + def printMonitorState(flowMonitor: FlowMonitor[Int]) = flowMonitor.state match { case FlowMonitorState.Initialized => println("Stream is initialized but hasn't processed any element") @@ -39,21 +34,18 @@ class Monitor { println(s"Stream failed with cause $cause") case FlowMonitorState.Finished => println(s"Stream completed already") } - } val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right) - val monitoredStream: (FlowMonitor[Int], Future[Done]) = + val (flowMonitor, futureDone) = monitoredSource.toMat(Sink.foreach(println))(Keep.both).run() - val flowMonitor = monitoredStream._1 - // If we peek in the monitor too early, it's possible it was not initialized yet. printMonitorState(flowMonitor) // Periodically check the monitor - Source.tick(200.millis, 400.millis, "").map(_ => printMonitorState(flowMonitor)).to(Sink.ignore).run + Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor)) // #monitor - monitoredStream._2.onComplete(_ => system.terminate()) + futureDone.onComplete(_ => system.terminate()) } From 6ca92cc54355d85e3a33e829c433b8a7b4f7bd89 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Mon, 13 Jul 2020 14:04:28 +0200 Subject: [PATCH 7/7] javafmt --- .../java/jdocs/stream/operators/sourceorflow/Monitor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index 21188e19be..bd2a77ea7d 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -60,8 +60,8 @@ public class Monitor { // Periodically check the monitor Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "") - .runForeach(__ -> printMonitorState(monitor.state()), actorSystem); - // #monitor + .runForeach(__ -> printMonitorState(monitor.state()), actorSystem); + // #monitor run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); }