Adds sample code for the monitor operator

This commit is contained in:
Ignasi Marimon-Clos 2020-07-07 13:09:54 +02:00
parent 2f2ee9e67c
commit 95a000a51b
3 changed files with 176 additions and 1 deletions

View file

@ -14,7 +14,35 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple
Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise
passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an
event, and may therefore affect performance.
event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek
and get information about the stream.
## Example
The example below uses the `monitorMat` variant of `monitor`. The only difference betwee the two operators is
that `monitorMat` has a `combine` argument so we can decide which materialization value to keep. In the sample
below be `Keep.right` so only the `FlowMonitor[Int]` is returned.
Scala
: @@snip [Monitor.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala) { #monitor }
Java
: @@snip [Monitor.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java) { #monitor }
When run, the sample code will produce something similar to:
```
Stream is initialized but hasn't processed any element
0
1
2
Last element processed: 2
3
4
5
Stream completed already
```
## Reactive Streams semantics

View file

@ -0,0 +1,84 @@
package jdocs.stream.operators.sourceorflow;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.japi.pf.Match;
import akka.stream.FlowMonitor;
import akka.stream.FlowMonitorState;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.PartialFunction;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*
*/
public class Monitor {
private static final PartialFunction<Object, NotUsed> printMonitorState = Match.match(
FlowMonitorState.Initialized$.class,
__ -> {
System.out.println("Stream is initialized but hasn't processed any element");
return NotUsed.getInstance();
}
).match(
FlowMonitorState.Received.class,
(msg) -> {
System.out.println("Last message received: " + msg.msg());
return NotUsed.getInstance();
}
).match(
FlowMonitorState.Failed.class,
(failed) -> {
System.out.println("Stream failed with cause: " + failed.cause().getMessage());
return NotUsed.getInstance();
}
).match(
FlowMonitorState.Finished$.class,
__ -> {
System.out.println("Stream completed already");
return NotUsed.getInstance();
}
).build();
public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException {
ActorSystem actorSystem = ActorSystem.create("25fps-stream");
Source<Integer, FlowMonitor<Integer>> monitoredSource =
Source
.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
.throttle(5, Duration.ofSeconds(1))
.monitorMat(Keep.right());
Pair<FlowMonitor<Integer>, CompletionStage<Done>> run = monitoredSource
.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);
FlowMonitor<Integer> monitor = run.first();
// if we peek on the stream too early it probably won't have processed any element.
printMonitorState.apply(monitor.state());
// wait a few millis and peek in the stream again to see what's the latest element processed
Thread.sleep(500);
printMonitorState.apply(monitor.state());
// wait until the stream completed
run.second().toCompletableFuture().get(1, TimeUnit.SECONDS);
printMonitorState.apply(monitor.state());
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
}
}

View file

@ -0,0 +1,63 @@
package docs.stream.operators.sourceorflow
import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowMonitor
import akka.stream.FlowMonitorState
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import scala.concurrent.duration._
/**
*
*/
class Monitor {
implicit val system = ActorSystem("monitor-sample-sys2")
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
// #monitor
val source: Source[Int, NotUsed] =
Source.fromIterator(() => Iterator.from(0))
def printMonitorState(flowMonitor: FlowMonitor[Int]) = {
flowMonitor.state match {
case FlowMonitorState.Initialized =>
println("Stream is initialized but hasn't processed any element")
case FlowMonitorState.Received(msg) =>
println(s"Last element processed: $msg")
case FlowMonitorState.Failed(cause) =>
println(s"Stream failed with cause $cause")
case FlowMonitorState.Finished => println(s"Stream completed already")
}
}
val monitoredSource: Source[Int, FlowMonitor[Int]] = source
.take(6)
.throttle(5, 1.second)
.monitorMat(Keep.right)
val monitoredStream: (FlowMonitor[Int], Future[Done]) =
monitoredSource
.toMat(Sink.foreach(println))(Keep.both)
.run()
val flowMonitor = monitoredStream._1
// if we peek on the stream early enough it probably won't have processed any element.
printMonitorState(flowMonitor)
// wait a few millis and peek in the stream again to see what's the latest element processed
Thread.sleep(500)
printMonitorState(flowMonitor)
// wait until the stream completed
Await.result(monitoredStream._2, 3.seconds)
printMonitorState(flowMonitor)
// #monitor
monitoredStream._2.onComplete(_ => system.terminate())
}