Prefer monitoring from a separate stream (without blocking calls)

This commit is contained in:
Ignasi Marimon-Clos 2020-07-07 17:08:11 +02:00
parent 2b71abe78e
commit f187abddd1
2 changed files with 15 additions and 38 deletions

View file

@ -5,6 +5,7 @@
package jdocs.stream.operators.sourceorflow;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.FlowMonitor;
@ -54,28 +55,20 @@ public class Monitor {
FlowMonitor<Integer> monitor = run.first();
// if we peek on the stream too early it probably won't have processed any element.
// If we peek in the monitor too early, it's possible it was not initialized yet.
printMonitorState(monitor.state());
// At this point, the application will continue to run and future
// invocations to `printMonitorState(flowMonitor)` will continue to show
// the progress in the stream
// Periodically check the monitor
Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
.map(
__ -> {
printMonitorState(monitor.state());
return NotUsed.getInstance();
})
.to(Sink.ignore())
.run(actorSystem);
// #monitor
// Don't use `Thread#sleep` in your code. It's a blocking call
// that can starve the thread-pool.
Thread.sleep(500);
// sometime later, our code has progressed. We can peek in the stream
// again to see what's the latest element processed
printMonitorState(monitor.state());
// Don't use `CompletableFuture#get` in your code. It's a blocking call
// that can starve the thread-pool.
run.second().toCompletableFuture().get(1, TimeUnit.SECONDS);
// Eventually, the stream completes and if we check the state it reports the streasm finished.
printMonitorState(monitor.state());
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
}
}

View file

@ -13,7 +13,6 @@ import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import scala.concurrent.duration._
@ -48,28 +47,13 @@ class Monitor {
val flowMonitor = monitoredStream._1
// if we peek on the stream early enough it probably won't have processed any element.
// If we peek in the monitor too early, it's possible it was not initialized yet.
printMonitorState(flowMonitor)
// At this point, the application will continue to run and future
// invocations to `printMonitorState(flowMonitor)` will continue to show
// the progress in the stream
// Periodically check the monitor
Source.tick(200.millis, 400.millis, "").map(_ => printMonitorState(flowMonitor)).to(Sink.ignore).run
// #monitor
// Don't use `Thread#sleep` in your code. It's a blocking call
// that can starve the thread-pool.
Thread.sleep(500)
// sometime later, our code has progressed. We can peek in the stream
// again to see what's the latest element processed
printMonitorState(flowMonitor)
// Don't use `Await#result` in your code. It's a blocking call
// that can starve the thread-pool.
Await.result(monitoredStream._2, 3.seconds)
// Eventually, the stream completes and if we check the state it reports the streasm finished.
printMonitorState(flowMonitor)
monitoredStream._2.onComplete(_ => system.terminate())
}