diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/seq.md b/akka-docs/src/main/paradox/stream/operators/Sink/seq.md index ab5259b913..662e764504 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/seq.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/seq.md @@ -18,6 +18,16 @@ Collect values emitted from the stream into a collection, the collection is avai which completes when the stream completes. Note that the collection is bounded to @scala[`Int.MaxValue`] @java[`Integer.MAX_VALUE`], if more element are emitted the sink will cancel the stream +## Example + +Given a stream of numbers we can collect the numbers into a collection with the `seq` operator + +Scala +: @@snip [SinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala) { #seq-operator-example } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #seq-operator-example } + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java index f2a999e14a..8b8731e943 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java @@ -21,7 +21,7 @@ public class SinkDocExamples { private static final ActorSystem system = ActorSystem.create("SourceFromExample"); - static void reduceExample() throws InterruptedException, ExecutionException, TimeoutException { + static void reduceExample() { // #reduce-operator-example Source ints = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); @@ -31,7 +31,18 @@ public class SinkDocExamples { // #reduce-operator-example } - static void takeLastExample() throws InterruptedException, ExecutionException, TimeoutException { + static void seqExample() { + // #seq-operator-example + Source ints = Source.from(Arrays.asList(1, 2, 3)); + CompletionStage> result = ints.runWith(Sink.seq(), system); + result.thenAccept(list -> list.forEach(System.out::println)); + // 1 + // 2 + // 3 + // #seq-operator-example + } + + static void takeLastExample() { // #takeLast-operator-example // pair of (Name, GPA) List sortedStudents = @@ -64,7 +75,7 @@ public class SinkDocExamples { // #takeLast-operator-example } - static void lastExample() throws InterruptedException, ExecutionException, TimeoutException { + static void lastExample() { // #last-operator-example Source source = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); CompletionStage result = source.runWith(Sink.last(), system); @@ -73,8 +84,7 @@ public class SinkDocExamples { // #last-operator-example } - static void lastOptionExample() - throws InterruptedException, ExecutionException, TimeoutException { + static void lastOptionExample() { // #lastOption-operator-example Source source = Source.empty(); CompletionStage> result = source.runWith(Sink.lastOption(), system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index ea98b59fff..307bc8de85 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -228,6 +228,21 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "The seq sink" must { + "collect the streamed elements into a sequence" in { + // #seq-operator-example + val source = Source(1 to 3) + val result = source.runWith(Sink.seq[Int]) + val seq = result.futureValue + seq.foreach(println) + // 1 + // 2 + // 3 + // #seq-operator-example + assert(seq == Vector(1, 2, 3)) + } + } + "Sink pre-materialization" must { "materialize the sink and wrap its exposed publisher in a Source" in { val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)