Docs: sink seq operator examples (#28616)

This commit is contained in:
Razvan Vacaru 2020-02-17 10:26:53 +01:00 committed by GitHub
parent 56c13dcde5
commit bb895619b9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 5 deletions

View file

@ -18,6 +18,16 @@ Collect values emitted from the stream into a collection, the collection is avai
which completes when the stream completes. Note that the collection is bounded to @scala[`Int.MaxValue`] @java[`Integer.MAX_VALUE`],
if more element are emitted the sink will cancel the stream
## Example
Given a stream of numbers we can collect the numbers into a collection with the `seq` operator
Scala
: @@snip [SinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala) { #seq-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #seq-operator-example }
## Reactive Streams semantics
@@@div { .callout }

View file

@ -21,7 +21,7 @@ public class SinkDocExamples {
private static final ActorSystem system = ActorSystem.create("SourceFromExample");
static void reduceExample() throws InterruptedException, ExecutionException, TimeoutException {
static void reduceExample() {
// #reduce-operator-example
Source<Integer, NotUsed> ints = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
@ -31,7 +31,18 @@ public class SinkDocExamples {
// #reduce-operator-example
}
static void takeLastExample() throws InterruptedException, ExecutionException, TimeoutException {
static void seqExample() {
// #seq-operator-example
Source<Integer, NotUsed> ints = Source.from(Arrays.asList(1, 2, 3));
CompletionStage<List<Integer>> result = ints.runWith(Sink.seq(), system);
result.thenAccept(list -> list.forEach(System.out::println));
// 1
// 2
// 3
// #seq-operator-example
}
static void takeLastExample() {
// #takeLast-operator-example
// pair of (Name, GPA)
List<Pair> sortedStudents =
@ -64,7 +75,7 @@ public class SinkDocExamples {
// #takeLast-operator-example
}
static void lastExample() throws InterruptedException, ExecutionException, TimeoutException {
static void lastExample() {
// #last-operator-example
Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
CompletionStage<Integer> result = source.runWith(Sink.last(), system);
@ -73,8 +84,7 @@ public class SinkDocExamples {
// #last-operator-example
}
static void lastOptionExample()
throws InterruptedException, ExecutionException, TimeoutException {
static void lastOptionExample() {
// #lastOption-operator-example
Source<Integer, NotUsed> source = Source.empty();
CompletionStage<Optional<Integer>> result = source.runWith(Sink.lastOption(), system);

View file

@ -228,6 +228,21 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
}
"The seq sink" must {
"collect the streamed elements into a sequence" in {
// #seq-operator-example
val source = Source(1 to 3)
val result = source.runWith(Sink.seq[Int])
val seq = result.futureValue
seq.foreach(println)
// 1
// 2
// 3
// #seq-operator-example
assert(seq == Vector(1, 2, 3))
}
}
"Sink pre-materialization" must {
"materialize the sink and wrap its exposed publisher in a Source" in {
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)