Add docs for stream operator Sink.collection (#28210)
This commit is contained in:
parent
949ee7ea21
commit
041b6243a5
3 changed files with 102 additions and 68 deletions
|
|
@ -0,0 +1,25 @@
|
|||
# Sink.collection
|
||||
|
||||
@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](seq.md)].
|
||||
|
||||
@ref[Sink operators](../index.md#sink-operators)
|
||||
|
||||
@@@div { .group-scala }
|
||||
|
||||
## Signature
|
||||
|
||||
@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #collection }
|
||||
|
||||
## Description
|
||||
|
||||
@scala[Collect values emitted from the stream into an arbitrary collection `That`. The resulting collection is available through a `Future[That]` or when the stream completes. Note that the collection boundaries are those defined in the `CanBuildFrom` associated with the chosen collection. See [The Architecture of Scala 2.13's Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-213-collections.html) for more info. The [`seq`](seq.html) operator is a shorthand for `Sink.collection[T, Vector[T]]`.]@java[Operator only available in the Scala API. The closest operator in the Java API is [`Sink.seq`](seq.html).]
|
||||
|
||||
## Reactive Streams semantics
|
||||
|
||||
@@@
|
||||
|
||||
@@@div { .group-scala .callout }
|
||||
|
||||
**cancels** If too many values are collected
|
||||
|
||||
@@@
|
||||
|
|
@ -58,6 +58,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|
|||
|Sink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.|
|
||||
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|
||||
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
|
||||
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|
||||
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|
||||
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|
||||
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|
||||
|
|
@ -473,6 +474,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
|||
* [lastOption](Sink/lastOption.md)
|
||||
* [takeLast](Sink/takeLast.md)
|
||||
* [seq](Sink/seq.md)
|
||||
* [collection](Sink/collection.md)
|
||||
* [asPublisher](Sink/asPublisher.md)
|
||||
* [ignore](Sink/ignore.md)
|
||||
* [foreach](Sink/foreach.md)
|
||||
|
|
|
|||
|
|
@ -12,12 +12,10 @@ import scala.util.control.NonFatal
|
|||
*/
|
||||
object StreamOperatorsIndexGenerator extends AutoPlugin {
|
||||
|
||||
override val projectSettings: Seq[Setting[_]] = inConfig(Compile)(Seq(
|
||||
resourceGenerators +=
|
||||
generateAlphabeticalIndex(sourceDirectory,
|
||||
_ / "paradox" / "stream" / "operators" / "index.md"
|
||||
)
|
||||
))
|
||||
override val projectSettings: Seq[Setting[_]] = inConfig(Compile)(
|
||||
Seq(
|
||||
resourceGenerators +=
|
||||
generateAlphabeticalIndex(sourceDirectory, _ / "paradox" / "stream" / "operators" / "index.md")))
|
||||
|
||||
val categories = Seq(
|
||||
"Source operators",
|
||||
|
|
@ -35,8 +33,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
"Fan-out operators",
|
||||
"Watching status operators",
|
||||
"Actor interop operators",
|
||||
"Error handling"
|
||||
)
|
||||
"Error handling")
|
||||
|
||||
def categoryId(name: String): String = name.toLowerCase.replace(' ', '-')
|
||||
|
||||
|
|
@ -78,9 +75,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
|
||||
// FIXME document these methods as well
|
||||
val pendingTestCases = Map(
|
||||
"Source" -> (pendingSourceOrFlow ++ Seq(
|
||||
"preMaterialize"
|
||||
)),
|
||||
"Source" -> (pendingSourceOrFlow ++ Seq("preMaterialize")),
|
||||
"Flow" -> (pendingSourceOrFlow ++ Seq(
|
||||
"lazyInit",
|
||||
"fromProcessorMat",
|
||||
|
|
@ -89,11 +84,9 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
"of",
|
||||
"join",
|
||||
"joinMat",
|
||||
"fromFunction"
|
||||
)),
|
||||
"fromFunction")),
|
||||
"Sink" -> Seq(
|
||||
"lazyInit",
|
||||
"collection",
|
||||
"contramap",
|
||||
"named",
|
||||
"addAttributes",
|
||||
|
|
@ -108,18 +101,24 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
"newOnCompleteStage",
|
||||
"actorRefWithAck" // deprecated
|
||||
),
|
||||
"ActorSink" -> Seq(
|
||||
"actorRefWithAck" // deprecated
|
||||
"ActorSink" -> Seq("actorRefWithAck" // deprecated
|
||||
),
|
||||
"ActorSource" -> Seq(
|
||||
"actorRefWithAck" // deprecated
|
||||
)
|
||||
)
|
||||
"ActorSource" -> Seq("actorRefWithAck" // deprecated
|
||||
))
|
||||
|
||||
val ignore =
|
||||
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
|
||||
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
|
||||
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
|
||||
Set(
|
||||
"create",
|
||||
"apply",
|
||||
"ops",
|
||||
"appendJava",
|
||||
"andThen",
|
||||
"andThenMat",
|
||||
"isIdentity",
|
||||
"withAttributes",
|
||||
"transformMaterializing") ++
|
||||
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
|
||||
Set("++", "onPush", "onPull", "actorRefWithAck")
|
||||
|
||||
|
|
@ -157,19 +156,19 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
"akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala",
|
||||
"akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala",
|
||||
"akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala",
|
||||
|
||||
// akka-stream-typed
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala"
|
||||
).flatMap{ f =>
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala").flatMap { f =>
|
||||
val slashesNr = f.count(_ == '/')
|
||||
val element = f.split("/")(slashesNr).split("\\.")(0)
|
||||
IO.read(new File(f)).split("\n")
|
||||
.map(_.trim).filter(_.startsWith("def "))
|
||||
IO.read(new File(f))
|
||||
.split("\n")
|
||||
.map(_.trim)
|
||||
.filter(_.startsWith("def "))
|
||||
.map(_.drop(4).takeWhile(c => c != '[' && c != '(' && c != ':'))
|
||||
.filter(op => !isPending(element, op))
|
||||
.filter(op => !ignore.contains(op))
|
||||
|
|
@ -180,10 +179,10 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
(noElement, "Broadcast"),
|
||||
(noElement, "Balance"),
|
||||
(noElement, "Unzip"),
|
||||
(noElement, "UnzipWith")
|
||||
)
|
||||
(noElement, "UnzipWith"))
|
||||
|
||||
val sourceAndFlow = defs.collect { case ("Source", method) => method } intersect defs.collect { case ("Flow", method) => method }
|
||||
val sourceAndFlow =
|
||||
defs.collect { case ("Source", method) => method }.intersect(defs.collect { case ("Flow", method) => method })
|
||||
|
||||
val groupedDefs =
|
||||
defs.map {
|
||||
|
|
@ -195,35 +194,40 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
(element, method, s"$element/$method.md")
|
||||
}.distinct
|
||||
|
||||
val tablePerCategory = groupedDefs.map { case (element, method, md) =>
|
||||
val (description, category) = getDetails(file.getParentFile / md)
|
||||
category -> (element, method, md, description)
|
||||
}
|
||||
val tablePerCategory = groupedDefs
|
||||
.map {
|
||||
case (element, method, md) =>
|
||||
val (description, category) = getDetails(file.getParentFile / md)
|
||||
category -> (element, method, md, description)
|
||||
}
|
||||
.groupBy(_._1)
|
||||
.mapValues(lines =>
|
||||
"| |Operator|Description|\n" ++ // TODO mini images here too
|
||||
.mapValues(
|
||||
lines =>
|
||||
"| |Operator|Description|\n" ++ // TODO mini images here too
|
||||
"|--|--|--|\n" ++
|
||||
lines
|
||||
.map(_._2)
|
||||
.sortBy(_._2)
|
||||
.map { case (element, method, md, description) =>
|
||||
s"""|$element|<a name="${method.toLowerCase}"></a>@ref[${methodToShow(method)}]($md)|$description|"""
|
||||
.map {
|
||||
case (element, method, md, description) =>
|
||||
s"""|$element|<a name="${method.toLowerCase}"></a>@ref[${methodToShow(method)}]($md)|$description|"""
|
||||
}
|
||||
.mkString("\n")
|
||||
)
|
||||
.mkString("\n"))
|
||||
|
||||
val tables = categories.map { category =>
|
||||
s"## $category\n\n" ++
|
||||
val tables = categories
|
||||
.map { category =>
|
||||
s"## $category\n\n" ++
|
||||
IO.read(dir.value / "categories" / (categoryId(category) + ".md")) ++ "\n\n" ++
|
||||
tablePerCategory(category)
|
||||
}.mkString("\n\n")
|
||||
}
|
||||
.mkString("\n\n")
|
||||
|
||||
val content =
|
||||
"<!-- DO NOT EDIT DIRECTLY: This file is generated by `project/StreamOperatorsIndexGenerator`. See CONTRIBUTING.md for details. -->\n" +
|
||||
"# Operators\n\n" +
|
||||
tables +
|
||||
"\n\n@@@ index\n\n" +
|
||||
groupedDefs.map { case (_, method, md) => s"* [${methodToShow(method)}]($md)" }.mkString("\n") + "\n\n@@@\n"
|
||||
tables +
|
||||
"\n\n@@@ index\n\n" +
|
||||
groupedDefs.map { case (_, method, md) => s"* [${methodToShow(method)}]($md)" }.mkString("\n") + "\n\n@@@\n"
|
||||
|
||||
if (!file.exists || IO.read(file) != content) IO.write(file, content)
|
||||
Seq(file)
|
||||
|
|
@ -231,30 +235,33 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
|
||||
def methodToShow(method: String): String = method match {
|
||||
case "from" => "@scala[apply]@java[from]"
|
||||
case other => other
|
||||
case other => other
|
||||
}
|
||||
|
||||
def getDetails(file: File): (String, String) = try {
|
||||
val contents = IO.read(file)
|
||||
val lines = contents.split("\\r?\\n")
|
||||
require(
|
||||
lines.size >= 5,
|
||||
s"There must be at least 5 lines in $file, including the title, description, category link and an empty line between each two of them"
|
||||
)
|
||||
// This forces the short description to be on a single line. We could make this smarter,
|
||||
// but 'forcing' the short description to be really short seems nice as well.
|
||||
val description = lines(2)
|
||||
require(!description.isEmpty, s"description in $file must be non-empty, single-line description at the 3rd line")
|
||||
val categoryLink = lines(4)
|
||||
require(categoryLink.startsWith("@ref"), s"""category link in $file should start with @ref, but saw \"$categoryLink\"""")
|
||||
val categoryName = categoryLink.drop(5).takeWhile(_ != ']')
|
||||
val categoryLinkId = categoryLink.dropWhile(_ != '#').drop(1).takeWhile(_ != ')')
|
||||
require(categories.contains(categoryName), s"category $categoryName in $file should be known")
|
||||
require(categoryLinkId == categoryId(categoryName), s"category id $categoryLinkId in $file")
|
||||
(description, categoryName)
|
||||
} catch {
|
||||
case NonFatal(ex) =>
|
||||
throw new RuntimeException(s"Unable to extract details from $file", ex)
|
||||
}
|
||||
def getDetails(file: File): (String, String) =
|
||||
try {
|
||||
val contents = IO.read(file)
|
||||
val lines = contents.split("\\r?\\n")
|
||||
require(
|
||||
lines.size >= 5,
|
||||
s"There must be at least 5 lines in $file, including the title, description, category link and an empty line between each two of them")
|
||||
// This forces the short description to be on a single line. We could make this smarter,
|
||||
// but 'forcing' the short description to be really short seems nice as well.
|
||||
val description = lines(2)
|
||||
.replaceAll("]\\(", "](" + file.getAbsolutePath.replaceFirst(".*/([^/]+/).*", "$1"))
|
||||
require(!description.isEmpty, s"description in $file must be non-empty, single-line description at the 3rd line")
|
||||
val categoryLink = lines(4)
|
||||
require(
|
||||
categoryLink.startsWith("@ref"),
|
||||
s"""category link in $file should start with @ref, but saw \"$categoryLink\"""")
|
||||
val categoryName = categoryLink.drop(5).takeWhile(_ != ']')
|
||||
val categoryLinkId = categoryLink.dropWhile(_ != '#').drop(1).takeWhile(_ != ')')
|
||||
require(categories.contains(categoryName), s"category $categoryName in $file should be known")
|
||||
require(categoryLinkId == categoryId(categoryName), s"category id $categoryLinkId in $file")
|
||||
(description, categoryName)
|
||||
} catch {
|
||||
case NonFatal(ex) =>
|
||||
throw new RuntimeException(s"Unable to extract details from $file", ex)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue