+str Add Sink.collect to javadsl. (#31371)

This commit is contained in:
kerr 2022-05-30 17:36:58 +08:00 committed by GitHub
parent e0439d61b6
commit 6ed320fedc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 12 deletions

View file

@ -0,0 +1,34 @@
# Sink.collect
Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).
@ref[Sink operators](../index.md#sink-operators)
## Signature
@apidoc[Sink.collect](Sink$) { java="#collect(java.util.stream.Collector)" }
## Description
A @javadoc[Sink](akka.stream.javadsl.Sink) which materializes into a @javadoc[CompletionStage](java.util.concurrent.CompletionStage)
which will be completed with a result of the Java @javadoc[Collector](java.util.stream.Collector) transformation and reduction operations.
## Example
Given a stream of numbers we can collect the numbers into a collection with the `seq` operator
Java
: @@snip [SinkTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java) { #collect-to-list }
## Reactive Streams semantics
@@@div { .callout }
**cancels** when the @javadoc[Collector](java.util.stream.Collector) throws an exception
**backpressures** when the @javadoc[Collector](java.util.stream.Collector)'s previous accumulation is still in progress
@@@

View file

@ -56,6 +56,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|--|--|--|
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
|Sink|<a name="collect"></a>@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).|
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
@ -399,6 +400,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [buffer](Source-or-Flow/buffer.md)
* [cancelled](Sink/cancelled.md)
* [collect](Source-or-Flow/collect.md)
* [collect](Sink/collect.md)
* [collection](Sink/collection.md)
* [collectType](Source-or-Flow/collectType.md)
* [combine](Source/combine.md)

View file

@ -77,6 +77,17 @@ public class SinkTest extends StreamTest {
CompletionStage<List<Integer>> result = Source.from(list).runWith(collectorSink, system);
assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS));
}
@Test
public void mustBeAbleToUseCollectorOnSink() throws Exception {
//#collect-to-list
final List<Integer> list = Arrays.asList(1, 2, 3);
CompletionStage<List<Integer>> result =
Source.from(list)
.runWith(Sink.collect(Collectors.toList()), system);
//#collect-to-list
assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS));
}
@Test
public void mustBeAbleToCombine() throws Exception {

View file

@ -4,11 +4,26 @@
package akka.stream.javadsl
import akka._
import akka.actor.ActorRef
import akka.actor.ClassicActorSystemProvider
import akka.actor.Status
import akka.dispatch.ExecutionContexts
import akka.japi.function
import akka.japi.function.Creator
import akka.stream._
import akka.stream.impl.LinearTraversalBuilder
import akka.stream.javadsl
import akka.stream.scaladsl
import akka.stream.scaladsl.SinkToCompletionStage
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.stream.Collector
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
@ -16,17 +31,6 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
import scala.util.Try
import org.reactivestreams.{ Publisher, Subscriber }
import akka._
import akka.actor.{ ActorRef, ClassicActorSystemProvider, Status }
import akka.dispatch.ExecutionContexts
import akka.japi.function
import akka.japi.function.Creator
import akka.stream.{ javadsl, scaladsl, _ }
import akka.stream.impl.LinearTraversalBuilder
import akka.stream.scaladsl.SinkToCompletionStage
/** Java API */
object Sink {
@ -52,6 +56,16 @@ object Sink {
f: function.Function2[U, In, CompletionStage[U]]): javadsl.Sink[In, CompletionStage[U]] =
new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).toScala).toCompletionStage())
/**
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
* transformation and reduction operations. This allows usage of Java streams transformations for reactive streams.
* The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable
* result container, optionally transformed into a final representation after all input elements have been processed.
* The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially.
*/
def collect[U, In](collector: Collector[In, _ <: Any, U]): Sink[In, CompletionStage[U]] =
StreamConverters.javaCollector(() => collector)
/**
* A `Sink` that will invoke the given function for every received element, giving it its previous
* output (from the second element) and the element as input.