Simplify code: instanceof, hide sleeps,...

This commit is contained in:
Ignasi Marimon-Clos 2020-07-07 14:25:27 +02:00
parent 95a000a51b
commit 0e3b4605ad
3 changed files with 47 additions and 38 deletions

View file

@ -19,7 +19,7 @@ and get information about the stream.
## Example ## Example
The example below uses the `monitorMat` variant of `monitor`. The only difference betwee the two operators is The example below uses the `monitorMat` variant of `monitor`. The only difference between the two operators is
that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample
below be `Keep.right` so only the `FlowMonitor[Int]` is returned. below be `Keep.right` so only the `FlowMonitor[Int]` is returned.

View file

@ -1,16 +1,13 @@
package jdocs.stream.operators.sourceorflow; package jdocs.stream.operators.sourceorflow;
import akka.Done; import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.pf.Match;
import akka.stream.FlowMonitor; import akka.stream.FlowMonitor;
import akka.stream.FlowMonitorState; import akka.stream.FlowMonitorState;
import akka.stream.javadsl.Keep; import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import scala.PartialFunction;
import java.time.Duration; import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
@ -24,36 +21,27 @@ import java.util.concurrent.TimeoutException;
*/ */
public class Monitor { public class Monitor {
private static final PartialFunction<Object, NotUsed> printMonitorState = Match.match( // #monitor
FlowMonitorState.Initialized$.class, private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
__ -> { if (FlowMonitorState.Finished$.class.isInstance(state)) {
System.out.println("Stream is initialized but hasn't processed any element"); System.out.println("Stream is initialized but hasn't processed any element");
return NotUsed.getInstance(); } else if (FlowMonitorState.Received.class.isInstance(state)) {
} FlowMonitorState.Received msg = (FlowMonitorState.Received) state;
).match( System.out.println("Last message received: " + msg.msg());
FlowMonitorState.Received.class, } else if (FlowMonitorState.Failed.class.isInstance(state)) {
(msg) -> { Throwable cause = ((FlowMonitorState.Failed) state).cause();
System.out.println("Last message received: " + msg.msg()); System.out.println("Stream failed with cause: " + cause.getMessage());
return NotUsed.getInstance(); } else {
} System.out.println("Stream completed already");
).match( }
FlowMonitorState.Failed.class, }
(failed) -> { // #monitor
System.out.println("Stream failed with cause: " + failed.cause().getMessage());
return NotUsed.getInstance();
}
).match(
FlowMonitorState.Finished$.class,
__ -> {
System.out.println("Stream completed already");
return NotUsed.getInstance();
}
).build();
public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException { public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException {
ActorSystem actorSystem = ActorSystem.create("25fps-stream"); ActorSystem actorSystem = ActorSystem.create("25fps-stream");
// #monitor
Source<Integer, FlowMonitor<Integer>> monitoredSource = Source<Integer, FlowMonitor<Integer>> monitoredSource =
Source Source
.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) .fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
@ -67,16 +55,25 @@ public class Monitor {
FlowMonitor<Integer> monitor = run.first(); FlowMonitor<Integer> monitor = run.first();
// if we peek on the stream too early it probably won't have processed any element. // if we peek on the stream too early it probably won't have processed any element.
printMonitorState.apply(monitor.state()); printMonitorState(monitor.state());
// #monitor
// wait a few millis and peek in the stream again to see what's the latest element processed // exclude from rendered snippet
Thread.sleep(500); Thread.sleep(500);
printMonitorState.apply(monitor.state()); // #monitor
// wait until the stream completed // ...
// sometime later, our code has progressed. We can peek in the stream
// again to see what's the latest element processed
printMonitorState(monitor.state());
// #monitor
// exclude from rendered snippet
run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); run.second().toCompletableFuture().get(1, TimeUnit.SECONDS);
printMonitorState.apply(monitor.state()); // #monitor
// #monitor
// Eventually, the stream completes and if we check the state it reports the streasm finished.
printMonitorState(monitor.state());
// #monitor
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());

View file

@ -48,13 +48,25 @@ class Monitor {
.run() .run()
val flowMonitor = monitoredStream._1 val flowMonitor = monitoredStream._1
// if we peek on the stream early enough it probably won't have processed any element. // if we peek on the stream early enough it probably won't have processed any element.
printMonitorState(flowMonitor) printMonitorState(flowMonitor)
// wait a few millis and peek in the stream again to see what's the latest element processed // #monitor
// exclude from rendered snippet
Thread.sleep(500) Thread.sleep(500)
// #monitor
// ...
// sometime later, our code has progressed. We can peek in the stream
// again to see what's the latest element processed
printMonitorState(flowMonitor) printMonitorState(flowMonitor)
// wait until the stream completed
// #monitor
// exclude from rendered snippet
Await.result(monitoredStream._2, 3.seconds) Await.result(monitoredStream._2, 3.seconds)
// #monitor
// #monitor
// Eventually, the stream completes and if we check the state it reports the streasm finished.
printMonitorState(flowMonitor) printMonitorState(flowMonitor)
// #monitor // #monitor