stream operator docs: example above RS semantics (#28357)

This commit is contained in:
Arnout Engelen 2019-12-12 12:07:16 +01:00 committed by Christopher Batey
parent d115b61821
commit 2a812b7dc6
24 changed files with 163 additions and 183 deletions

View file

@ -17,6 +17,14 @@ Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complet
Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream
completes. If the stream completes with no elements the @scala[`Future`] @java[`CompletionStage`] is failed.
## Example
Scala
: @@snip [LastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala) { #last-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #last-operator-example }
## Reactive Streams semantics
@@@div { .callout }
@ -26,11 +34,3 @@ completes. If the stream completes with no elements the @scala[`Future`] @java[`
**backpressures** never
@@@
## Example
Scala
: @@snip [LastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala) { #last-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #last-operator-example }

View file

@ -18,6 +18,14 @@ Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`]
emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes. if the stream completes with no elements the `CompletionStage` is
completed with @scala[`None`] @java[an empty `Optional`].
## Example
Scala
: @@snip [LastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala) { #lastOption-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #lastOption-operator-example }
## Reactive Streams semantics
@@@div { .callout }
@ -27,11 +35,3 @@ completed with @scala[`None`] @java[an empty `Optional`].
**backpressures** never
@@@
## Example
Scala
: @@snip [LastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala) { #lastOption-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #lastOption-operator-example }

View file

@ -19,6 +19,14 @@ receives the two first elements of the flow.
Materializes into a @scala[`Future`] @java[`CompletionStage`] that will be completed by the last result of the reduction function.
## Example
Scala
: @@snip [SinkReduceSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala) { #reduce-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #reduce-operator-example }
## Reactive Streams semantics
@@@div { .callout }
@ -28,11 +36,3 @@ Materializes into a @scala[`Future`] @java[`CompletionStage`] that will be compl
**backpressures** when the previous reduction function invocation has not yet completed
@@@
## Example
Scala
: @@snip [SinkReduceSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala) { #reduce-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #reduce-operator-example }

View file

@ -20,6 +20,14 @@ of elements taken at that point.
If the stream never completes, the @scala[`Future`] @java[`CompletionStage`] will never complete.
If there is a failure signaled in the stream the @scala[`Future`] @java[`CompletionStage`] will be completed with failure.
## Example
Scala
: @@snip [TakeLastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala) { #takeLast-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #takeLast-operator-example }
## Reactive Streams semantics
@@@div { .callout }
@ -29,11 +37,3 @@ If there is a failure signaled in the stream the @scala[`Future`] @java[`Complet
**backpressures** never
@@@
## Example
Scala
: @@snip [TakeLastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala) { #takeLast-operator-example }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #takeLast-operator-example }

View file

@ -16,6 +16,13 @@ After completion of the original upstream the elements of the given source will
After completion of the original upstream the elements of the given source will be emitted.
## Example
Scala
: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concat }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concat }
## Reactive Streams semantics
@@@div { .callout }
@ -27,11 +34,3 @@ After completion of the original upstream the elements of the given source will
**completes** when all upstreams complete
@@@
## Example
Scala
: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concat }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concat }

View file

@ -17,6 +17,13 @@ Emits a specifiable number of elements from the original source, then from the p
Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one
source completes the rest of the other stream will be emitted.
## Example
Scala
: @@snip [FlowInterleaveSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala) { #interleave }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #interleave }
## Reactive Streams semantics
@@@div { .callout }
@ -28,11 +35,3 @@ source completes the rest of the other stream will be emitted.
**completes** when both upstreams have completed
@@@
## Example
Scala
: @@snip [FlowInterleaveSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala) { #interleave }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #interleave }

View file

@ -16,6 +16,11 @@ Transform each element in the stream by calling a mapping function with it and p
Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.
## Examples
Scala
: @@snip [Flow.scala](/akka-docs/src/test/scala/docs/stream/operators/Map.scala) { #imports #map }
## Reactive Streams semantics
@@@div { .callout }
@ -27,12 +32,3 @@ Transform each element in the stream by calling a mapping function with it and p
**completes** when upstream completes
@@@
## Examples
Scala
: @@snip [Flow.scala](/akka-docs/src/test/scala/docs/stream/operators/Map.scala) { #imports #map }

View file

@ -16,6 +16,13 @@ Merge multiple sources.
Merge multiple sources. Picks elements randomly if all sources has elements ready.
## Example
Scala
: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #merge }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge }
## Reactive Streams semantics
@@@div { .callout }
@ -27,11 +34,3 @@ Merge multiple sources. Picks elements randomly if all sources has elements read
**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
@@@
## Example
Scala
: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #merge }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge }

View file

@ -17,6 +17,13 @@ Merge multiple sources.
Merge multiple sources. Waits for one element to be ready from each input stream and emits the
smallest element.
## Example
Scala
: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #merge-sorted }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge-sorted }
## Reactive Streams semantics
@@@div { .callout }
@ -28,11 +35,3 @@ smallest element.
**completes** when all upstreams complete
@@@
## Example
Scala
: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #merge-sorted }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge-sorted }

View file

@ -22,6 +22,13 @@ the source of elements or is cancelled.
Signal errors downstream, regardless which of the two sources emitted the error.
## Example
Scala
: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala) { #or-else }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #or-else }
## Reactive Streams semantics
@@@div { .callout }
@ -35,11 +42,3 @@ is available from the second stream
without emitting and the secondary stream already has completed or when the secondary stream completes
@@@
## Example
Scala
: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala) { #or-else }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #or-else }

View file

@ -18,6 +18,13 @@ Prepends the given source to the flow, consuming it until completion before the
If materialized values needs to be collected `prependMat` is available.
## Example
Scala
: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prepend }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #prepend }
## Reactive Streams semantics
@@@div { .callout }
@ -29,11 +36,3 @@ If materialized values needs to be collected `prependMat` is available.
**completes** when all upstreams complete
@@@
## Example
Scala
: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prepend }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #prepend }

View file

@ -16,6 +16,14 @@ Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*
Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.
## Examples
Scala
: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip }
## Reactive Streams semantics
@@@div { .callout }
@ -27,10 +35,3 @@ Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*
**completes** when any upstream completes
@@@
## Example
Scala
: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip }

View file

@ -16,6 +16,13 @@ Combines all elements from each of multiple sources into @scala[tuples] @java[*P
Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.
## Example
Scala
: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip }
## Reactive Streams semantics
@@@div { .callout }
@ -27,10 +34,3 @@ Combines all elements from each of multiple sources into @scala[tuples] @java[*P
**completes** when all upstream completes
@@@
## Example
Scala
: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip }

View file

@ -17,6 +17,14 @@ Combines elements from multiple sources through a `combine` function and passes
Combines elements from multiple sources through a `combine` function and passes the
returned value downstream.
## Examples
Scala
: @@snip [FlowZipWithSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala) { #zip-with }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip-with }
## Reactive Streams semantics
@@@div { .callout }
@ -28,11 +36,3 @@ returned value downstream.
**completes** when any upstream completes
@@@
## Example
Scala
: @@snip [FlowZipWithSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala) { #zip-with }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip-with }

View file

@ -16,6 +16,14 @@ Zips elements of current flow with its indices.
Zips elements of current flow with its indices.
## Example
Scala
: @@snip [FlowZipWithIndexSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala) { #zip-with-index }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip-with-index }
## Reactive Streams semantics
@@@div { .callout }
@ -27,11 +35,3 @@ Zips elements of current flow with its indices.
**completes** when upstream completes
@@@
## Example
Scala
: @@snip [FlowZipWithIndexSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala) { #zip-with-index }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip-with-index }

View file

@ -23,6 +23,14 @@ already buffered elements will be sent out before signaling completion.
Sending `akka.actor.PoisonPill` will signal completion immediately but this behavior is deprecated and scheduled to be removed.
Using `akka.actor.ActorSystem.stop` to stop the actor and complete the stream is *not supported*.
## Examples
Scala
: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRef }
Java
: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actor-ref }
## Reactive Streams semantics
@@@div { .callout }
@ -32,12 +40,3 @@ Using `akka.actor.ActorSystem.stop` to stop the actor and complete the stream is
**completes** when the actor is stopped by sending it a particular message as described above
@@@
## Examples
Scala
: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRef }
Java
: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actor-ref }

View file

@ -15,6 +15,14 @@ Materialize an `ActorRef`; sending messages to it will emit them on the stream.
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor responds with the provided ack message
once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
## Examples
Scala
: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithBackpressure }
Java
: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actorRefWithBackpressure }
## Reactive Streams semantics
@@@div { .callout }
@ -24,12 +32,3 @@ once the element could be emitted allowing for backpressure from the source. Sen
**completes** when the `ActorRef` is sent `akka.actor.Status.Success`
@@@
## Examples
Scala
: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithBackpressure }
Java
: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actorRefWithBackpressure }

View file

@ -19,17 +19,6 @@ when the original iterator runs out of elements to process it will start all ove
provided by the evaluation of provided parameter. If the method argument provides an empty iterator the stream will be
terminated with an exception.
## Reactive Streams semantics
@@@div { .callout }
**emits** the next value returned from cycled iterator
**completes** never
@@@
## Examples
Scala
@ -46,3 +35,13 @@ Scala
Java
: @@snip [cycle.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #cycle-error }
## Reactive Streams semantics
@@@div { .callout }
**emits** the next value returned from cycled iterator
**completes** never
@@@

View file

@ -18,6 +18,11 @@ Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].
Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`]. @java[Make sure the `Iterable` is immutable or at least not modified after being used
as a source. Otherwise the stream may fail with `ConcurrentModificationException` or other more subtle errors may occur.]
## Examples
Java
: @@snip [from.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #imports #source-from-example }
## Reactive Streams semantics
@@@div { .callout }
@ -27,9 +32,3 @@ as a source. Otherwise the stream may fail with `ConcurrentModificationException
**completes** when the last element of the seq has been emitted
@@@
## Examples
Java
: @@snip [from.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #imports #source-from-example }

View file

@ -28,6 +28,3 @@ If the future fails the stream is failed with that exception.
**completes** after the future has completed
@@@
## Example

View file

@ -19,6 +19,10 @@ If the future fails the stream is failed with that exception.
For the corresponding operator for the Java standard library `CompletionStage` see @ref:[completionStage](completionStage.md).
## Example
Scala
: @@snip [SourceFromFuture.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #sourceFromFuture }
## Reactive Streams semantics
@@@div { .callout }
@ -28,8 +32,3 @@ For the corresponding operator for the Java standard library `CompletionStage` s
**completes** after the future has completed
@@@
## Example
Scala
: @@snip [SourceFromFuture.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #sourceFromFuture }

View file

@ -32,4 +32,3 @@ the laziness and will trigger the factory immediately.
**completes** depends on the wrapped `Source`
@@@

View file

@ -17,16 +17,6 @@ Emit each integer in a range, with an option to take bigger steps than 1.
Emit each integer in a range, with an option to take bigger steps than 1. @scala[In Scala, use the `apply` method to generate a sequence of integers.]
## Reactive Streams semantics
@@@div { .callout }
**emits** when there is demand, the next value
**completes** when the end of the range has been reached
@@@
## Examples
Define the range of integers.
@ -38,3 +28,13 @@ Print out the stream of integers.
Java
: @@snip [SourceDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #run-range}
## Reactive Streams semantics
@@@div { .callout }
**emits** when there is demand, the next value
**completes** when the end of the range has been reached
@@@

View file

@ -16,6 +16,14 @@ Stream a single object
Stream a single object
## Examples
Scala
: @@snip [source.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #imports #source-single }
Java
: @@snip [source.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #imports #source-single }
## Reactive Streams semantics
@@@div { .callout }
@ -25,13 +33,3 @@ Stream a single object
**completes** when the single value has been emitted
@@@
## Examples
Scala
: @@snip [source.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #imports #source-single }
Java
: @@snip [source.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #imports #source-single }