From 024710a6e0861585ae8591358c9c503f65f473d1 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 7 Jul 2020 14:58:56 +0200 Subject: [PATCH] Headers and formats --- .../operators/sourceorflow/Monitor.java | 109 +++++++++--------- .../operators/sourceorflow/Monitor.scala | 15 ++- 2 files changed, 61 insertions(+), 63 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java index a831d0b92c..4a08e1023f 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Monitor.java @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + package jdocs.stream.operators.sourceorflow; import akka.Done; @@ -16,66 +20,61 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -/** - * - */ +/** */ public class Monitor { - // #monitor - private static void printMonitorState(FlowMonitorState.StreamState state) { - if (FlowMonitorState.Finished$.class.isInstance(state)) { - System.out.println("Stream is initialized but hasn't processed any element"); - } else if (FlowMonitorState.Received.class.isInstance(state)) { - FlowMonitorState.Received msg = (FlowMonitorState.Received) state; - System.out.println("Last message received: " + msg.msg()); - } else if (FlowMonitorState.Failed.class.isInstance(state)) { - Throwable cause = ((FlowMonitorState.Failed) state).cause(); - System.out.println("Stream failed with cause: " + cause.getMessage()); - } else { - System.out.println("Stream completed already"); - } + // #monitor + private static void printMonitorState(FlowMonitorState.StreamState state) { + if (FlowMonitorState.Finished$.class.isInstance(state)) { + System.out.println("Stream is initialized but hasn't processed any element"); + } else if (FlowMonitorState.Received.class.isInstance(state)) { + FlowMonitorState.Received msg = (FlowMonitorState.Received) state; + System.out.println("Last message received: " + msg.msg()); + } else if (FlowMonitorState.Failed.class.isInstance(state)) { + Throwable cause = ((FlowMonitorState.Failed) state).cause(); + System.out.println("Stream failed with cause: " + cause.getMessage()); + } else { + System.out.println("Stream completed already"); } + } + // #monitor + + public static void main(String[] args) + throws InterruptedException, TimeoutException, ExecutionException { + ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + + // #monitor + Source> monitoredSource = + Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) + .throttle(5, Duration.ofSeconds(1)) + .monitorMat(Keep.right()); + + Pair, CompletionStage> run = + monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem); + + FlowMonitor monitor = run.first(); + + // if we peek on the stream too early it probably won't have processed any element. + printMonitorState(monitor.state()); + // #monitor + // exclude from rendered snippet + Thread.sleep(500); // #monitor + // ... + // sometime later, our code has progressed. We can peek in the stream + // again to see what's the latest element processed + printMonitorState(monitor.state()); - public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException { - ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + // #monitor + // exclude from rendered snippet + run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); + // #monitor + // #monitor + // Eventually, the stream completes and if we check the state it reports the streasm finished. + printMonitorState(monitor.state()); + // #monitor - // #monitor - Source> monitoredSource = - Source - .fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator()) - .throttle(5, Duration.ofSeconds(1)) - .monitorMat(Keep.right()); - - - Pair, CompletionStage> run = monitoredSource - .toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem); - - FlowMonitor monitor = run.first(); - - // if we peek on the stream too early it probably won't have processed any element. - printMonitorState(monitor.state()); - // #monitor - // exclude from rendered snippet - Thread.sleep(500); - // #monitor - - // ... - // sometime later, our code has progressed. We can peek in the stream - // again to see what's the latest element processed - printMonitorState(monitor.state()); - - // #monitor - // exclude from rendered snippet - run.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - // #monitor - // #monitor - // Eventually, the stream completes and if we check the state it reports the streasm finished. - printMonitorState(monitor.state()); - // #monitor - - run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); - - } + run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate()); + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala index 48047e7718..e304ef581e 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Monitor.scala @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + package docs.stream.operators.sourceorflow import akka.Done @@ -15,7 +19,7 @@ import scala.concurrent.Future import scala.concurrent.duration._ /** - * + * */ class Monitor { @@ -38,14 +42,9 @@ class Monitor { } } - val monitoredSource: Source[Int, FlowMonitor[Int]] = source - .take(6) - .throttle(5, 1.second) - .monitorMat(Keep.right) + val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right) val monitoredStream: (FlowMonitor[Int], Future[Done]) = - monitoredSource - .toMat(Sink.foreach(println))(Keep.both) - .run() + monitoredSource.toMat(Sink.foreach(println))(Keep.both).run() val flowMonitor = monitoredStream._1