From 6c12ae117acc3a65d01eda24ce887a9d7693e73c Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Mon, 13 Jul 2020 10:46:16 +0200 Subject: [PATCH] PR comments --- .../stream/operators/Source-or-Flow/monitor.md | 8 ++++---- .../stream/operators/sourceorflow/Monitor.java | 16 +++++----------- .../stream/operators/sourceorflow/Monitor.scala | 16 ++++------------ 3 files changed, 13 insertions(+), 27 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md index e453b85141..f116989cf6 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/monitor.md @@ -6,14 +6,14 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple ## Signature -@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" } -@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" } +@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" } +@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" } ## Description -Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise -passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an +Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stream. Elements +pass through unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek and get information about the stream. diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index a304b73d8b..21188e19be 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -26,12 +26,12 @@ public class Monitor { // #monitor private static void printMonitorState(FlowMonitorState.StreamState state) { - if (FlowMonitorState.Finished$.class.isInstance(state)) { + if (state == FlowMonitorState.finished()) { System.out.println("Stream is initialized but hasn't processed any element"); - } else if (FlowMonitorState.Received.class.isInstance(state)) { + } else if (state instanceof FlowMonitorState.Received) { FlowMonitorState.Received msg = (FlowMonitorState.Received) state; System.out.println("Last message received: " + msg.msg()); - } else if (FlowMonitorState.Failed.class.isInstance(state)) { + } else if (state instanceof FlowMonitorState.Failed) { Throwable cause = ((FlowMonitorState.Failed) state).cause(); System.out.println("Stream failed with cause: " + cause.getMessage()); } else { @@ -60,14 +60,8 @@ public class Monitor { // Periodically check the monitor Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "") - .map( - __ -> { - printMonitorState(monitor.state()); - return NotUsed.getInstance(); - }) - .to(Sink.ignore()) - .run(actorSystem); - // #monitor + .runForeach(__ -> printMonitorState(monitor.state()), actorSystem); + // #monitor run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index 26f8e1df95..eb2ad1a6de 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -4,7 +4,6 @@ package docs.stream.operators.sourceorflow -import akka.Done import akka.NotUsed import akka.actor.ActorSystem import akka.stream.FlowMonitor @@ -14,12 +13,8 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.Future import scala.concurrent.duration._ -/** - * - */ class Monitor { implicit val system = ActorSystem("monitor-sample-sys2") @@ -29,7 +24,7 @@ class Monitor { val source: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(0)) - def printMonitorState(flowMonitor: FlowMonitor[Int]) = { + def printMonitorState(flowMonitor: FlowMonitor[Int]) = flowMonitor.state match { case FlowMonitorState.Initialized => println("Stream is initialized but hasn't processed any element") @@ -39,21 +34,18 @@ class Monitor { println(s"Stream failed with cause $cause") case FlowMonitorState.Finished => println(s"Stream completed already") } - } val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right) - val monitoredStream: (FlowMonitor[Int], Future[Done]) = + val (flowMonitor, futureDone) = monitoredSource.toMat(Sink.foreach(println))(Keep.both).run() - val flowMonitor = monitoredStream._1 - // If we peek in the monitor too early, it's possible it was not initialized yet. printMonitorState(flowMonitor) // Periodically check the monitor - Source.tick(200.millis, 400.millis, "").map(_ => printMonitorState(flowMonitor)).to(Sink.ignore).run + Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor)) // #monitor - monitoredStream._2.onComplete(_ => system.terminate()) + futureDone.onComplete(_ => system.terminate()) }