diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index dbce608196..a304b73d8b 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -5,6 +5,7 @@ package jdocs.stream.operators.sourceorflow; import akka.Done; +import akka.NotUsed; import akka.actor.ActorSystem; import akka.japi.Pair; import akka.stream.FlowMonitor; @@ -54,28 +55,20 @@ public class Monitor { FlowMonitor monitor = run.first(); - // if we peek on the stream too early it probably won't have processed any element. + // If we peek in the monitor too early, it's possible it was not initialized yet. printMonitorState(monitor.state()); - // At this point, the application will continue to run and future - // invocations to `printMonitorState(flowMonitor)` will continue to show - // the progress in the stream + // Periodically check the monitor + Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "") + .map( + __ -> { + printMonitorState(monitor.state()); + return NotUsed.getInstance(); + }) + .to(Sink.ignore()) + .run(actorSystem); // #monitor - // Don't use `Thread#sleep` in your code. It's a blocking call - // that can starve the thread-pool. - Thread.sleep(500); - - // sometime later, our code has progressed. We can peek in the stream - // again to see what's the latest element processed - printMonitorState(monitor.state()); - - // Don't use `CompletableFuture#get` in your code. It's a blocking call - // that can starve the thread-pool. - run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - // Eventually, the stream completes and if we check the state it reports the streasm finished. - printMonitorState(monitor.state()); - run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index b90836b02b..26f8e1df95 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -13,7 +13,6 @@ import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import scala.concurrent.Await import scala.concurrent.ExecutionContextExecutor import scala.concurrent.Future import scala.concurrent.duration._ @@ -48,28 +47,13 @@ class Monitor { val flowMonitor = monitoredStream._1 - // if we peek on the stream early enough it probably won't have processed any element. + // If we peek in the monitor too early, it's possible it was not initialized yet. printMonitorState(flowMonitor) - // At this point, the application will continue to run and future - // invocations to `printMonitorState(flowMonitor)` will continue to show - // the progress in the stream + // Periodically check the monitor + Source.tick(200.millis, 400.millis, "").map(_ => printMonitorState(flowMonitor)).to(Sink.ignore).run + // #monitor - - // Don't use `Thread#sleep` in your code. It's a blocking call - // that can starve the thread-pool. - Thread.sleep(500) - - // sometime later, our code has progressed. We can peek in the stream - // again to see what's the latest element processed - printMonitorState(flowMonitor) - - // Don't use `Await#result` in your code. It's a blocking call - // that can starve the thread-pool. - Await.result(monitoredStream._2, 3.seconds) - // Eventually, the stream completes and if we check the state it reports the streasm finished. - printMonitorState(flowMonitor) - monitoredStream._2.onComplete(_ => system.terminate()) }