diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/collection.md b/akka-docs/src/main/paradox/stream/operators/Sink/collection.md
new file mode 100644
index 0000000000..176058df1e
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Sink/collection.md
@@ -0,0 +1,25 @@
+# Sink.collection
+
+@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](seq.md)].
+
+@ref[Sink operators](../index.md#sink-operators)
+
+@@@div { .group-scala }
+
+## Signature
+
+@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #collection }
+
+## Description
+
+@scala[Collect values emitted from the stream into an arbitrary collection `That`. The resulting collection is available through a `Future[That]` or when the stream completes. Note that the collection boundaries are those defined in the `CanBuildFrom` associated with the chosen collection. See [The Architecture of Scala 2.13's Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-213-collections.html) for more info. The [`seq`](seq.html) operator is a shorthand for `Sink.collection[T, Vector[T]]`.]@java[Operator only available in the Scala API. The closest operator in the Java API is [`Sink.seq`](seq.html).]
+
+## Reactive Streams semantics
+
+@@@
+
+@@@div { .group-scala .callout }
+
+**cancels** If too many values are collected
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index adad53abae..0e7ae92a6a 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -58,6 +58,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|Sink|@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.|
|Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
+|Sink|@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
@@ -473,6 +474,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [lastOption](Sink/lastOption.md)
* [takeLast](Sink/takeLast.md)
* [seq](Sink/seq.md)
+* [collection](Sink/collection.md)
* [asPublisher](Sink/asPublisher.md)
* [ignore](Sink/ignore.md)
* [foreach](Sink/foreach.md)
diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala
index 3b8c271b31..e1dee3bcd8 100644
--- a/project/StreamOperatorsIndexGenerator.scala
+++ b/project/StreamOperatorsIndexGenerator.scala
@@ -12,12 +12,10 @@ import scala.util.control.NonFatal
*/
object StreamOperatorsIndexGenerator extends AutoPlugin {
- override val projectSettings: Seq[Setting[_]] = inConfig(Compile)(Seq(
- resourceGenerators +=
- generateAlphabeticalIndex(sourceDirectory,
- _ / "paradox" / "stream" / "operators" / "index.md"
- )
- ))
+ override val projectSettings: Seq[Setting[_]] = inConfig(Compile)(
+ Seq(
+ resourceGenerators +=
+ generateAlphabeticalIndex(sourceDirectory, _ / "paradox" / "stream" / "operators" / "index.md")))
val categories = Seq(
"Source operators",
@@ -35,8 +33,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"Fan-out operators",
"Watching status operators",
"Actor interop operators",
- "Error handling"
- )
+ "Error handling")
def categoryId(name: String): String = name.toLowerCase.replace(' ', '-')
@@ -78,9 +75,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
// FIXME document these methods as well
val pendingTestCases = Map(
- "Source" -> (pendingSourceOrFlow ++ Seq(
- "preMaterialize"
- )),
+ "Source" -> (pendingSourceOrFlow ++ Seq("preMaterialize")),
"Flow" -> (pendingSourceOrFlow ++ Seq(
"lazyInit",
"fromProcessorMat",
@@ -89,11 +84,9 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"of",
"join",
"joinMat",
- "fromFunction"
- )),
+ "fromFunction")),
"Sink" -> Seq(
"lazyInit",
- "collection",
"contramap",
"named",
"addAttributes",
@@ -108,18 +101,24 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"newOnCompleteStage",
"actorRefWithAck" // deprecated
),
- "ActorSink" -> Seq(
- "actorRefWithAck" // deprecated
+ "ActorSink" -> Seq("actorRefWithAck" // deprecated
),
- "ActorSource" -> Seq(
- "actorRefWithAck" // deprecated
- )
- )
+ "ActorSource" -> Seq("actorRefWithAck" // deprecated
+ ))
val ignore =
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
- Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
+ Set(
+ "create",
+ "apply",
+ "ops",
+ "appendJava",
+ "andThen",
+ "andThenMat",
+ "isIdentity",
+ "withAttributes",
+ "transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
Set("++", "onPush", "onPull", "actorRefWithAck")
@@ -157,19 +156,19 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala",
"akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala",
"akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala",
-
// akka-stream-typed
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala",
- "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala"
- ).flatMap{ f =>
+ "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala").flatMap { f =>
val slashesNr = f.count(_ == '/')
val element = f.split("/")(slashesNr).split("\\.")(0)
- IO.read(new File(f)).split("\n")
- .map(_.trim).filter(_.startsWith("def "))
+ IO.read(new File(f))
+ .split("\n")
+ .map(_.trim)
+ .filter(_.startsWith("def "))
.map(_.drop(4).takeWhile(c => c != '[' && c != '(' && c != ':'))
.filter(op => !isPending(element, op))
.filter(op => !ignore.contains(op))
@@ -180,10 +179,10 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
(noElement, "Broadcast"),
(noElement, "Balance"),
(noElement, "Unzip"),
- (noElement, "UnzipWith")
- )
+ (noElement, "UnzipWith"))
- val sourceAndFlow = defs.collect { case ("Source", method) => method } intersect defs.collect { case ("Flow", method) => method }
+ val sourceAndFlow =
+ defs.collect { case ("Source", method) => method }.intersect(defs.collect { case ("Flow", method) => method })
val groupedDefs =
defs.map {
@@ -195,35 +194,40 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
(element, method, s"$element/$method.md")
}.distinct
- val tablePerCategory = groupedDefs.map { case (element, method, md) =>
- val (description, category) = getDetails(file.getParentFile / md)
- category -> (element, method, md, description)
- }
+ val tablePerCategory = groupedDefs
+ .map {
+ case (element, method, md) =>
+ val (description, category) = getDetails(file.getParentFile / md)
+ category -> (element, method, md, description)
+ }
.groupBy(_._1)
- .mapValues(lines =>
- "| |Operator|Description|\n" ++ // TODO mini images here too
+ .mapValues(
+ lines =>
+ "| |Operator|Description|\n" ++ // TODO mini images here too
"|--|--|--|\n" ++
lines
.map(_._2)
.sortBy(_._2)
- .map { case (element, method, md, description) =>
- s"""|$element|@ref[${methodToShow(method)}]($md)|$description|"""
+ .map {
+ case (element, method, md, description) =>
+ s"""|$element|@ref[${methodToShow(method)}]($md)|$description|"""
}
- .mkString("\n")
- )
+ .mkString("\n"))
- val tables = categories.map { category =>
- s"## $category\n\n" ++
+ val tables = categories
+ .map { category =>
+ s"## $category\n\n" ++
IO.read(dir.value / "categories" / (categoryId(category) + ".md")) ++ "\n\n" ++
tablePerCategory(category)
- }.mkString("\n\n")
+ }
+ .mkString("\n\n")
val content =
"\n" +
"# Operators\n\n" +
- tables +
- "\n\n@@@ index\n\n" +
- groupedDefs.map { case (_, method, md) => s"* [${methodToShow(method)}]($md)" }.mkString("\n") + "\n\n@@@\n"
+ tables +
+ "\n\n@@@ index\n\n" +
+ groupedDefs.map { case (_, method, md) => s"* [${methodToShow(method)}]($md)" }.mkString("\n") + "\n\n@@@\n"
if (!file.exists || IO.read(file) != content) IO.write(file, content)
Seq(file)
@@ -231,30 +235,33 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
def methodToShow(method: String): String = method match {
case "from" => "@scala[apply]@java[from]"
- case other => other
+ case other => other
}
- def getDetails(file: File): (String, String) = try {
- val contents = IO.read(file)
- val lines = contents.split("\\r?\\n")
- require(
- lines.size >= 5,
- s"There must be at least 5 lines in $file, including the title, description, category link and an empty line between each two of them"
- )
- // This forces the short description to be on a single line. We could make this smarter,
- // but 'forcing' the short description to be really short seems nice as well.
- val description = lines(2)
- require(!description.isEmpty, s"description in $file must be non-empty, single-line description at the 3rd line")
- val categoryLink = lines(4)
- require(categoryLink.startsWith("@ref"), s"""category link in $file should start with @ref, but saw \"$categoryLink\"""")
- val categoryName = categoryLink.drop(5).takeWhile(_ != ']')
- val categoryLinkId = categoryLink.dropWhile(_ != '#').drop(1).takeWhile(_ != ')')
- require(categories.contains(categoryName), s"category $categoryName in $file should be known")
- require(categoryLinkId == categoryId(categoryName), s"category id $categoryLinkId in $file")
- (description, categoryName)
- } catch {
- case NonFatal(ex) =>
- throw new RuntimeException(s"Unable to extract details from $file", ex)
- }
+ def getDetails(file: File): (String, String) =
+ try {
+ val contents = IO.read(file)
+ val lines = contents.split("\\r?\\n")
+ require(
+ lines.size >= 5,
+ s"There must be at least 5 lines in $file, including the title, description, category link and an empty line between each two of them")
+ // This forces the short description to be on a single line. We could make this smarter,
+ // but 'forcing' the short description to be really short seems nice as well.
+ val description = lines(2)
+ .replaceAll("]\\(", "](" + file.getAbsolutePath.replaceFirst(".*/([^/]+/).*", "$1"))
+ require(!description.isEmpty, s"description in $file must be non-empty, single-line description at the 3rd line")
+ val categoryLink = lines(4)
+ require(
+ categoryLink.startsWith("@ref"),
+ s"""category link in $file should start with @ref, but saw \"$categoryLink\"""")
+ val categoryName = categoryLink.drop(5).takeWhile(_ != ']')
+ val categoryLinkId = categoryLink.dropWhile(_ != '#').drop(1).takeWhile(_ != ')')
+ require(categories.contains(categoryName), s"category $categoryName in $file should be known")
+ require(categoryLinkId == categoryId(categoryName), s"category id $categoryLinkId in $file")
+ (description, categoryName)
+ } catch {
+ case NonFatal(ex) =>
+ throw new RuntimeException(s"Unable to extract details from $file", ex)
+ }
}