diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/collect.md b/akka-docs/src/main/paradox/stream/operators/Sink/collect.md
new file mode 100644
index 0000000000..f1493a0433
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Sink/collect.md
@@ -0,0 +1,34 @@
+# Sink.collect
+
+Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.collect](Sink$) { java="#collect(java.util.stream.Collector)" }
+
+## Description
+
+A @javadoc[Sink](akka.stream.javadsl.Sink) which materializes into a @javadoc[CompletionStage](java.util.concurrent.CompletionStage)
+which will be completed with a result of the Java @javadoc[Collector](java.util.stream.Collector) transformation and reduction operations.
+
+## Example
+
+Given a stream of numbers we can collect the numbers into a collection with the `seq` operator
+
+Java
+: @@snip [SinkTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java) { #collect-to-list }
+
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** when the @javadoc[Collector](java.util.stream.Collector) throws an exception
+
+**backpressures** when the @javadoc[Collector](java.util.stream.Collector)'s previous accumulation is still in progress
+
+@@@
+
+
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index a3b777f49e..e2f108b2e1 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -56,6 +56,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|--|--|--|
|Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
+|Sink|@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).|
|Sink|@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
@@ -399,6 +400,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [buffer](Source-or-Flow/buffer.md)
* [cancelled](Sink/cancelled.md)
* [collect](Source-or-Flow/collect.md)
+* [collect](Sink/collect.md)
* [collection](Sink/collection.md)
* [collectType](Source-or-Flow/collectType.md)
* [combine](Source/combine.md)
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java
index 4558d90f12..727feaf6ea 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java
@@ -77,6 +77,17 @@ public class SinkTest extends StreamTest {
CompletionStage> result = Source.from(list).runWith(collectorSink, system);
assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void mustBeAbleToUseCollectorOnSink() throws Exception {
+ //#collect-to-list
+ final List list = Arrays.asList(1, 2, 3);
+ CompletionStage> result =
+ Source.from(list)
+ .runWith(Sink.collect(Collectors.toList()), system);
+ //#collect-to-list
+ assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS));
+ }
@Test
public void mustBeAbleToCombine() throws Exception {
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
index 3eeef8b63d..4921d8ab96 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
@@ -4,11 +4,26 @@
package akka.stream.javadsl
+import akka._
+import akka.actor.ActorRef
+import akka.actor.ClassicActorSystemProvider
+import akka.actor.Status
+import akka.dispatch.ExecutionContexts
+import akka.japi.function
+import akka.japi.function.Creator
+import akka.stream._
+import akka.stream.impl.LinearTraversalBuilder
+import akka.stream.javadsl
+import akka.stream.scaladsl
+import akka.stream.scaladsl.SinkToCompletionStage
+import org.reactivestreams.Publisher
+import org.reactivestreams.Subscriber
+
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
-
+import java.util.stream.Collector
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
@@ -16,17 +31,6 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
import scala.util.Try
-import org.reactivestreams.{ Publisher, Subscriber }
-
-import akka._
-import akka.actor.{ ActorRef, ClassicActorSystemProvider, Status }
-import akka.dispatch.ExecutionContexts
-import akka.japi.function
-import akka.japi.function.Creator
-import akka.stream.{ javadsl, scaladsl, _ }
-import akka.stream.impl.LinearTraversalBuilder
-import akka.stream.scaladsl.SinkToCompletionStage
-
/** Java API */
object Sink {
@@ -52,6 +56,16 @@ object Sink {
f: function.Function2[U, In, CompletionStage[U]]): javadsl.Sink[In, CompletionStage[U]] =
new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).toScala).toCompletionStage())
+ /**
+ * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
+ * transformation and reduction operations. This allows usage of Java streams transformations for reactive streams.
+ * The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable
+ * result container, optionally transformed into a final representation after all input elements have been processed.
+ * The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially.
+ */
+ def collect[U, In](collector: Collector[In, _ <: Any, U]): Sink[In, CompletionStage[U]] =
+ StreamConverters.javaCollector(() => collector)
+
/**
* A `Sink` that will invoke the given function for every received element, giving it its previous
* output (from the second element) and the element as input.