Doc example of Sink.ignore, #25468 (#28753)

This commit is contained in:
Patrik Nordwall 2020-03-18 12:42:20 +01:00 committed by GitHub
parent 816e0498f9
commit 94d5b04d93
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 5 deletions

View file

@ -7,12 +7,14 @@ package jdocs.stream.operators;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
// #takeLast-operator-example
import akka.japi.Pair;
// #takeLast-operator-example
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@ -45,18 +47,19 @@ public class SinkDocExamples {
static void takeLastExample() {
// #takeLast-operator-example
// pair of (Name, GPA)
List<Pair> sortedStudents =
List<Pair<String, Double>> sortedStudents =
Arrays.asList(
new Pair<>("Benita", 2.1),
new Pair<>("Adrian", 3.1),
new Pair<>("Alexis", 4),
new Pair<>("Alexis", 4.0),
new Pair<>("Kendra", 4.2),
new Pair<>("Jerrie", 4.3),
new Pair<>("Alison", 4.7));
Source<Pair, NotUsed> studentSource = Source.from(sortedStudents);
Source<Pair<String, Double>, NotUsed> studentSource = Source.from(sortedStudents);
CompletionStage<List<Pair>> topThree = studentSource.runWith(Sink.takeLast(3), system);
CompletionStage<List<Pair<String, Double>>> topThree =
studentSource.runWith(Sink.takeLast(3), system);
topThree.thenAccept(
result -> {
@ -92,4 +95,24 @@ public class SinkDocExamples {
// Optional.empty
// #lastOption-operator-example
}
static void ignoreExample() {
// #ignore
Source<String, NotUsed> lines = readLinesFromFile();
Source<UUID, NotUsed> databaseIds = lines.mapAsync(1, line -> saveLineToDatabase(line));
databaseIds.mapAsync(1, uuid -> writeIdToFile(uuid)).runWith(Sink.ignore(), system);
// #ignore
}
private static Source<String, NotUsed> readLinesFromFile() {
return Source.empty();
}
private static CompletionStage<UUID> saveLineToDatabase(String line) {
return CompletableFuture.completedFuture(UUID.randomUUID());
}
private static CompletionStage<UUID> writeIdToFile(UUID uuid) {
return CompletableFuture.completedFuture(uuid);
}
}

View file

@ -0,0 +1,37 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sink
import java.util.UUID
import scala.concurrent.Future
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object Ignore {
implicit val system: ActorSystem = ???
def ignoreExample(): Unit = {
//#ignore
val lines: Source[String, NotUsed] = readLinesFromFile()
val databaseIds: Source[UUID, NotUsed] =
lines.mapAsync(1)(line => saveLineToDatabase(line))
databaseIds.mapAsync(1)(uuid => writeIdToFile(uuid)).runWith(Sink.ignore)
//#ignore
}
private def readLinesFromFile(): Source[String, NotUsed] =
Source.empty
private def saveLineToDatabase(line: String): Future[UUID] =
Future.successful(UUID.randomUUID())
private def writeIdToFile(uuid: UUID): Future[UUID] =
Future.successful(uuid)
}