Headers and formats
This commit is contained in:
parent
0e3b4605ad
commit
024710a6e0
2 changed files with 61 additions and 63 deletions
|
|
@ -1,3 +1,7 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
package jdocs.stream.operators.sourceorflow;
|
package jdocs.stream.operators.sourceorflow;
|
||||||
|
|
||||||
import akka.Done;
|
import akka.Done;
|
||||||
|
|
@ -16,66 +20,61 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
/**
|
/** */
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class Monitor {
|
public class Monitor {
|
||||||
|
|
||||||
// #monitor
|
// #monitor
|
||||||
private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
|
private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
|
||||||
if (FlowMonitorState.Finished$.class.isInstance(state)) {
|
if (FlowMonitorState.Finished$.class.isInstance(state)) {
|
||||||
System.out.println("Stream is initialized but hasn't processed any element");
|
System.out.println("Stream is initialized but hasn't processed any element");
|
||||||
} else if (FlowMonitorState.Received.class.isInstance(state)) {
|
} else if (FlowMonitorState.Received.class.isInstance(state)) {
|
||||||
FlowMonitorState.Received msg = (FlowMonitorState.Received) state;
|
FlowMonitorState.Received msg = (FlowMonitorState.Received) state;
|
||||||
System.out.println("Last message received: " + msg.msg());
|
System.out.println("Last message received: " + msg.msg());
|
||||||
} else if (FlowMonitorState.Failed.class.isInstance(state)) {
|
} else if (FlowMonitorState.Failed.class.isInstance(state)) {
|
||||||
Throwable cause = ((FlowMonitorState.Failed) state).cause();
|
Throwable cause = ((FlowMonitorState.Failed) state).cause();
|
||||||
System.out.println("Stream failed with cause: " + cause.getMessage());
|
System.out.println("Stream failed with cause: " + cause.getMessage());
|
||||||
} else {
|
} else {
|
||||||
System.out.println("Stream completed already");
|
System.out.println("Stream completed already");
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
// #monitor
|
||||||
|
|
||||||
|
public static void main(String[] args)
|
||||||
|
throws InterruptedException, TimeoutException, ExecutionException {
|
||||||
|
ActorSystem actorSystem = ActorSystem.create("25fps-stream");
|
||||||
|
|
||||||
|
// #monitor
|
||||||
|
Source<Integer, FlowMonitor<Integer>> monitoredSource =
|
||||||
|
Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
|
||||||
|
.throttle(5, Duration.ofSeconds(1))
|
||||||
|
.monitorMat(Keep.right());
|
||||||
|
|
||||||
|
Pair<FlowMonitor<Integer>, CompletionStage<Done>> run =
|
||||||
|
monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);
|
||||||
|
|
||||||
|
FlowMonitor<Integer> monitor = run.first();
|
||||||
|
|
||||||
|
// if we peek on the stream too early it probably won't have processed any element.
|
||||||
|
printMonitorState(monitor.state());
|
||||||
|
// #monitor
|
||||||
|
// exclude from rendered snippet
|
||||||
|
Thread.sleep(500);
|
||||||
// #monitor
|
// #monitor
|
||||||
|
|
||||||
|
// ...
|
||||||
|
// sometime later, our code has progressed. We can peek in the stream
|
||||||
|
// again to see what's the latest element processed
|
||||||
|
printMonitorState(monitor.state());
|
||||||
|
|
||||||
public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException {
|
// #monitor
|
||||||
ActorSystem actorSystem = ActorSystem.create("25fps-stream");
|
// exclude from rendered snippet
|
||||||
|
run.second().toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||||
|
// #monitor
|
||||||
|
// #monitor
|
||||||
|
// Eventually, the stream completes and if we check the state it reports the streasm finished.
|
||||||
|
printMonitorState(monitor.state());
|
||||||
|
// #monitor
|
||||||
|
|
||||||
// #monitor
|
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
|
||||||
Source<Integer, FlowMonitor<Integer>> monitoredSource =
|
}
|
||||||
Source
|
|
||||||
.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
|
|
||||||
.throttle(5, Duration.ofSeconds(1))
|
|
||||||
.monitorMat(Keep.right());
|
|
||||||
|
|
||||||
|
|
||||||
Pair<FlowMonitor<Integer>, CompletionStage<Done>> run = monitoredSource
|
|
||||||
.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);
|
|
||||||
|
|
||||||
FlowMonitor<Integer> monitor = run.first();
|
|
||||||
|
|
||||||
// if we peek on the stream too early it probably won't have processed any element.
|
|
||||||
printMonitorState(monitor.state());
|
|
||||||
// #monitor
|
|
||||||
// exclude from rendered snippet
|
|
||||||
Thread.sleep(500);
|
|
||||||
// #monitor
|
|
||||||
|
|
||||||
// ...
|
|
||||||
// sometime later, our code has progressed. We can peek in the stream
|
|
||||||
// again to see what's the latest element processed
|
|
||||||
printMonitorState(monitor.state());
|
|
||||||
|
|
||||||
// #monitor
|
|
||||||
// exclude from rendered snippet
|
|
||||||
run.second().toCompletableFuture().get(1, TimeUnit.SECONDS);
|
|
||||||
// #monitor
|
|
||||||
// #monitor
|
|
||||||
// Eventually, the stream completes and if we check the state it reports the streasm finished.
|
|
||||||
printMonitorState(monitor.state());
|
|
||||||
// #monitor
|
|
||||||
|
|
||||||
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,7 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
package docs.stream.operators.sourceorflow
|
package docs.stream.operators.sourceorflow
|
||||||
|
|
||||||
import akka.Done
|
import akka.Done
|
||||||
|
|
@ -15,7 +19,7 @@ import scala.concurrent.Future
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class Monitor {
|
class Monitor {
|
||||||
|
|
||||||
|
|
@ -38,14 +42,9 @@ class Monitor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val monitoredSource: Source[Int, FlowMonitor[Int]] = source
|
val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right)
|
||||||
.take(6)
|
|
||||||
.throttle(5, 1.second)
|
|
||||||
.monitorMat(Keep.right)
|
|
||||||
val monitoredStream: (FlowMonitor[Int], Future[Done]) =
|
val monitoredStream: (FlowMonitor[Int], Future[Done]) =
|
||||||
monitoredSource
|
monitoredSource.toMat(Sink.foreach(println))(Keep.both).run()
|
||||||
.toMat(Sink.foreach(println))(Keep.both)
|
|
||||||
.run()
|
|
||||||
|
|
||||||
val flowMonitor = monitoredStream._1
|
val flowMonitor = monitoredStream._1
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue