From 94d5b04d93554935b578fbf64f396d89decc9f60 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 18 Mar 2020 12:42:20 +0100 Subject: [PATCH] Doc example of Sink.ignore, #25468 (#28753) --- .../paradox/stream/operators/Sink/ignore.md | 15 +++++++- .../stream/operators/SinkDocExamples.java | 31 ++++++++++++++-- .../docs/stream/operators/sink/Ignore.scala | 37 +++++++++++++++++++ 3 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sink/Ignore.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/ignore.md b/akka-docs/src/main/paradox/stream/operators/Sink/ignore.md index b4ab4d2e48..06dcb28516 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/ignore.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/ignore.md @@ -15,7 +15,20 @@ Consume all elements but discards them. ## Description Consume all elements but discards them. Useful when a stream has to be consumed but there is no use to actually -do anything with the elements. +do anything with the elements in the `Sink`. Processing of the elements may occur in the `Source`/`Flow`. + +## Example + +This examples reads lines from a file, saves them to a database, and stores the database identifiers in +another file. The stream is run with `Sink.ignore` because all processing of the elements have been performed +by the preceding stream operators. + +Scala +: @@snip [Ignore.scala](/akka-docs/src/test/scala/docs/stream/operators/sink/Ignore.scala) { #ignore } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #ignore } + ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java index 8b8731e943..ac97a216c0 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java @@ -7,12 +7,14 @@ package jdocs.stream.operators; import akka.NotUsed; import akka.actor.ActorSystem; +import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; // #takeLast-operator-example import akka.japi.Pair; // #takeLast-operator-example import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -45,18 +47,19 @@ public class SinkDocExamples { static void takeLastExample() { // #takeLast-operator-example // pair of (Name, GPA) - List sortedStudents = + List> sortedStudents = Arrays.asList( new Pair<>("Benita", 2.1), new Pair<>("Adrian", 3.1), - new Pair<>("Alexis", 4), + new Pair<>("Alexis", 4.0), new Pair<>("Kendra", 4.2), new Pair<>("Jerrie", 4.3), new Pair<>("Alison", 4.7)); - Source studentSource = Source.from(sortedStudents); + Source, NotUsed> studentSource = Source.from(sortedStudents); - CompletionStage> topThree = studentSource.runWith(Sink.takeLast(3), system); + CompletionStage>> topThree = + studentSource.runWith(Sink.takeLast(3), system); topThree.thenAccept( result -> { @@ -92,4 +95,24 @@ public class SinkDocExamples { // Optional.empty // #lastOption-operator-example } + + static void ignoreExample() { + // #ignore + Source lines = readLinesFromFile(); + Source databaseIds = lines.mapAsync(1, line -> saveLineToDatabase(line)); + databaseIds.mapAsync(1, uuid -> writeIdToFile(uuid)).runWith(Sink.ignore(), system); + // #ignore + } + + private static Source readLinesFromFile() { + return Source.empty(); + } + + private static CompletionStage saveLineToDatabase(String line) { + return CompletableFuture.completedFuture(UUID.randomUUID()); + } + + private static CompletionStage writeIdToFile(UUID uuid) { + return CompletableFuture.completedFuture(uuid); + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sink/Ignore.scala b/akka-docs/src/test/scala/docs/stream/operators/sink/Ignore.scala new file mode 100644 index 0000000000..3c4cf5e928 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sink/Ignore.scala @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package docs.stream.operators.sink + +import java.util.UUID + +import scala.concurrent.Future + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +object Ignore { + implicit val system: ActorSystem = ??? + + def ignoreExample(): Unit = { + //#ignore + val lines: Source[String, NotUsed] = readLinesFromFile() + val databaseIds: Source[UUID, NotUsed] = + lines.mapAsync(1)(line => saveLineToDatabase(line)) + databaseIds.mapAsync(1)(uuid => writeIdToFile(uuid)).runWith(Sink.ignore) + //#ignore + } + + private def readLinesFromFile(): Source[String, NotUsed] = + Source.empty + + private def saveLineToDatabase(line: String): Future[UUID] = + Future.successful(UUID.randomUUID()) + + private def writeIdToFile(uuid: UUID): Future[UUID] = + Future.successful(uuid) + +}