From 6ed320fedcdd38aece3e14e73865b4274f9c8dbd Mon Sep 17 00:00:00 2001 From: kerr Date: Mon, 30 May 2022 17:36:58 +0800 Subject: [PATCH] +str Add Sink.collect to javadsl. (#31371) --- .../paradox/stream/operators/Sink/collect.md | 34 +++++++++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../java/akka/stream/javadsl/SinkTest.java | 11 ++++++ .../main/scala/akka/stream/javadsl/Sink.scala | 38 +++++++++++++------ 4 files changed, 73 insertions(+), 12 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/collect.md diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/collect.md b/akka-docs/src/main/paradox/stream/operators/Sink/collect.md new file mode 100644 index 0000000000..f1493a0433 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/collect.md @@ -0,0 +1,34 @@ +# Sink.collect + +Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector). + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.collect](Sink$) { java="#collect(java.util.stream.Collector)" } + +## Description + +A @javadoc[Sink](akka.stream.javadsl.Sink) which materializes into a @javadoc[CompletionStage](java.util.concurrent.CompletionStage) +which will be completed with a result of the Java @javadoc[Collector](java.util.stream.Collector) transformation and reduction operations. + +## Example + +Given a stream of numbers we can collect the numbers into a collection with the `seq` operator + +Java +: @@snip [SinkTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java) { #collect-to-list } + + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** when the @javadoc[Collector](java.util.stream.Collector) throws an exception + +**backpressures** when the @javadoc[Collector](java.util.stream.Collector)'s previous accumulation is still in progress + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index a3b777f49e..e2f108b2e1 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -56,6 +56,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav |--|--|--| |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| +|Sink|@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).| |Sink|@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].| |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| |Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. | @@ -399,6 +400,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [buffer](Source-or-Flow/buffer.md) * [cancelled](Sink/cancelled.md) * [collect](Source-or-Flow/collect.md) +* [collect](Sink/collect.md) * [collection](Sink/collection.md) * [collectType](Source-or-Flow/collectType.md) * [combine](Source/combine.md) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 4558d90f12..727feaf6ea 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -77,6 +77,17 @@ public class SinkTest extends StreamTest { CompletionStage> result = Source.from(list).runWith(collectorSink, system); assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS)); } + + @Test + public void mustBeAbleToUseCollectorOnSink() throws Exception { + //#collect-to-list + final List list = Arrays.asList(1, 2, 3); + CompletionStage> result = + Source.from(list) + .runWith(Sink.collect(Collectors.toList()), system); + //#collect-to-list + assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS)); + } @Test public void mustBeAbleToCombine() throws Exception { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 3eeef8b63d..4921d8ab96 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -4,11 +4,26 @@ package akka.stream.javadsl +import akka._ +import akka.actor.ActorRef +import akka.actor.ClassicActorSystemProvider +import akka.actor.Status +import akka.dispatch.ExecutionContexts +import akka.japi.function +import akka.japi.function.Creator +import akka.stream._ +import akka.stream.impl.LinearTraversalBuilder +import akka.stream.javadsl +import akka.stream.scaladsl +import akka.stream.scaladsl.SinkToCompletionStage +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + import java.util.Optional import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage import java.util.function.BiFunction - +import java.util.stream.Collector import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.compat.java8.FutureConverters._ @@ -16,17 +31,6 @@ import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionContext import scala.util.Try -import org.reactivestreams.{ Publisher, Subscriber } - -import akka._ -import akka.actor.{ ActorRef, ClassicActorSystemProvider, Status } -import akka.dispatch.ExecutionContexts -import akka.japi.function -import akka.japi.function.Creator -import akka.stream.{ javadsl, scaladsl, _ } -import akka.stream.impl.LinearTraversalBuilder -import akka.stream.scaladsl.SinkToCompletionStage - /** Java API */ object Sink { @@ -52,6 +56,16 @@ object Sink { f: function.Function2[U, In, CompletionStage[U]]): javadsl.Sink[In, CompletionStage[U]] = new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).toScala).toCompletionStage()) + /** + * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector`` + * transformation and reduction operations. This allows usage of Java streams transformations for reactive streams. + * The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable + * result container, optionally transformed into a final representation after all input elements have been processed. + * The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially. + */ + def collect[U, In](collector: Collector[In, _ <: Any, U]): Sink[In, CompletionStage[U]] = + StreamConverters.javaCollector(() => collector) + /** * A `Sink` that will invoke the given function for every received element, giving it its previous * output (from the second element) and the element as input.