Introducing watching for completion earlier

So we can terminate the ActorSystem
This commit is contained in:
Arnout Engelen 2017-04-06 11:17:10 +02:00
parent 952fa34c70
commit 18eee18b8c
5 changed files with 71 additions and 36 deletions

View file

@ -0,0 +1,7 @@
//#main-app
public class Main {
public static void main(String[] argv) {
// Code here
}
}
//#main-app

View file

@ -32,19 +32,13 @@ import org.junit.*;
*/ */
public class QuickStartDocTest extends AbstractJavaTest { public class QuickStartDocTest extends AbstractJavaTest {
static
//#create-materializer
final ActorSystem system = ActorSystem.create("QuickStart");
final Materializer materializer = ActorMaterializer.create(system);
//#create-materializer
@AfterClass
public static void teardown() {
system.terminate();
}
@Test @Test
public void demonstrateSource() throws InterruptedException, ExecutionException { public void demonstrateSource() throws InterruptedException, ExecutionException {
//#create-materializer
final ActorSystem system = ActorSystem.create("QuickStart");
final Materializer materializer = ActorMaterializer.create(system);
//#create-materializer
//#create-source //#create-source
final Source<Integer, NotUsed> source = Source.range(1, 100); final Source<Integer, NotUsed> source = Source.range(1, 100);
//#create-source //#create-source
@ -69,16 +63,22 @@ public class QuickStartDocTest extends AbstractJavaTest {
//#use-transformed-sink //#use-transformed-sink
//#add-streams //#add-streams
final CompletionStage<Done> done = factorials
factorials .zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num))
.zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num)) .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
.throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping()) //#add-streams
//#add-streams .take(2)
.take(2) //#add-streams
//#add-streams .runForeach(s -> System.out.println(s), materializer);
.runForeach(s -> System.out.println(s), materializer);
//#add-streams //#add-streams
//#run-source-and-terminate
final CompletionStage<Done> done =
source.runForeach(i -> System.out.println(i), materializer);
done.thenRun(() -> system.terminate());
//#run-source-and-terminate
done.toCompletableFuture().get(); done.toCompletableFuture().get();
} }

View file

@ -15,6 +15,10 @@ If you want to execute the code samples while you read through the quick start g
.. includecode:: ../code/jdocs/stream/QuickStartDocTest.java#other-imports .. includecode:: ../code/jdocs/stream/QuickStartDocTest.java#other-imports
And a class to hold your code, for example:
.. includecode:: ../code/jdocs/stream/Main.java#main-app
Now we will start with a rather simple source, emitting the integers 1 to 100: Now we will start with a rather simple source, emitting the integers 1 to 100:
.. includecode:: ../code/jdocs/stream/QuickStartDocTest.java#create-source .. includecode:: ../code/jdocs/stream/QuickStartDocTest.java#create-source
@ -38,6 +42,12 @@ setup to an Actor that runs it. This activation is signaled by having “run”
part of the method name; there are other methods that run Akka Streams, and part of the method name; there are other methods that run Akka Streams, and
they all follow this pattern. they all follow this pattern.
When running this program you might notice it does not
terminate, because the :class:`ActorSystem` is never terminated. Luckily
``runForeach`` returns a :class:`CompletionStage<Done>` which resolves when the stream finishes:
.. includecode:: ../code/jdocs/stream/QuickStartDocTest.java#run-source-and-terminate
You may wonder where the Actor gets created that runs the stream, and you are You may wonder where the Actor gets created that runs the stream, and you are
probably also asking yourself what this ``materializer`` means. In order to get probably also asking yourself what this ``materializer`` means. In order to get
this value we first need to create an Actor system: this value we first need to create an Actor system:

View file

@ -20,21 +20,23 @@ import java.nio.file.Paths
import org.scalatest._ import org.scalatest._
import org.scalatest.concurrent._ import org.scalatest.concurrent._
//#main-app
object Main extends App {
// Code here
}
//#main-app
class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures { class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures {
implicit val patience = PatienceConfig(5.seconds) implicit val patience = PatienceConfig(5.seconds)
//#create-materializer
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
//#create-materializer
override def afterAll(): Unit = {
system.terminate()
}
def println(any: Any) = () // silence printing stuff def println(any: Any) = () // silence printing stuff
"demonstrate Source" in { "demonstrate Source" in {
//#create-materializer
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
//#create-materializer
//#create-source //#create-source
val source: Source[Int, NotUsed] = Source(1 to 100) val source: Source[Int, NotUsed] = Source(1 to 100)
//#create-source //#create-source
@ -57,16 +59,22 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture
//#use-transformed-sink //#use-transformed-sink
//#add-streams //#add-streams
val done: Future[Done] = factorials
factorials .zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num")
.zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num") .throttle(1, 1.second, 1, ThrottleMode.shaping)
.throttle(1, 1.second, 1, ThrottleMode.shaping) //#add-streams
//#add-streams .take(3)
.take(3) //#add-streams
//#add-streams .runForeach(println)
.runForeach(println)
//#add-streams //#add-streams
//#run-source-and-terminate
val done: Future[Done] = source.runForeach(i => println(i))(materializer)
implicit val ec = system.dispatcher
done.onComplete(_ => system.terminate())
//#run-source-and-terminate
done.futureValue done.futureValue
} }

View file

@ -15,6 +15,10 @@ If you want to execute the code samples while you read through the quick start g
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#other-imports .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#other-imports
And an object to hold your code, for example:
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#main-app
Now we will start with a rather simple source, emitting the integers 1 to 100: Now we will start with a rather simple source, emitting the integers 1 to 100:
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-source .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-source
@ -38,6 +42,12 @@ setup to an Actor that runs it. This activation is signaled by having “run”
part of the method name; there are other methods that run Akka Streams, and part of the method name; there are other methods that run Akka Streams, and
they all follow this pattern. they all follow this pattern.
When running this source in a :class:`scala.App` you might notice it does not
terminate, because the :class:`ActorSystem` is never terminated. Luckily
``runForeach`` returns a :class:`Future[Done]` which resolves when the stream finishes:
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#run-source-and-terminate
You may wonder where the Actor gets created that runs the stream, and you are You may wonder where the Actor gets created that runs the stream, and you are
probably also asking yourself what this ``materializer`` means. In order to get probably also asking yourself what this ``materializer`` means. In order to get
this value we first need to create an Actor system: this value we first need to create an Actor system: