PR comments
This commit is contained in:
parent
f187abddd1
commit
6c12ae117a
3 changed files with 13 additions and 27 deletions
|
|
@ -6,14 +6,14 @@ Materializes to a `FlowMonitor` that monitors messages flowing through or comple
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" }
|
@apidoc[Source.monitor](Source) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" }
|
||||||
@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor(akka.japi.function.Function2)" java="#monitor()" }
|
@apidoc[Flow.monitor](Flow) { scala="#monitor[Mat2]()(combine:(Mat,akka.stream.FlowMonitor[Out])=>Mat2):FlowOpsMat.this.ReprMat[Out,Mat2]" java="#monitor()" }
|
||||||
|
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the operators. The operators otherwise
|
Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stream. Elements
|
||||||
passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an
|
pass through unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an
|
||||||
event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek
|
event, and may therefore affect performance. The provided `FlowMonitor` contains a `state` field you can use to peek
|
||||||
and get information about the stream.
|
and get information about the stream.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,12 +26,12 @@ public class Monitor {
|
||||||
|
|
||||||
// #monitor
|
// #monitor
|
||||||
private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
|
private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
|
||||||
if (FlowMonitorState.Finished$.class.isInstance(state)) {
|
if (state == FlowMonitorState.finished()) {
|
||||||
System.out.println("Stream is initialized but hasn't processed any element");
|
System.out.println("Stream is initialized but hasn't processed any element");
|
||||||
} else if (FlowMonitorState.Received.class.isInstance(state)) {
|
} else if (state instanceof FlowMonitorState.Received) {
|
||||||
FlowMonitorState.Received msg = (FlowMonitorState.Received) state;
|
FlowMonitorState.Received msg = (FlowMonitorState.Received) state;
|
||||||
System.out.println("Last message received: " + msg.msg());
|
System.out.println("Last message received: " + msg.msg());
|
||||||
} else if (FlowMonitorState.Failed.class.isInstance(state)) {
|
} else if (state instanceof FlowMonitorState.Failed) {
|
||||||
Throwable cause = ((FlowMonitorState.Failed) state).cause();
|
Throwable cause = ((FlowMonitorState.Failed) state).cause();
|
||||||
System.out.println("Stream failed with cause: " + cause.getMessage());
|
System.out.println("Stream failed with cause: " + cause.getMessage());
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -60,13 +60,7 @@ public class Monitor {
|
||||||
|
|
||||||
// Periodically check the monitor
|
// Periodically check the monitor
|
||||||
Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
|
Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
|
||||||
.map(
|
.runForeach(__ -> printMonitorState(monitor.state()), actorSystem);
|
||||||
__ -> {
|
|
||||||
printMonitorState(monitor.state());
|
|
||||||
return NotUsed.getInstance();
|
|
||||||
})
|
|
||||||
.to(Sink.ignore())
|
|
||||||
.run(actorSystem);
|
|
||||||
// #monitor
|
// #monitor
|
||||||
|
|
||||||
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
|
run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
|
|
||||||
package docs.stream.operators.sourceorflow
|
package docs.stream.operators.sourceorflow
|
||||||
|
|
||||||
import akka.Done
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.stream.FlowMonitor
|
import akka.stream.FlowMonitor
|
||||||
|
|
@ -14,12 +13,8 @@ import akka.stream.scaladsl.Sink
|
||||||
import akka.stream.scaladsl.Source
|
import akka.stream.scaladsl.Source
|
||||||
|
|
||||||
import scala.concurrent.ExecutionContextExecutor
|
import scala.concurrent.ExecutionContextExecutor
|
||||||
import scala.concurrent.Future
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
class Monitor {
|
class Monitor {
|
||||||
|
|
||||||
implicit val system = ActorSystem("monitor-sample-sys2")
|
implicit val system = ActorSystem("monitor-sample-sys2")
|
||||||
|
|
@ -29,7 +24,7 @@ class Monitor {
|
||||||
val source: Source[Int, NotUsed] =
|
val source: Source[Int, NotUsed] =
|
||||||
Source.fromIterator(() => Iterator.from(0))
|
Source.fromIterator(() => Iterator.from(0))
|
||||||
|
|
||||||
def printMonitorState(flowMonitor: FlowMonitor[Int]) = {
|
def printMonitorState(flowMonitor: FlowMonitor[Int]) =
|
||||||
flowMonitor.state match {
|
flowMonitor.state match {
|
||||||
case FlowMonitorState.Initialized =>
|
case FlowMonitorState.Initialized =>
|
||||||
println("Stream is initialized but hasn't processed any element")
|
println("Stream is initialized but hasn't processed any element")
|
||||||
|
|
@ -39,21 +34,18 @@ class Monitor {
|
||||||
println(s"Stream failed with cause $cause")
|
println(s"Stream failed with cause $cause")
|
||||||
case FlowMonitorState.Finished => println(s"Stream completed already")
|
case FlowMonitorState.Finished => println(s"Stream completed already")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right)
|
val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right)
|
||||||
val monitoredStream: (FlowMonitor[Int], Future[Done]) =
|
val (flowMonitor, futureDone) =
|
||||||
monitoredSource.toMat(Sink.foreach(println))(Keep.both).run()
|
monitoredSource.toMat(Sink.foreach(println))(Keep.both).run()
|
||||||
|
|
||||||
val flowMonitor = monitoredStream._1
|
|
||||||
|
|
||||||
// If we peek in the monitor too early, it's possible it was not initialized yet.
|
// If we peek in the monitor too early, it's possible it was not initialized yet.
|
||||||
printMonitorState(flowMonitor)
|
printMonitorState(flowMonitor)
|
||||||
|
|
||||||
// Periodically check the monitor
|
// Periodically check the monitor
|
||||||
Source.tick(200.millis, 400.millis, "").map(_ => printMonitorState(flowMonitor)).to(Sink.ignore).run
|
Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor))
|
||||||
|
|
||||||
// #monitor
|
// #monitor
|
||||||
monitoredStream._2.onComplete(_ => system.terminate())
|
futureDone.onComplete(_ => system.terminate())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue