diff --git a/akka-docs/src/main/paradox/java/stream/stages-overview.md b/akka-docs/src/main/paradox/java/stream/stages-overview.md
index fc91415e51..734e2da1ae 100644
--- a/akka-docs/src/main/paradox/java/stream/stages-overview.md
+++ b/akka-docs/src/main/paradox/java/stream/stages-overview.md
@@ -1,1770 +1 @@
# Overview of built-in stages and their semantics

## Source stages

These built-in sources are available from `akka.stream.javadsl.Source`:

---------------------------------------------------------------

### fromIterator

Stream the values from an `Iterator`, requesting the next value when there is demand. The iterator will be created anew
on each materialization of the source, which is the reason the factory takes a `Creator` rather than an `Iterator` directly.

If the iterator performs blocking operations, make sure to run it on a separate dispatcher.

**emits** the next value returned from the iterator

**completes** when the iterator reaches its end

---------------------------------------------------------------

### from

Stream the values of an `Iterable`. Make sure the `Iterable` is immutable or at least not modified after being used
as a source.

**emits** the next value of the iterable

**completes** after the last element of the iterable has been emitted

---------------------------------------------------------------

### single

Stream a single object.

**emits** the value once

**completes** when the single value has been emitted

---------------------------------------------------------------

### repeat

Stream a single object repeatedly.

**emits** the same value repeatedly when there is demand

**completes** never

---------------------------------------------------------------

### cycle

Stream the values of an iterator, restarting from the beginning when it runs out of elements. Internally a new
iterator is created on each cycle by evaluating the provided `Creator`. If the `Creator` produces an empty iterator
the stream is terminated with an exception.

**emits** the next value returned from the cycled iterator

**completes** never

---------------------------------------------------------------

### tick

A periodical repetition of an arbitrary object. The delay of the first tick is specified
separately from the interval between subsequent ticks.

**emits** periodically; if there is downstream backpressure ticks are skipped

**completes** never

---------------------------------------------------------------

### fromCompletionStage

Send the single value of the `CompletionStage` when it completes and there is demand.
If the `CompletionStage` fails the stream is failed with that exception.

**emits** when the `CompletionStage` completes

**completes** after the `CompletionStage` has completed or when it fails

---------------------------------------------------------------

### fromFuture

Send the single value of the Scala `Future` when it completes and there is demand.
If the future fails the stream is failed with that exception.

**emits** when the future completes

**completes** after the future has completed

---------------------------------------------------------------

### fromFutureSource

Streams the elements of the given future source once it successfully completes.
If the future fails the stream is failed.

**emits** the next value from the *future* source, once it has completed

**completes** after the *future* source completes

---------------------------------------------------------------

### fromSourceCompletionStage

Streams the elements of an asynchronous source once its given *completion* stage completes.
If the *completion* fails the stream is failed with that exception.

**emits** the next value from the asynchronous source, once its *completion stage* has completed

**completes** after the asynchronous source completes

---------------------------------------------------------------

### unfold

Stream the result of a function as long as it returns an `Optional`. The value inside the optional
is a pair where the first value is a state passed back into the next call to the function, allowing
state to be carried between invocations. The first invocation of the provided fold function will receive the `zero` state.

Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API.

**emits** when there is demand and the unfold function over the previous state returns a non-empty value

**completes** when the unfold function returns an empty value
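
For illustration, a minimal countdown source built on `unfold`; this is a sketch only, assuming a materializer named `mat` (e.g. created via `ActorMaterializer.create(system)`) is in scope here and in the examples below:

```java
import java.util.Optional;
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Counts down from 10 to 1: the first value of each Pair is the next state,
// the second is the element to emit; an empty Optional completes the source.
Source<Integer, NotUsed> countdown =
    Source.<Integer, Integer>unfold(10, current -> current == 0
        ? Optional.empty()
        : Optional.of(Pair.create(current - 1, current)));

countdown.runWith(Sink.foreach(System.out::println), mat); // 10, 9, ..., 1
```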

---------------------------------------------------------------

### unfoldAsync

Just like `unfold` but the fold function returns a `CompletionStage` which will cause the source to
complete or emit when it completes.

Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API.

**emits** when there is demand and the `CompletionStage` returned by the unfold function completes with some value

**completes** when the `CompletionStage` returned by the unfold function completes with an empty value

---------------------------------------------------------------

### empty

Complete right away without ever emitting any elements. Useful when you have to provide a source to
an API but there are no elements to emit.

**emits** never

**completes** directly

---------------------------------------------------------------

### maybe

Materialize a `CompletionStage` that can be completed with an `Optional`.
If it is completed with a value it will be emitted from the source; if it is completed with an empty `Optional` the
source will complete directly.

**emits** when the returned promise is completed with some value

**completes** after emitting some value, or directly if the promise is completed with no value

---------------------------------------------------------------

### failed

Fail directly with a user specified exception.

**emits** never

**completes** fails the stream directly with the given exception

---------------------------------------------------------------

### lazily

Defers creation and materialization of a `Source` until there is demand.

**emits** depends on the wrapped `Source`

**completes** depends on the wrapped `Source`

---------------------------------------------------------------

### actorPublisher

Wrap an actor extending `ActorPublisher` as a source.

**emits** depends on the actor implementation

**completes** when the actor stops

---------------------------------------------------------------

### actorRef

Materialize an `ActorRef`; messages sent to it will be emitted on the stream. The actor contains
a buffer, but since communication is one way there is no backpressure. Overflow is handled by either dropping
elements or failing the stream; the strategy is chosen by the user.

**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef`

**completes** when the `ActorRef` is sent `akka.actor.Status.Success` or `PoisonPill`

---------------------------------------------------------------

### combine

Combine several sources, using a given strategy such as merge or concat, into one source.

**emits** when there is demand, but depending on the strategy

**completes** when all sources have completed

---------------------------------------------------------------

### range

Emit each integer in a range, with an option to take bigger steps than 1.

**emits** when there is demand, the next value

**completes** when the end of the range has been reached

---------------------------------------------------------------

### unfoldResource

Wrap any resource that can be opened, queried for its next element (in a blocking way) and closed using three distinct functions into a source.

**emits** when there is demand and the read function returns a value

**completes** when the read function returns an empty `Optional`

---------------------------------------------------------------

### unfoldAsyncResource

Wrap any resource that can be opened, queried for its next element and closed using three distinct functions into a source.
The functions return `CompletionStage` results to achieve asynchronous processing.

**emits** when there is demand and the `CompletionStage` from the read function completes with a value

**completes** when the `CompletionStage` from the read function completes with an empty `Optional`

---------------------------------------------------------------

### queue

Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. The queue contains
a buffer; if elements are pushed onto the queue faster than the source is consumed the overflow will be handled with
a strategy specified by the user. Functionality for tracking when an element has been emitted is available through
`SourceQueue.offer`.

**emits** when there is demand and the queue contains elements

**completes** when downstream completes
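
A sketch of `queue` in use; the buffer size of 16 and the backpressure strategy are arbitrary choices for the example:

```java
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;

// Materialize the queue; elements offered to it are emitted by the source.
SourceQueueWithComplete<String> queue =
    Source.<String>queue(16, OverflowStrategy.backpressure())
        .to(Sink.foreach(System.out::println))
        .run(mat);

// offer returns a CompletionStage<QueueOfferResult> indicating whether the
// element was enqueued, dropped, or the stream had already terminated.
queue.offer("hello").thenAccept(result -> System.out.println("offered: " + result));
queue.complete(); // complete the source once all buffered elements are emitted
```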

---------------------------------------------------------------

### asSubscriber

Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.

---------------------------------------------------------------

### fromPublisher

Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`.

---------------------------------------------------------------

### zipN

Combine the elements of multiple streams into a stream of sequences.

**emits** when all of the inputs have an element available

**completes** when any upstream completes

---------------------------------------------------------------

### zipWithN

Combine the elements of multiple streams into a stream of sequences using a combiner function.

**emits** when all of the inputs have an element available

**completes** when any upstream completes
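
As a small sketch of `zipN` (the element types and values are arbitrary):

```java
import java.util.Arrays;
import java.util.List;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

List<Source<Integer, ?>> inputs =
    Arrays.asList(Source.range(1, 3), Source.range(10, 12));

// Emits [1, 10], [2, 11], [3, 12], completing when any input completes.
Source.zipN(inputs).runWith(Sink.foreach(System.out::println), mat);
```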

---------------------------------------------------------------

## Sink stages

These built-in sinks are available from `akka.stream.javadsl.Sink`:

---------------------------------------------------------------

### head

Materializes into a `CompletionStage` which completes with the first value arriving,
after which the stream is canceled. If no element is emitted, the `CompletionStage` is failed.

**cancels** after receiving one element

**backpressures** never

---------------------------------------------------------------

### headOption

Materializes into a `CompletionStage<Optional<T>>` which completes with the first value arriving wrapped in an optional,
or an empty optional if the stream completes without any elements emitted.

**cancels** after receiving one element

**backpressures** never

---------------------------------------------------------------

### last

Materializes into a `CompletionStage` which will complete with the last value emitted when the stream
completes. If the stream completes with no elements the `CompletionStage` is failed.

**cancels** never

**backpressures** never

---------------------------------------------------------------

### lastOption

Materialize a `CompletionStage<Optional<T>>` which completes with the last value
emitted wrapped in an optional when the stream completes. If the stream completes with no elements the `CompletionStage` is
completed with an empty optional.

**cancels** never

**backpressures** never

---------------------------------------------------------------

### ignore

Consume all elements but discard them. Useful when a stream has to be consumed but there is no use in actually
doing anything with the elements.

**cancels** never

**backpressures** never

---------------------------------------------------------------

### cancelled

Immediately cancel the stream.

**cancels** immediately

---------------------------------------------------------------

### seq

Collect the values emitted from the stream into a collection; the collection is available through a `CompletionStage`
which completes when the stream completes. Note that the collection is bounded to `Integer.MAX_VALUE` elements;
if more elements are emitted the sink will cancel the stream.

**cancels** if too many values are collected

---------------------------------------------------------------

### foreach

Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure.

The sink materializes into a `CompletionStage<Done>` which completes when the
stream completes, or fails if the stream fails.

**cancels** never

**backpressures** when the previous procedure invocation has not yet completed

---------------------------------------------------------------

### foreachParallel

Like `foreach` but allows up to `parallelism` procedure calls to happen in parallel.

**cancels** never

**backpressures** when the previous parallel procedure invocations have not yet completed

---------------------------------------------------------------

### onComplete

Invoke a callback when the stream has completed or failed.

**cancels** never

**backpressures** never

---------------------------------------------------------------

### lazyInit

Invoke the `sinkFactory` function to create a real sink upon receiving the first element. The internal `Sink` will not be
created if there are no elements, because of completion or error. The *fallback* will be invoked if there were no elements
and completion is received from upstream.

**cancels** never

**backpressures** when initialized and the created sink backpressures

---------------------------------------------------------------

### queue

Materialize a `SinkQueue` that can be pulled to trigger demand through the sink. The queue contains
a buffer in case the stream emits elements faster than the queue is pulled.

**cancels** when `SinkQueue.cancel` is called

**backpressures** when the buffer is full

---------------------------------------------------------------

### fold

Fold over the emitted elements with a function, where each invocation will get the new element and the result from the
previous fold invocation. The first invocation will be provided the `zero` value.

Materializes into a `CompletionStage` that will complete with the last state when the stream has completed.

This stage allows combining values into a result without a global mutable state by instead passing the state along
between invocations.

**cancels** never

**backpressures** when the previous fold function invocation has not yet completed
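
A minimal `fold` sketch, summing a stream of integers into the materialized `CompletionStage`:

```java
import java.util.concurrent.CompletionStage;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// The sink materializes into a CompletionStage holding the final fold state.
CompletionStage<Integer> sum =
    Source.range(1, 100).runWith(Sink.fold(0, (acc, n) -> acc + n), mat);

sum.thenAccept(total -> System.out.println("sum: " + total)); // sum: 5050
```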

---------------------------------------------------------------

### reduce

Apply a reduction function on the incoming elements and pass the result to the next invocation. The first invocation
receives the first two elements of the flow.

Materializes into a `CompletionStage` that will be completed with the last result of the reduction function.

**cancels** never

**backpressures** when the previous reduction function invocation has not yet completed

---------------------------------------------------------------

### combine

Combine several sinks into one using a user specified strategy.

**cancels** depends on the strategy

**backpressures** depends on the strategy

---------------------------------------------------------------

### actorRef

Send the elements from the stream to an `ActorRef`. There is no backpressure, so care must be taken to not overflow the inbox.

**cancels** when the actor terminates

**backpressures** never

---------------------------------------------------------------

### actorRefWithAck

Send the elements from the stream to an `ActorRef` which must then acknowledge reception after processing a message,
to provide backpressure onto the sink.

**cancels** when the actor terminates

**backpressures** when the actor acknowledgement has not arrived

---------------------------------------------------------------

### actorSubscriber

Create an actor from a `Props` upon materialization, where the actor implements `ActorSubscriber`, which will
receive the elements from the stream.

Materializes into an `ActorRef` to the created actor.

**cancels** when the actor terminates

**backpressures** depends on the actor implementation

---------------------------------------------------------------

### asPublisher

Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.

---------------------------------------------------------------

### fromSubscriber

Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink.

---------------------------------------------------------------

## Additional Sink and Source converters

Sources and sinks for integrating with `java.io.InputStream` and `java.io.OutputStream` can be found on
`StreamConverters`. As they are blocking APIs the implementations of these stages are run on a separate
dispatcher, configured through the `akka.stream.blocking-io-dispatcher` setting.

---------------------------------------------------------------

### fromOutputStream

Create a sink that wraps an `OutputStream`. Takes a function that produces an `OutputStream`; when the sink is
materialized the function will be called and bytes sent to the sink will be written to the returned `OutputStream`.

Materializes into a `CompletionStage` which will complete with an `IOResult` when the stream
completes.

Note that a flow can be materialized multiple times, so the function producing the `OutputStream` must be able
to handle multiple invocations.

The `OutputStream` will be closed when the stream that flows into the `Sink` is completed, and the `Sink`
will cancel its inflow when the `OutputStream` is no longer writable.

---------------------------------------------------------------

### asInputStream

Create a sink which materializes into an `InputStream` that can be read to trigger demand through the sink.
Bytes emitted through the stream will be available for reading through the `InputStream`.

The `InputStream` will be ended when the stream flowing into this `Sink` completes, and closing the
`InputStream` will cancel the inflow of this `Sink`.

---------------------------------------------------------------

### fromInputStream

Create a source that wraps an `InputStream`. Takes a function that produces an `InputStream`; when the source is
materialized the function will be called and bytes from the `InputStream` will be emitted into the stream.

Materializes into a `CompletionStage` which will complete with an `IOResult` when the stream
completes.

Note that a flow can be materialized multiple times, so the function producing the `InputStream` must be able
to handle multiple invocations.

The `InputStream` will be closed when the `Source` is canceled from its downstream, and reaching the end of the
`InputStream` will complete the `Source`.

---------------------------------------------------------------

### asOutputStream

Create a source that materializes into an `OutputStream`. When bytes are written to the `OutputStream` they
are emitted from the source.

The `OutputStream` will no longer be writable when the `Source` has been canceled from its downstream, and
closing the `OutputStream` will complete the `Source`.

---------------------------------------------------------------

### asJavaStream

Create a sink which materializes into a Java 8 `Stream` that can be run to trigger demand through the sink.
Elements emitted through the stream will be available for reading through the Java 8 `Stream`.

The Java 8 `Stream` will be ended when the stream flowing into this `Sink` completes, and closing the Java
`Stream` will cancel the inflow of this `Sink`. The Java `Stream` throws an exception in case the reactive stream failed.

Be aware that the Java 8 `Stream` blocks the current thread while waiting on the next element.

---------------------------------------------------------------

### fromJavaStream

Create a source that wraps a Java 8 `Stream`. The `Source` uses a stream iterator to get all its elements and sends them
downstream on demand.
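
A sketch combining `fromInputStream` and `fromOutputStream` into a byte-copying stream; the file names are placeholders:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.concurrent.CompletionStage;
import akka.stream.IOResult;
import akka.stream.javadsl.StreamConverters;

// Both factories take creators because every materialization needs fresh streams.
CompletionStage<IOResult> copied =
    StreamConverters.fromInputStream(() -> new FileInputStream("in.bin"))
        .runWith(StreamConverters.fromOutputStream(() -> new FileOutputStream("out.bin")), mat);
```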

---------------------------------------------------------------

### javaCollector

Create a sink which materializes into a `CompletionStage` which will be completed with a result of the Java 8 `Collector`
transformation and reduction operations. This allows usage of Java 8 stream transformations for reactive streams.
The `Collector` will trigger demand upstream. Elements emitted through the stream will be accumulated into a mutable
result container, optionally transformed into a final representation after all input elements have been processed.
The `Collector` can also do reduction at the end. Reduction processing is performed sequentially.

Note that a flow can be materialized multiple times, so the function producing the `Collector` must be able
to handle multiple invocations.

---------------------------------------------------------------

### javaCollectorParallelUnordered

Create a sink which materializes into a `CompletionStage` which will be completed with a result of the Java 8 `Collector`
transformation and reduction operations. This allows usage of Java 8 stream transformations for reactive streams.
The `Collector` will trigger demand upstream. Elements emitted through the stream will be accumulated into a mutable
result container, optionally transformed into a final representation after all input elements have been processed.
The `Collector` can also do reduction at the end. Reduction processing is performed in parallel, based on the graph `Balance` stage.

Note that a flow can be materialized multiple times, so the function producing the `Collector` must be able
to handle multiple invocations.

---------------------------------------------------------------

## File IO Sinks and Sources

Sources and sinks for reading and writing files can be found on `FileIO`.

---------------------------------------------------------------

### fromPath

Emit the contents of a file as `ByteString` s. Materializes into a `CompletionStage` which will be completed with
an `IOResult` upon reaching the end of the file or if there is a failure.

---------------------------------------------------------------

### toPath

Create a sink which will write incoming `ByteString` s to a given file path.
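
For example, a minimal file copy with placeholder paths:

```java
import java.nio.file.Paths;
import java.util.concurrent.CompletionStage;
import akka.stream.IOResult;
import akka.stream.javadsl.FileIO;

// Materializes into a CompletionStage<IOResult> reporting how many bytes were written.
CompletionStage<IOResult> result =
    FileIO.fromPath(Paths.get("source.txt"))
        .runWith(FileIO.toPath(Paths.get("target.txt")), mat);
```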

---------------------------------------------------------------

## Flow stages

All flows by default backpressure if the computation they encapsulate is not fast enough to keep up with the rate of
incoming elements from the preceding stage. There are differences, though, in how the different stages handle being
backpressured by some of their downstream stages.

Most stages stop and propagate the failure downstream as soon as any of their upstreams emit a failure.
This happens to ensure reliable teardown of streams and cleanup when failures happen. Failures are meant to
model unrecoverable conditions, therefore they are always eagerly propagated.
For in-band error handling of normal errors (dropping elements if a map fails, for example) you should use the
supervision support, or explicitly wrap your element types in a proper container that can express error or success
states.

## Simple processing stages

These stages can transform the rate of incoming elements since there are stages that emit multiple elements for a
single input (e.g. `mapConcat`) or consume multiple elements before emitting one output (e.g. `filter`).
However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the
rate is affected. This is in contrast with [detached stages](#backpressure-aware-stages) which can change their processing behavior
depending on whether they are backpressured by downstream or not.

---------------------------------------------------------------

### map

Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.

**emits** when the mapping function returns an element

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### mapConcat

Transform each element into zero or more elements that are individually passed downstream.

**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection

**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection

**completes** when upstream completes and all remaining elements have been emitted

---------------------------------------------------------------

### statefulMapConcat

Transform each element into zero or more elements that are individually passed downstream. The difference to `mapConcat` is that
the transformation function is created from a factory for every materialization of the flow.

**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection

**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection

**completes** when upstream completes and all remaining elements have been emitted

---------------------------------------------------------------

### filter

Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if
it returns false the element is discarded.

**emits** when the given predicate returns true for the element

**backpressures** when the given predicate returns true for the element and downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### filterNot

Filter the incoming elements using a predicate. If the predicate returns false the element is passed downstream, if
it returns true the element is discarded.

**emits** when the given predicate returns false for the element

**backpressures** when the given predicate returns false for the element and downstream backpressures

**completes** when upstream completes
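
The data-driven rate transformation is easiest to see in a small chain; here `filter` consumes several elements per emitted one while `mapConcat` emits two per input:

```java
import java.util.Arrays;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

Source.range(1, 10)
    .map(n -> n * 2)                     // one element out per element in
    .filter(n -> n % 3 == 0)             // may consume several elements per emit
    .mapConcat(n -> Arrays.asList(n, n)) // emits two elements per element in
    .runWith(Sink.foreach(System.out::println), mat); // 6, 6, 12, 12, 18, 18
```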

---------------------------------------------------------------

### collect

Apply a partial function to each incoming element; if the partial function is defined for a value the returned
value is passed downstream. Can often replace `filter` followed by `map` to achieve the same in one single stage.

**emits** when the provided partial function is defined for the element

**backpressures** when the partial function is defined for the element and downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### grouped

Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of
elements downstream.

**emits** when the specified number of elements has been accumulated or upstream completed

**backpressures** when a group has been assembled and downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### sliding

Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.

Note: the last window might be smaller than the requested size due to end of stream.

**emits** when the specified number of elements has been accumulated or upstream completed

**backpressures** when a group has been assembled and downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### scan

Emit its current value, which starts at `zero`, and then apply the current and next value to the given function,
emitting the next current value.

Note that this means that scan emits one element downstream before, and upstream elements will not be requested until,
the second element is required from downstream.

**emits** when the function scanning the element returns a new element

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### scanAsync

Just like `scan` but receiving a function that results in a `CompletionStage` of the next value.

**emits** when the `CompletionStage` resulting from the function scanning the element resolves to the next value

**backpressures** when downstream backpressures

**completes** when upstream completes and the last `CompletionStage` is resolved

---------------------------------------------------------------

### fold

Start with the current value `zero` and then apply the current and next value to the given function; when upstream
completes the current value is emitted downstream.

**emits** when upstream completes

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### foldAsync

Just like `fold` but receiving a function that results in a `CompletionStage` of the next value.

**emits** when upstream completes and the last `CompletionStage` is resolved

**backpressures** when downstream backpressures

**completes** when upstream completes and the last `CompletionStage` is resolved

---------------------------------------------------------------

### reduce

Start with the first element and then apply the current and next value to the given function; when upstream
completes the current value is emitted downstream. Similar to `fold`.

**emits** when upstream completes

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### drop

Drop `n` elements and then pass any subsequent element downstream.

**emits** when the specified number of elements has been dropped already

**backpressures** when the specified number of elements has been dropped and downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### take

Pass `n` incoming elements downstream and then complete.

**emits** while the specified number of elements to take has not yet been reached

**backpressures** when downstream backpressures

**completes** when the defined number of elements has been taken or upstream completes

---------------------------------------------------------------

### takeWhile

Pass elements downstream as long as a predicate function returns true for the element, and complete
when the predicate first returns false.

**emits** while the predicate is true and until the first false result

**backpressures** when downstream backpressures

**completes** when the predicate returned false or upstream completes

---------------------------------------------------------------

### dropWhile

Drop elements as long as a predicate function returns true for the element.

**emits** when the predicate returned false, and for all following stream elements

**backpressures** when the predicate returned false and downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### recover

Allow sending of one last element downstream when a failure has happened upstream.

Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.

**emits** when the element is available from the upstream or upstream is failed and pf returns an element

**backpressures** when downstream backpressures, not when failure happened

**completes** when upstream completes or upstream failed with an exception pf can handle
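
A `recover` sketch, building the partial function with `akka.japi.pf.PFBuilder` (one common way to construct a `PartialFunction` from Java):

```java
import java.util.Arrays;
import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

Source.from(Arrays.asList(6, 3, 0, 2))
    .map(n -> 12 / n) // fails with ArithmeticException at the 0
    .recover(new PFBuilder<Throwable, Integer>()
        .match(ArithmeticException.class, ex -> -1)
        .build())
    .runWith(Sink.foreach(System.out::println), mat);
// Prints 2, 4, then -1 and completes; the trailing 2 is never processed.
```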

---------------------------------------------------------------

### recoverWith

Allow switching to an alternative Source when a failure has happened upstream.

Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.

**emits** when the element is available from the upstream or upstream is failed and pf returns an alternative Source

**backpressures** when downstream backpressures; after a failure happened it backpressures to the alternative Source

**completes** when upstream completes or upstream failed with an exception pf can handle

---------------------------------------------------------------

### recoverWithRetries

`recoverWithRetries` allows switching to an alternative Source on flow failure. It will stay in effect after
a failure has been recovered up to *attempts* number of times, so that each time there is a failure
it is fed into the *pf* and a new Source may be materialized. Note that if you pass in 0, this won't
attempt to recover at all. Passing -1 will behave exactly the same as *recoverWith*.

Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.

**emits** when an element is available from the upstream or upstream is failed and an element is available from the alternative Source

**backpressures** when downstream backpressures

**completes** when upstream completes or upstream failed with an exception pf can handle

---------------------------------------------------------------

### mapError

While similar to `recover`, this stage can be used to transform an error signal to a different one *without* logging
it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t -> throw t2)` since recover
would log the `t2` error.

Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.

Similarly to `recover`, throwing an exception inside `mapError` _will_ be logged on ERROR level automatically.

**emits** when the element is available from the upstream or upstream is failed and pf returns an element

**backpressures** when downstream backpressures

**completes** when upstream completes or upstream failed with an exception pf can handle

---------------------------------------------------------------

### detach

Detach upstream demand from downstream demand without detaching the stream rates.

**emits** when the upstream stage has emitted and there is demand

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### throttle

Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where
a function has to be provided to calculate the individual cost of each element.

**emits** when upstream emits an element and the configured time per element has elapsed

**backpressures** when downstream backpressures

**completes** when upstream completes
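
For instance, shaping a fast source down to at most ten elements per second; this sketch uses the `FiniteDuration` overload of the Java DSL:

```java
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

Source.range(1, 1000)
    // 10 elements per second, with a burst allowance of 10
    .throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
    .runWith(Sink.foreach(System.out::println), mat);
```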

---------------------------------------------------------------

### intersperse

Intersperse the stream with a provided element, similar to `List.mkString`. It can also inject start and end marker
elements into the stream.

**emits** when upstream emits an element, or before with the *start* element if provided

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### limit

Limit the number of elements from upstream to a given `max` number.

**emits** when upstream emits and the number of emitted elements has not reached max

**backpressures** when downstream backpressures

**completes** when upstream completes and the number of emitted elements has not reached max

---------------------------------------------------------------

### limitWeighted

Ensure stream boundedness by evaluating the cost of incoming elements using a cost function.
The evaluated cost of each element defines how many elements will be allowed to travel downstream.

**emits** when upstream emits and the number of emitted elements has not reached max

**backpressures** when downstream backpressures

**completes** when upstream completes and the number of emitted elements has not reached max

---------------------------------------------------------------

### log

Log elements flowing through the stream as well as completion and failure. By default element and
completion signals are logged on debug level, and errors are logged on error level.
This can be changed by calling `Attributes.createLogLevels(...)` on the given Flow.

**emits** when upstream emits

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

## Flow stages composed of Sinks and Sources

---------------------------------------------------------------

### Flow.fromSinkAndSource

Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sent to the `Sink`
and the `Flow`'s output will come from the `Source`.

Note that termination events, like completion and cancellation, are not automatically propagated through to the "other side"
of such a composed Flow. Use `Flow.fromSinkAndSourceCoupled` if you want to couple termination of both of the ends,
which is for example most useful in handling websocket connections.

---------------------------------------------------------------

### Flow.fromSinkAndSourceCoupled

Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
Similar to `Flow.fromSinkAndSource`, however it couples the termination of these two stages.

E.g. if the emitted `Flow` gets a cancellation, the `Source` is of course cancelled,
however the Sink will also be completed. The table below illustrates the effects in detail:

| Returned Flow                                   | Sink (in)                   | Source (out)                    |
|-------------------------------------------------|-----------------------------|---------------------------------|
| cause: upstream (sink-side) receives completion | effect: receives completion | effect: receives cancel         |
| cause: upstream (sink-side) receives error      | effect: receives error      | effect: receives cancel         |
| cause: downstream (source-side) receives cancel | effect: completes           | effect: receives cancel         |
| effect: cancels upstream, completes downstream  | effect: completes           | cause: signals complete         |
| effect: cancels upstream, errors downstream     | effect: receives error      | cause: signals error or throws  |
| effect: cancels upstream, completes downstream  | cause: cancels              | effect: receives cancel         |

The order in which the *in* and *out* sides receive their respective completion signals is not defined; do not rely on its ordering.
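
A minimal `Flow.fromSinkAndSource` sketch: a handler whose input side and output side are deliberately unrelated (the names are illustrative only):

```java
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Input elements go to the sink, output elements come from the source;
// the two sides are not coupled - use fromSinkAndSourceCoupled for that.
Flow<String, String, NotUsed> handler =
    Flow.fromSinkAndSource(
        Sink.<String>foreach(msg -> System.out.println("received: " + msg)),
        Source.repeat("hello from server"));
```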

---------------------------------------------------------------

## Asynchronous processing stages

These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous
operation at the same time (usually handling the completion of a `CompletionStage`).

---------------------------------------------------------------

### mapAsync

Pass incoming elements to a function that returns a `CompletionStage` result. When the `CompletionStage` completes the result is passed
downstream. Up to `n` elements can be processed concurrently, but regardless of their completion time the incoming
order will be kept when results complete. For use cases where order does not matter `mapAsyncUnordered` can be used.

If a `CompletionStage` fails, the stream also fails (unless a different supervision strategy is applied).

**emits** when the `CompletionStage` returned by the provided function finishes for the next element in sequence

**backpressures** when the number of `CompletionStage` s reaches the configured parallelism and the downstream backpressures

**completes** when upstream completes and all `CompletionStage` s have been completed and all elements have been emitted

---------------------------------------------------------------

### mapAsyncUnordered

Like `mapAsync` but `CompletionStage` results are passed downstream as they arrive, regardless of the order of the elements
that triggered them.

If a `CompletionStage` fails, the stream also fails (unless a different supervision strategy is applied).

**emits** when any of the `CompletionStage` s returned by the provided function completes

**backpressures** when the number of `CompletionStage` s reaches the configured parallelism and the downstream backpressures

**completes** when upstream completes and all `CompletionStage` s have been completed and all elements have been emitted
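
A `mapAsync` sketch with `CompletableFuture` standing in for a real asynchronous call; the parallelism of 4 is an arbitrary choice:

```java
import java.util.concurrent.CompletableFuture;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

Source.range(1, 100)
    // Up to 4 futures in flight at once; results are emitted in upstream order.
    .mapAsync(4, n -> CompletableFuture.supplyAsync(() -> n * n))
    .runWith(Sink.foreach(System.out::println), mat);
```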

---------------------------------------------------------------

## Timer driven stages

These stages process elements using timers, delaying, dropping or grouping elements for certain time durations.

---------------------------------------------------------------

### takeWithin

Pass elements downstream within a timeout and then complete.

**emits** when an upstream element arrives

**backpressures** when downstream backpressures

**completes** when upstream completes or the timer fires

---------------------------------------------------------------

### dropWithin

Drop elements until a timeout has fired.

**emits** after the timer fired and a new upstream element arrives

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### groupedWithin

Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements,
whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.

**emits** when the configured time elapses since the last group has been emitted,
but not if no elements have been grouped (i.e. no empty groups), or when the limit has been reached

**backpressures** when downstream backpressures and there are *n+1* buffered elements

**completes** when upstream completes

---------------------------------------------------------------

### groupedWeightedWithin

Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements,
whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.

**emits** when the configured time elapses since the last group has been emitted,
but not if no elements have been grouped (i.e. no empty groups), or when the weight limit has been reached

**backpressures** when downstream backpressures and the buffered group (+ pending element) weighs more than *maxWeight*

**completes** when upstream completes

---------------------------------------------------------------

### initialDelay

Delay the initial element by a user specified duration from stream materialization.

**emits** when upstream emits an element, if the initial delay has already elapsed

**backpressures** when downstream backpressures or the initial delay has not yet elapsed

**completes** when upstream completes

---------------------------------------------------------------

### delay

Delay every element passed through with a specific duration.

**emits** when there is a pending element in the buffer and the configured time for this element has elapsed

**backpressures** differs, depends on the `OverflowStrategy` set

**completes** when upstream completes and buffered elements have been drained
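
For example, batching ticks with `groupedWithin` into groups of at most 100 elements or one second, whichever comes first (again using the `FiniteDuration` overloads):

```java
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Runs until the stream is shut down; roughly 10 ticks end up in each batch.
Source.tick(FiniteDuration.create(0, TimeUnit.MILLISECONDS),
            FiniteDuration.create(100, TimeUnit.MILLISECONDS), "tick")
    .groupedWithin(100, FiniteDuration.create(1, TimeUnit.SECONDS))
    .runWith(Sink.foreach(batch -> System.out.println(batch.size() + " ticks")), mat);
```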

---------------------------------------------------------------

## Backpressure aware stages

These stages are aware of the backpressure provided by their downstreams and able to adapt their behavior to that signal.

---------------------------------------------------------------

### conflate

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as
there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or
average of incoming numbers; if the aggregation should lead to a different type `conflateWithSeed` can be used.

**emits** when downstream stops backpressuring and there is a conflated element available

**backpressures** when the aggregate function cannot keep up with incoming elements

**completes** when upstream completes

---------------------------------------------------------------

### conflateWithSeed

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there
is backpressure. When backpressure starts, or if there is no backpressure, the element is passed into a `seed` function to
transform it to the summary type.

**emits** when downstream stops backpressuring and there is a conflated element available

**backpressures** when the aggregate or seed functions cannot keep up with incoming elements

**completes** when upstream completes

---------------------------------------------------------------

### batch

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there
is backpressure and a maximum number of batched elements is not yet reached. When the maximum number is reached and
downstream still backpressures, batch will also backpressure.

When backpressure starts, or if there is no backpressure, the element is passed into a `seed` function to transform it
to the summary type.

Will eagerly pull elements; this behavior may result in a single pending (i.e. buffered) element which cannot be
aggregated to the batched value.

**emits** when downstream stops backpressuring and there is a batched element available

**backpressures** when batched elements reached the max limit of allowed batched elements and downstream backpressures

**completes** when upstream completes and a "possibly pending" element was drained

---------------------------------------------------------------

### batchWeighted

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there
is backpressure and a maximum weight of batched elements is not yet reached. The weight of each element is determined by
applying `costFn`. When the maximum total weight is reached and downstream still backpressures, batch will also
backpressure.

Will eagerly pull elements; this behavior may result in a single pending (i.e. buffered) element which cannot be
aggregated to the batched value.

**emits** when downstream stops backpressuring and there is a batched element available

**backpressures** when batched elements reached the max weight limit of allowed batched elements and downstream backpressures

**completes** when upstream completes and a "possibly pending" element was drained
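
As a sketch of `conflate`, summing whatever accumulates while a deliberately slow (throttled) downstream is busy:

```java
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

Source.range(1, 1000)
    .conflate((acc, n) -> acc + n) // sum elements while downstream backpressures
    .throttle(1, FiniteDuration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
    .runWith(Sink.foreach(System.out::println), mat);
```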

---------------------------------------------------------------

### expand

Allow for a faster downstream by expanding the last incoming element to an `Iterator`. For example
`Iterator.continually(element)` to keep repeating the last incoming element.

**emits** when downstream stops backpressuring

**backpressures** when downstream backpressures

**completes** when upstream completes

---------------------------------------------------------------

### buffer (Backpressure)

Allow for temporarily faster upstream events by buffering `size` elements. When the buffer is full backpressure
is applied.

**emits** when downstream stops backpressuring and there is a pending element in the buffer

**backpressures** when the buffer is full

**completes** when upstream completes and buffered elements have been drained

---------------------------------------------------------------

### buffer (Drop)

Allow for temporarily faster upstream events by buffering `size` elements. When the buffer is full elements are
dropped according to the specified `OverflowStrategy`:

 * `dropHead()` drops the oldest element in the buffer to make space for the new element
 * `dropTail()` drops the youngest element in the buffer to make space for the new element
 * `dropBuffer()` drops the entire buffer and buffers the new element
 * `dropNew()` drops the new element

**emits** when downstream stops backpressuring and there is a pending element in the buffer

**backpressures** never (when dropping cannot keep up with incoming elements)

**completes** when upstream completes and buffered elements have been drained

---------------------------------------------------------------

### buffer (Fail)

Allow for temporarily faster upstream events by buffering `size` elements. When the buffer is full the stage fails
the flow with a `BufferOverflowException`.

**emits** when downstream stops backpressuring and there is a pending element in the buffer

**backpressures** never, fails the stream instead of backpressuring when the buffer is full

**completes** when upstream completes and buffered elements have been drained
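
For instance, a flow segment that keeps only the newest 1000 elements when its consumer lags behind:

```java
import akka.NotUsed;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;

// The oldest buffered element is dropped whenever the buffer of 1000 overflows.
Flow<Integer, Integer, NotUsed> dropOldest =
    Flow.of(Integer.class).buffer(1000, OverflowStrategy.dropHead());
```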

---------------------------------------------------------------

## Nesting and flattening stages

These stages either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains
nested streams and turn them into a stream of elements instead (flattening).

---------------------------------------------------------------

### prefixAndTail

Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements)
and return a pair containing a strict sequence of the taken elements and a stream representing the remaining elements.

**emits** when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream

**backpressures** when downstream backpressures or the substream backpressures

**completes** when the prefix elements have been consumed and the substream has been consumed

---------------------------------------------------------------

### groupBy

Demultiplex the incoming stream into separate output streams.

**emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group

**backpressures** when there is an element pending for a group whose substream backpressures

**completes** when upstream completes (until the end of stream it is not possible to know whether new substreams will be needed or not)

---------------------------------------------------------------

### splitWhen

Split off elements into a new substream whenever a predicate function returns `true`.

**emits** an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements

**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures

**completes** when upstream completes (until the end of stream it is not possible to know whether new substreams will be needed or not)

---------------------------------------------------------------

### splitAfter

End the current substream whenever a predicate returns `true`, starting a new substream for the next element.

**emits** when an element passes through. When the provided predicate is true it emits the element and opens a new substream for subsequent elements

**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures

**completes** when upstream completes (until the end of stream it is not possible to know whether new substreams will be needed or not)

---------------------------------------------------------------

### flatMapConcat

Transform each input element into a `Source` whose elements are then flattened into the output stream through
concatenation. This means each source is fully consumed before consumption of the next source starts.

**emits** when the currently consumed substream has an element available

**backpressures** when downstream backpressures

**completes** when upstream completes and all consumed substreams complete
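
To make the concatenation order visible, a tiny `flatMapConcat` sketch:

```java
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

Source.range(1, 3)
    .flatMapConcat(n -> Source.repeat(n).take(n)) // each substream is drained fully first
    .runWith(Sink.foreach(System.out::print), mat); // prints 122333
```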

---------------------------------------------------------------

### flatMapMerge

Transform each input element into a `Source` whose elements are then flattened into the output stream through
merging. The maximum number of merged sources has to be specified.

**emits** when one of the currently consumed substreams has an element available

**backpressures** when downstream backpressures

**completes** when upstream completes and all consumed substreams complete

---------------------------------------------------------------

## Time aware stages

These stages operate taking time into consideration.

---------------------------------------------------------------

### initialTimeout

If the first element has not passed through this stage before the provided timeout, the stream is failed
with a `TimeoutException`.

**emits** when upstream emits an element

**backpressures** when downstream backpressures

**completes** when upstream completes, or fails if the timeout elapses before the first element arrives

**cancels** when downstream cancels

---------------------------------------------------------------

### completionTimeout

If the completion of the stream does not happen until the provided timeout, the stream is failed
with a `TimeoutException`.

**emits** when upstream emits an element

**backpressures** when downstream backpressures

**completes** when upstream completes, or fails if the timeout elapses before upstream completes

**cancels** when downstream cancels

---------------------------------------------------------------

### idleTimeout

If the time between two processed elements exceeds the provided timeout, the stream is failed
with a `TimeoutException`. The timeout is checked periodically, so the resolution of the
check is one period (equal to the timeout value).

**emits** when upstream emits an element

**backpressures** when downstream backpressures

**completes** when upstream completes, or fails if the timeout elapses between two emitted elements

**cancels** when downstream cancels

---------------------------------------------------------------

### backpressureTimeout

If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
the stream is failed with a `TimeoutException`. The timeout is checked periodically, so the resolution of the
check is one period (equal to the timeout value).

**emits** when upstream emits an element

**backpressures** when downstream backpressures

**completes** when upstream completes, or fails if the timeout elapses between element emission and downstream demand

**cancels** when downstream cancels

---------------------------------------------------------------

### keepAlive

Injects additional (configured) elements if upstream does not emit for a configured amount of time.

**emits** when upstream emits an element or if the upstream was idle for the configured period

**backpressures** when downstream backpressures

**completes** when upstream completes

**cancels** when downstream cancels

---------------------------------------------------------------

### initialDelay

Delays the initial element by the specified duration.

**emits** when upstream emits an element, if the initial delay has already elapsed

**backpressures** when downstream backpressures or the initial delay has not yet elapsed

**completes** when upstream completes

**cancels** when downstream cancels
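
A sketch of a heartbeat pattern combining `keepAlive` and `idleTimeout`; `events` is a placeholder for some possibly-silent source:

```java
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
import akka.NotUsed;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Placeholder: emits one element, then stays silent without completing.
Source<String, NotUsed> events = Source.single("event").concat(Source.maybe());

events
    .keepAlive(FiniteDuration.create(1, TimeUnit.SECONDS), () -> "heartbeat")
    .idleTimeout(FiniteDuration.create(5, TimeUnit.SECONDS)) // fails only if heartbeats stop too
    .runWith(Sink.foreach(System.out::println), mat);
```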
 - -## Fan-in stages - -These stages take multiple streams as their input and provide a single output combining the elements from all of -the inputs in different ways. - ---------------------------------------------------------------- - -### merge - -Merge multiple sources. Picks elements randomly if all sources have elements ready. - -**emits** when one of the inputs has an element available - -**backpressures** when downstream backpressures - -**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) - ---------------------------------------------------------------- - -### mergeSorted - -Merge multiple sources. Waits for one element to be ready from each input stream and emits the -smallest element. - -**emits** when all of the inputs have an element available - -**backpressures** when downstream backpressures - -**completes** when all upstreams complete - ---------------------------------------------------------------- - -### mergePreferred - -Merge multiple sources. Prefers one source if all sources have elements ready. - -**emits** when one of the inputs has an element available, preferring a defined input if multiple have elements available - -**backpressures** when downstream backpressures - -**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) - ---------------------------------------------------------------- - -### zip - -Combines elements from each of multiple sources into *Pair* s and passes the pairs downstream. - -**emits** when all of the inputs have an element available - -**backpressures** when downstream backpressures - -**completes** when any upstream completes - ---------------------------------------------------------------- - -### zipWith - -Combines elements from multiple sources through a `combine` function and passes the -returned value downstream. - -**emits** when all of the inputs have an element available - -**backpressures** when downstream backpressures - -**completes** when any upstream completes - ---------------------------------------------------------------- - -### zipWithIndex - -Zips the elements of the current flow with their indices. - -**emits** when upstream emits an element; the element is paired with its index - -**backpressures** when downstream backpressures - -**completes** when upstream completes - ---------------------------------------------------------------- - -### concat - -After completion of the original upstream the elements of the given source will be emitted. - -**emits** when the current stream has an element available; if the current input completes, it tries the next one - -**backpressures** when downstream backpressures - -**completes** when all upstreams complete - ---------------------------------------------------------------- - -### prepend - -Prepends the given source to the flow, consuming it until completion before the original source is consumed. - -If materialized values need to be collected, `prependMat` is available. - -**emits** when the given stream has an element available; if the given input completes, it tries the current one - -**backpressures** when downstream backpressures - -**completes** when all upstreams complete - ---------------------------------------------------------------- - -### orElse - -If the primary source completes without emitting any elements, the elements from the secondary source -are emitted.
If the primary source emits any elements, the secondary source is cancelled. - -Note that both sources are materialized directly and the secondary source is backpressured until it becomes -the source of elements or is cancelled. - -Signals errors downstream, regardless of which of the two sources emitted the error. - -**emits** when an element is available from the first stream, or when the first stream closed without emitting any elements and an element -is available from the second stream - -**backpressures** when downstream backpressures - -**completes** when the primary stream completes after emitting at least one element, when the primary stream completes -without emitting and the secondary stream has already completed, or when the secondary stream completes - ---------------------------------------------------------------- - -### interleave - -Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one -source completes, the rest of the other stream will be emitted. - -**emits** when an element is available from the currently consumed upstream - -**backpressures** when downstream backpressures - -**completes** when both upstreams have completed - ---------------------------------------------------------------- - -
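Most of these fan-in stages also have shorthand combinators directly on the Java DSL `Source`. A minimal, hypothetical sketch (the element values are illustrative):

```java
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.javadsl.Source;

import java.util.Arrays;

public class FanInExample {
  public static void main(String[] args) {
    final Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3));
    final Source<String, NotUsed> letters = Source.from(Arrays.asList("a", "b", "c"));

    // zip: pairs up elements; completes as soon as either input completes
    final Source<Pair<Integer, String>, NotUsed> zipped = numbers.zip(letters);

    // zipWith: combines elements with a function instead of emitting pairs
    final Source<String, NotUsed> combined = numbers.zipWith(letters, (n, l) -> n + l);

    // merge: emits elements as they become available from either input
    final Source<Integer, NotUsed> merged = numbers.merge(Source.from(Arrays.asList(4, 5, 6)));

    // concat: drains this source completely before emitting the other one
    final Source<Integer, NotUsed> chained = numbers.concat(Source.from(Arrays.asList(4, 5, 6)));
  }
}
```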
 - -## Fan-out stages - -These have one input and multiple outputs. They might route the elements between different outputs, or emit elements on -multiple outputs at the same time. - ---------------------------------------------------------------- - -### unzip - -Takes a stream of two-element tuples and unzips the two elements into two different downstreams. - -**emits** when all of the outputs stop backpressuring and there is an input element available - -**backpressures** when any of the outputs backpressures - -**completes** when upstream completes - ---------------------------------------------------------------- - -### unzipWith - -Splits each element of the input into multiple downstreams using a function. - -**emits** when all of the outputs stop backpressuring and there is an input element available - -**backpressures** when any of the outputs backpressures - -**completes** when upstream completes - ---------------------------------------------------------------- - -### broadcast - -Emit each incoming element to each of `n` outputs. - -**emits** when all of the outputs stop backpressuring and there is an input element available - -**backpressures** when any of the outputs backpressures - -**completes** when upstream completes - ---------------------------------------------------------------- - -### balance - -Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer. - -**emits** when any of the outputs stops backpressuring; emits the element to the first available output - -**backpressures** when all of the outputs backpressure - -**completes** when upstream completes - ---------------------------------------------------------------- - -### partition - -Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the -partitioner function applied to the element. - -**emits** when the chosen output stops backpressuring and there is an input element available - -**backpressures** when the chosen output backpressures - -**completes** when upstream completes and no output is pending - ---------------------------------------------------------------- - -
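Fan-out stages are wired up with the graph DSL. The following is a hedged sketch of `broadcast` in the Java DSL, assuming an `ActorSystem` may be created locally; the stage names and printed labels are arbitrary:

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.ClosedShape;
import akka.stream.Materializer;
import akka.stream.SinkShape;
import akka.stream.SourceShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class BroadcastExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("fan-out-example");
    final Materializer mat = ActorMaterializer.create(system);

    RunnableGraph.fromGraph(GraphDSL.create(builder -> {
      final SourceShape<Integer> numbers = builder.add(Source.range(1, 5));
      // one input, two outputs; an element is emitted only when
      // both downstream sinks have signalled demand
      final UniformFanOutShape<Integer, Integer> bcast =
          builder.add(Broadcast.<Integer>create(2));
      final SinkShape<Integer> left =
          builder.add(Sink.<Integer>foreach(i -> System.out.println("left: " + i)));
      final SinkShape<Integer> right =
          builder.add(Sink.<Integer>foreach(i -> System.out.println("right: " + i)));

      builder.from(numbers).viaFanOut(bcast);
      builder.from(bcast).to(left);
      builder.from(bcast).to(right);
      return ClosedShape.getInstance();
    })).run(mat);
  }
}
```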
 - -## Watching status stages - ---------------------------------------------------------------- - -### watchTermination - -Materializes to a `CompletionStage` that will be completed with Done or failed depending on whether the upstream of the stage has been completed or failed. -The stage otherwise passes through elements unchanged. - -**emits** when input has an element available - -**backpressures** when output backpressures - -**completes** when upstream completes - ---------------------------------------------------------------- - -### monitor - -Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage. The stage otherwise -passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an -event, and may therefore affect performance. - -**emits** when upstream emits an element - -**backpressures** when downstream backpressures - -**completes** when upstream completes - ---------------------------------------------------------------- +../../scala/stream/stages-overview.md \ No newline at end of file diff --git a/akka-docs/src/main/paradox/java/stream/stream-composition.md b/akka-docs/src/main/paradox/java/stream/stream-composition.md index 295cb0964b..e1817dce2e 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-composition.md +++ b/akka-docs/src/main/paradox/java/stream/stream-composition.md @@ -1,264 +1 @@ -# Modularity, Composition and Hierarchy - -Akka Streams provide a uniform model of stream processing graphs, which allows flexible composition of reusable -components. In this chapter we show what these look like from the conceptual and API perspective, demonstrating -the modularity aspects of the library. - -## Basics of composition and modularity - -Every processing stage used in Akka Streams can be imagined as a "box" with input and output ports where elements to -be processed arrive and leave the stage. In this view, a `Source` is nothing more than a "box" with a single -output port, while a `BidiFlow` is a "box" with exactly two input and two output ports. In the figure below -we illustrate the most commonly used stages viewed as "boxes". - -![compose_shapes.png](../../images/compose_shapes.png) - -The *linear* stages are `Source`, `Sink` -and `Flow`, as these can be used to compose strict chains of processing stages. -Fan-in and Fan-out stages usually have multiple input or multiple output ports, and therefore allow building -more complex graph layouts, not just chains. `BidiFlow` stages are usually useful in IO-related tasks, where -there are input and output channels to be handled. Due to the specific shape of `BidiFlow` it is easy to -stack them on top of each other, for example to build a layered protocol. The `TLS` support in Akka is, for example, -implemented as a `BidiFlow`. - -These reusable components already allow the creation of complex processing networks. What we -have seen so far does not implement modularity though. It is desirable, for example, to package up a larger graph entity into -a reusable component which hides its internals, exposing only the ports that the users of the module -are meant to interact with. One good example is the `Http` server component, which is encoded internally as a -`BidiFlow` that interfaces with the client TCP connection using an input-output port pair accepting and sending -`ByteString` s, while its upper ports emit and receive `HttpRequest` and `HttpResponse` instances.
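As a minimal, illustrative sketch of this "boxes with ports" view in the Java DSL (the concrete stages chosen here are arbitrary):

```java
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class CompositionBasics {
  public static void main(String[] args) {
    // a reusable Flow "box": one input port, one output port;
    // its internals (map, filter) are hidden from its users
    final Flow<Integer, Integer, NotUsed> doubler =
        Flow.of(Integer.class).map(i -> i * 2).filter(i -> i > 2);

    // attaching the Flow to a Source yields a composite Source
    final Source<Integer, NotUsed> composite = Source.range(1, 10).via(doubler);

    // attaching the Flow to a Sink yields a composite Sink
    final Sink<Integer, NotUsed> compositeSink = doubler.to(Sink.ignore());
  }
}
```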
 - -The following figure demonstrates various composite stages that contain various other types of stages internally, -hiding them behind a *shape* that looks like a `Source`, `Flow`, etc. - -![compose_composites.png](../../images/compose_composites.png) - -One interesting example above is a `Flow` which is composed of a disconnected `Sink` and `Source`. -This can be achieved by using the `fromSinkAndSource()` constructor method on `Flow` which takes the two parts as -parameters. - -Please note that when combining a `Flow` using that method, the termination signals are not carried -"through" as the `Sink` and `Source` are assumed to be fully independent. If, however, you want to construct -a `Flow` like this but need the termination events to trigger "the other side" of the composite flow, you can use -`CoupledTerminationFlow.fromSinkAndSource` which does just that. For example, the cancellation of the composite flow's -source-side will then lead to completion of its sink-side. Read `CoupledTerminationFlow`'s scaladoc for a -detailed explanation of how this works. - -The example `BidiFlow` demonstrates that internally a module can be of arbitrary complexity, and the exposed -ports can be wired in flexible ways. The only constraint is that all the ports of enclosed modules must be either -connected to each other, or exposed as interface ports, and the number of such ports needs to match the requirement -of the shape, for example a `Source` allows only one exposed output port, the rest of the internal ports must -be properly connected. - -These mechanics allow arbitrary nesting of modules. For example the following figure demonstrates a `RunnableGraph` -that is built from a composite `Source` and a composite `Sink` (which in turn contains a composite -`Flow`). - -![compose_nested_flow.png](../../images/compose_nested_flow.png) - -The above diagram contains one more shape that we have not seen yet, which is called `RunnableGraph`. It turns -out that if we wire all exposed ports together, so that no more open ports remain, we get a module that is *closed*. -This is what the `RunnableGraph` class represents. This is the shape that a `Materializer` can take -and turn into a network of running entities that perform the task described. In fact, a `RunnableGraph` is a -module itself, and (maybe somewhat surprisingly) it can be used as part of larger graphs. It is rarely useful to embed -a closed graph shape in a larger graph (since it becomes an isolated island as there are no open ports for communication -with the rest of the graph), but this demonstrates the uniform underlying model. - -If we try to build a code snippet that corresponds to the above diagram, our first try might look like this: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #non-nested-flow } - -It is clear, however, that there is no nesting present in our first attempt. Since the library cannot figure out -where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the -DSL provided by the `Flow`, `Source`, `Sink` classes then nesting can be achieved by calling one of the -methods `withAttributes()` or `named()` (where the latter is just a shorthand for adding a name attribute).
 - -The following code demonstrates how to achieve the desired nesting: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #nested-flow } - -Once we have hidden the internals of our components, they act like any other built-in component of similar shape. If -we hide some of the internals of our composites, the result looks just as if any other predefined component had been -used: - -![compose_nested_flow_opaque.png](../../images/compose_nested_flow_opaque.png) - -If we look at the usage of built-in components and our custom components, there is no difference, as the code -snippet below demonstrates. - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #reuse } - -## Composing complex systems - -In the previous section we explored the possibility of composition and hierarchy, but we stayed away from non-linear, -generalized graph components. There is nothing in Akka Streams though that enforces that stream processing layouts -can only be linear. The DSL for `Source` and friends is optimized for creating such linear chains, as they are -the most common in practice. There is a more advanced DSL for building complex graphs that can be used if more -flexibility is needed. We will see that the difference between the two DSLs is only on the surface: the concepts they -operate on are uniform across all DSLs and fit together nicely. - -As a first example, let's look at a more complex layout: - -![compose_graph.png](../../images/compose_graph.png) - -The diagram shows a `RunnableGraph` (remember, if there are no unwired ports, the graph is closed, and therefore -can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in and fan-out stages, and -directed and non-directed cycles. The `runnable()` method of the `GraphDSL` factory object allows the creation of a -general, closed, and runnable graph. For example the network on the diagram can be realized like this: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph } - -In the code above we used the implicit port numbering feature to make the graph more readable and similar to the diagram. -It is possible to refer to the ports, so another version might look like this: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph-alt } - -Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but -the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL. -The way to do it is to use the `create()` method on the `GraphDSL` factory. If we remove the sources and sinks -from the previous example, what remains is a partial graph: - -![compose_graph_partial.png](../../images/compose_graph_partial.png) - -We can recreate a similar graph in code, using the DSL in a similar way as before: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-graph } - -The only new addition is the return value of the builder block, which is a `Shape`. All graphs (including -`Source`, `BidiFlow`, etc) have a shape, which encodes the *typed* ports of the module. In our example -there is exactly one input and output port left, so we can declare it to have a `FlowShape` by returning an -instance of it. While it is possible to create new `Shape` types, it is usually recommended to use one of the -matching built-in ones.
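For illustration, here is a hedged Java DSL sketch of a partial graph that exposes a `FlowShape`; the balance/merge layout is an arbitrary example, not the graph from the diagram above:

```java
import akka.NotUsed;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Balance;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Merge;

public class PartialGraphExample {
  // one open inlet and one open outlet remain, so the module
  // can be declared to have a FlowShape
  static Graph<FlowShape<Integer, Integer>, NotUsed> parallelDoubler() {
    return GraphDSL.create(builder -> {
      final UniformFanOutShape<Integer, Integer> balance =
          builder.add(Balance.<Integer>create(2));
      final UniformFanInShape<Integer, Integer> merge =
          builder.add(Merge.<Integer>create(2));
      final FlowShape<Integer, Integer> worker1 =
          builder.add(Flow.of(Integer.class).map(i -> i * 2));
      final FlowShape<Integer, Integer> worker2 =
          builder.add(Flow.of(Integer.class).map(i -> i * 2));

      builder.from(balance).via(worker1).toFanIn(merge);
      builder.from(balance).via(worker2).toFanIn(merge);

      // the two unwired ports become the ports of the exposed shape
      return FlowShape.of(balance.in(), merge.out());
    });
  }
}
```

Such a graph can then be used from the linear DSL, e.g. `Source.range(1, 4).via(parallelDoubler())`.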
 - -The resulting graph is already a properly wrapped module, so there is no need to call *named()* to encapsulate the graph, but -it is a good practice to give names to modules to help debugging. - -![compose_graph_shape.png](../../images/compose_graph_shape.png) - -Since our partial graph has the right shape, it can already be used in the simpler, linear DSL: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-use } - -It is not possible to use it as a `Flow` yet, though (i.e. we cannot call `.filter()` on it), but `Flow` -has a `fromGraph()` method that just adds the DSL to a `FlowShape`. There are similar methods on `Source`, -`Sink` and `BidiFlow`, so it is easy to get back to the simpler DSL if a graph has the right shape. -For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods. -To demonstrate this, we will create the following graph: - -![compose_graph_flow.png](../../images/compose_graph_flow.png) - -The code version of the above closed graph might look like this: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-flow-dsl } - -@@@ note - -All graph builder sections check if the resulting graph has all ports connected except the exposed ones and will -throw an exception if this is violated. - -@@@ - -We still owe a demonstration that `RunnableGraph` is a component just like any other, which can -be embedded in graphs. In the following snippet we embed one closed graph in another: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #embed-closed } - -The type of the imported module indicates that the imported module has a `ClosedShape`, and so we are not -able to wire it to anything else inside the enclosing closed graph. Nevertheless, this "island" is embedded properly, -and will be materialized just like any other module that is part of the graph. - -As we have demonstrated, the two DSLs are fully interoperable, as they encode a similar nested structure of "boxes with -ports"; the DSLs only differ in order to be as powerful as possible on their given abstraction level. It is possible -to embed complex graphs in the fluid DSL, and it is just as easy to import and embed a `Flow`, etc, in a larger, -complex structure. - -We have also seen that every module has a `Shape` (for example a `Sink` has a `SinkShape`) -independently of which DSL was used to create it. This uniform representation enables the rich composability of various -stream processing entities in a convenient way. - -## Materialized values - -After realizing that `RunnableGraph` is nothing more than a module with no unused ports (it is an island), it becomes clear that -after materialization the only way to communicate with the running stream processing logic is via some side-channel. -This side channel is represented as a *materialized value*. The situation is similar to `Actor` s, where the -`Props` instance describes the actor logic, but it is the call to `actorOf()` that creates an actually running -actor, and returns an `ActorRef` that can be used to communicate with the running actor itself. Since the -`Props` can be reused, each call will return a different reference. - -When it comes to streams, each materialization creates a new running network corresponding to the blueprint that was -encoded in the provided `RunnableGraph`.
To be able to interact with the running network, each materialization -needs to return a different object that provides the necessary interaction capabilities. In other words, the -`RunnableGraph` can be seen as a factory, which creates: - - * a network of running processing entities, inaccessible from the outside - * a materialized value, optionally providing a controlled interaction capability with the network - -Unlike actors though, each of the processing stages might provide a materialized value, so when we compose multiple -stages or modules, we need to combine the materialized values as well (there are default rules which make this easier, -for example *to()* and *via()* take care of the most common case of taking the materialized value to the left. -See @ref:[Combining materialized values](../stream/stream-flows-and-basics.md#flow-combine-mat) for details). We demonstrate how this works with a code example and a diagram that -graphically shows what is happening. - -The propagation of the individual materialized values from the enclosed modules towards the top will look like this: - -![compose_mat.png](../../images/compose_mat.png) - -To implement the above, first, we create a composite `Source`, where the enclosed `Source` has a -materialized type of `CompletableFuture<Optional<Integer>>`. By using the combiner function `Keep.left()`, the resulting materialized -type is that of the nested module (indicated by the color *red* on the diagram): - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-1 } - -Next, we create a composite `Flow` from two smaller components. Here, the second enclosed `Flow` has a -materialized type of `CompletionStage<OutgoingConnection>`, and we propagate this to the parent by using `Keep.right()` -as the combiner function (indicated by the color *yellow* on the diagram): - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-2 } - -As a third step, we create a composite `Sink`, using our `nestedFlow` as a building block. In this snippet, both -the enclosed `Flow` and the folding `Sink` have a materialized value that is interesting for us, so -we use `Keep.both()` to get a `Pair` of them as the materialized type of `nestedSink` (indicated by the color -*blue* on the diagram): - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-3 } - -As the last example, we wire together `nestedSource` and `nestedSink` and we use a custom combiner function to -create yet another materialized type of the resulting `RunnableGraph`. This combiner function just ignores -the `CompletionStage<OutgoingConnection>` part, and wraps the other two values in a custom case class `MyClass` -(indicated by the color *purple* on the diagram): - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4a } - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4b } - -@@@ note - -The nested structure in the above example is not necessary for combining the materialized values, it just -demonstrates how the two features work together. See @ref:[Operator Fusion](stream-flows-and-basics.md#operator-fusion) for further examples -of combining materialized values without nesting and hierarchy involved. - -@@@ - -## Attributes - -We have seen that we can use `named()` to introduce a nesting level in the fluid DSL (and also explicit nesting by using -`create()` from `GraphDSL`).
Apart from having the effect of adding a nesting level, `named()` is actually -a shorthand for calling `withAttributes(Attributes.name("someName"))`. Attributes provide a way to fine-tune certain -aspects of the materialized running entity. For example buffer sizes for asynchronous stages can be controlled via -attributes (see @ref:[Buffers for asynchronous stages](stream-rate.md#async-stream-buffers)). When it comes to hierarchic composition, attributes are inherited -by nested modules, unless they override them with a custom value. - -The code below, a modification of an earlier example, sets the `inputBuffer` attribute on certain modules, but not -on others: - -@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #attributes-inheritance } - -The effect is that each module inherits the `inputBuffer` attribute from its enclosing parent, unless it has -the same attribute explicitly set. `nestedSource` gets the default attributes from the materializer itself. `nestedSink` -on the other hand has this attribute set, so it will be used by all nested modules. `nestedFlow` will inherit from `nestedSink` -except the `map` stage, which again has an explicitly provided attribute overriding the inherited one. - -![compose_attributes.png](../../images/compose_attributes.png) - -This diagram illustrates the inheritance process for the example code (representing the materializer default attributes -as the color *red*, the attributes set on `nestedSink` as *blue* and the attributes set on `nestedFlow` as *green*). +../../scala/stream/stream-composition.md \ No newline at end of file diff --git a/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md b/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md index 4e29af8c2e..77eaec108c 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md +++ b/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md @@ -1,301 +1 @@ -# Basics and working with Flows - - -## Core concepts - -Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This -latter property is what we refer to as *boundedness* and it is the defining feature of Akka Streams. Translated to -everyday terms, it is possible to express a chain (or as we see later, graphs) of processing entities, each executing -independently (and possibly concurrently) from the others while only buffering a limited number of elements at any given -time. This property of bounded buffers is one of the differences from the actor model, where each actor usually has -an unbounded, or a bounded but dropping, mailbox. Akka Stream processing entities have bounded "mailboxes" that -do not drop. - -Before we move on, let's define some basic terminology which will be used throughout the entire documentation: - -Stream -: An active process that involves moving and transforming data. - -Element -: An element is the processing unit of streams. All operations transform and transfer elements from upstream to -downstream. Buffer sizes are always expressed as a number of elements, independently from the actual size of the elements. - -Back-pressure -: A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively -slowing down the upstream producer to match their consumption speeds. -In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*.
 - -Non-Blocking -: Means that a certain operation does not hinder the progress of the calling thread, even if it takes a long time to -finish the requested operation. - -Graph -: A description of a stream processing topology, defining the pathways through which elements shall flow when the stream -is running. - -Processing Stage -: The common name for all building blocks that build up a Graph. -Examples of a processing stage would be operations like `map()`, `filter()`, custom `GraphStage` s and graph -junctions like `Merge` or `Broadcast`. For the full list of built-in processing stages see @ref:[stages overview](stages-overview.md). - - -When we talk about *asynchronous, non-blocking backpressure* we mean that the processing stages available in Akka -Streams will not use blocking calls but asynchronous message passing to exchange messages between each other, and they -will use asynchronous means to slow down a fast producer, without blocking its thread. This is a thread-pool friendly -design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but -can hand it back for further use to an underlying thread-pool. - - -## Defining and running streams - -Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: - -Source -: A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are -ready to receive them. - -Sink -: A processing stage with *exactly one input*, requesting and accepting data elements, possibly slowing down the upstream -producer of elements. - -Flow -: A processing stage which has *exactly one input and output*, which connects its up- and downstreams by -transforming the data elements flowing through it. - -RunnableGraph -: A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be `run()`. - - -It is possible to attach a `Flow` to a `Source` resulting in a composite source, and it is also possible to prepend -a `Flow` to a `Sink` to get a new sink. After a stream is properly terminated by having both a source and a sink, -it will be represented by the `RunnableGraph` type, indicating that it is ready to be executed. - -It is important to remember that even after constructing the `RunnableGraph` by connecting all the source, sink and -different processing stages, no data will flow through it until it is materialized. Materialization is the process of -allocating all resources needed to run the computation described by a Graph (in Akka Streams this will often involve -starting up Actors). Thanks to Flows being simply a description of the processing pipeline, they are *immutable, -thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have -one actor prepare the work, and then have it be materialized at some completely different place in the code. - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-in-steps } - -After running (materializing) the `RunnableGraph` we get a special container object, the `MaterializedMap`. Both -sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation -dependent. For example, a `FoldSink` will make a `CompletionStage` available in this map which will represent the result -of the folding process over the stream.
In general, a stream can expose multiple materialized values, -but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason -there is a convenience method called `runWith()` available for `Sink`, `Source` or `Flow` requiring, respectively, -a supplied `Source` (in order to run a `Sink`), a `Sink` (in order to run a `Source`) or -both a `Source` and a `Sink` (in order to run a `Flow`, since it has neither attached yet). - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-runWith } - -It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage, -instead of modifying the existing instance, so while constructing long flows, remember to assign the new value to a variable or run it: - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-immutable } - -@@@ note - -By default Akka Streams elements support **exactly one** downstream processing stage. -Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream elements to -be less complex and more efficient. Also, it allows for greater flexibility on *how exactly* to handle the multicast scenarios, -by providing named fan-out elements such as broadcast (signals all down-stream elements) or balance (signals one of the available down-stream elements). - -@@@ - -In the above example we used the `runWith` method, which both materializes the stream and returns the materialized value -of the given sink or source. - -Since a stream can be materialized multiple times, the `MaterializedMap` returned is different for each materialization. -In the example below we create two running materialized instances of the stream that we described in the `runnable` -variable, and both materializations give us a different `CompletionStage` from the map even though we used the same `sink` -to refer to the future: - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #stream-reuse } - -### Defining sources, sinks and flows - -The objects `Source` and `Sink` define various ways to create sources and sinks of elements. The following -examples show some of the most useful constructs (refer to the API documentation for more details): - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-sink } - -There are various ways to wire up different parts of a stream; the following examples show some of the available options: - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-connecting } - -### Illegal stream elements - -In accordance with the Reactive Streams specification ([Rule 2.13](https://github.com/reactive-streams/reactive-streams-jvm#2.13)) -Akka Streams do not allow `null` to be passed through the stream as an element. If you want to model the concept -of absence of a value, we recommend using `java.util.Optional`, which is available since Java 8. - - -## Back-pressure explained - -Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the [Reactive Streams](http://reactive-streams.org/) -specification, which Akka is a founding member of. - -The user of the library does not have to write any explicit back-pressure handling code — it is built in -and dealt with automatically by all of the provided Akka Streams processing stages.
It is, however, possible to add -explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important -in complex processing graphs which may even contain loops (which *must* be treated with very special -care, as explained in @ref:[Graph cycles, liveness and deadlocks](stream-graphs.md#graph-cycles)). - -The back-pressure protocol is defined in terms of the number of elements a downstream `Subscriber` is able to receive -and buffer, referred to as `demand`. -The source of data, referred to as `Publisher` in Reactive Streams terminology and implemented as `Source` in Akka -Streams, guarantees that it will never emit more elements than the received total demand for any given `Subscriber`. - -@@@ note - -The Reactive Streams specification defines its protocol in terms of `Publisher` and `Subscriber`. -These types are **not** meant to be user-facing API; instead they serve as the low-level building blocks for -different Reactive Streams implementations. - -Akka Streams implements these concepts as `Source`, `Flow` (referred to as `Processor` in Reactive Streams) -and `Sink` without exposing the Reactive Streams interfaces directly. -If you need to integrate with other Reactive Streams libraries, read @ref:[Integrating with Reactive Streams](stream-integrations.md#reactive-streams-integration). - -@@@ - -The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode", -since it will switch between push- and pull-based back-pressure models depending on whether the downstream is able to cope -with the upstream production rate. - -To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them: - -### Slow Publisher, fast Subscriber - -This is the happy case of course – we do not need to slow down the Publisher in this case. However, signalling rates are -rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now -slower than the Publisher. In order to safeguard against these situations, the back-pressure protocol must still be enabled -during such situations; however, we do not want to pay a high penalty for this safety net being enabled. - -The Reactive Streams protocol solves this by the Subscriber asynchronously sending `Request(int n)` signals to the Publisher. The protocol guarantees that the Publisher will never signal *more* elements than the -signalled demand. Since the Subscriber, however, is currently faster, it will be signalling these Request messages at a higher -rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means -that the Publisher should never have to wait (be back-pressured) before publishing its incoming elements. - -As we can see, in this scenario we effectively operate in so-called push mode, since the Publisher can continue producing -elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements. - -### Fast Publisher, slow Subscriber - -This is the case when back-pressuring the `Publisher` is required, because the `Subscriber` is not able to cope with -the rate at which its upstream would like to emit data elements.
 - -Since the `Publisher` is not allowed to signal more elements than the pending demand signalled by the `Subscriber`, -it will have to abide by this back-pressure by applying one of the below strategies: - - * not generate elements, if it is able to control their production rate, - * try buffering the elements in a *bounded* manner until more demand is signalled, - * drop elements until more demand is signalled, - * tear down the stream if unable to apply any of the above strategies. - -As we can see, this scenario effectively means that the `Subscriber` will *pull* the elements from the Publisher – -this mode of operation is referred to as pull-based back-pressure. - - -## Stream Materialization - -When constructing flows and graphs in Akka Streams, think of them as preparing a blueprint, an execution plan. -Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources -it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing, -but is not restricted to that—it could also mean opening files or socket connections etc.—depending on what the stream needs. - -Materialization is triggered at so-called "terminal operations". Most notably this includes the various forms of the `run()` -and `runWith()` methods defined on `Source` or `Flow` elements as well as a small number of special syntactic sugars for running with -well-known sinks, such as `runForeach(el -> ...)` (being an alias for `runWith(Sink.foreach(el -> ...))`). - -Materialization is currently performed synchronously on the materializing thread. -The actual stream processing is handled by actors started up during the stream's materialization, -which will be running on the thread pools they have been configured to run on - which defaults to the dispatcher set in -`MaterializationSettings` while constructing the `ActorMaterializer`. - -@@@ note - -Reusing *instances* of linear computation stages (Source, Sink, Flow) inside composite Graphs is legal, -yet will materialize that stage multiple times. - -@@@ - - -### Operator Fusion - -By default Akka Streams will fuse the stream operators. This means that the processing steps of a flow or -stream graph can be executed within the same Actor, which has two consequences: - - * passing elements from one processing stage to the next is a lot faster between fused -stages due to avoiding the asynchronous messaging overhead - * fused stream processing stages do not run in parallel to each other, meaning that -only up to one CPU core is used for each fused part - -To allow for parallel processing you will have to insert asynchronous boundaries manually into your flows and -graphs by way of adding `Attributes.asyncBoundary` using the method `async` on `Source`, `Sink` and `Flow` -to pieces that shall communicate with the rest of the graph in an asynchronous fashion. - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-async } - -In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding -and multiplying integers is an extremely costly operation, this will lead to a performance gain, since two CPUs can -work on the tasks in parallel.
It is important to note that asynchronous boundaries are not singular places within a -flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work -by adding information to the flow graph that has been constructed up to this point: - -![asyncBoundary.png](../../images/asyncBoundary.png) - -This means that everything that is inside the red bubble will be executed by one actor and everything outside of it -by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all -processing stages that have been added since then. - -@@@ warning - -Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer -that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers -may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer -there; data elements are passed without buffering between fused stages. In those cases where buffering -is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the -`.buffer()` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function. - -@@@ - -The new fusing behavior can be disabled by setting the configuration parameter `akka.stream.materializer.auto-fusing=off`. -In that case you can still manually fuse those graphs which shall run on fewer Actors. With the exception of the -`SslTlsStage` and the `groupBy` operator, all built-in processing stages can be fused. - -### Combining materialized values - -Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary -to somehow express how these values should be composed into a final value when we plug these stages together. For this, -many combinator methods have variants that take an additional argument, a function, that will be used to combine the -resulting values. Some examples of using these combiners are illustrated in the example below. - -@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-mat-combine } - -@@@ note - -In Graphs it is possible to access the materialized value from inside the stream processing graph. For details see @ref:[Accessing the materialized value inside the Graph](stream-graphs.md#graph-matvalue). - -@@@ - -## Stream ordering - -In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs `{IA1,IA2,...,IAn}` -"cause" outputs `{OA1,OA2,...,OAk}` and inputs `{IB1,IB2,...,IBm}` "cause" outputs `{OB1,OB2,...,OBl}` and all of -`IAi` happened before all `IBi` then `OAi` happens before `OBi`. - -This property is even upheld by async operations such as `mapAsync`, however an unordered version, `mapAsyncUnordered`, exists -which does not preserve this ordering. - -However, in the case of Junctions which handle multiple input streams (e.g. `Merge`) the output order is, -in general, *not defined* for elements arriving on different input ports. That is, a merge-like operation may emit `Ai` -before emitting `Bi`, and it is up to its internal logic to decide the order of emitted elements. Specialized elements -such as `Zip` however *do guarantee* their output order, as each output element depends on all upstream elements having -been signalled already – thus the ordering in the case of zipping is defined by this property.
 - -If you find yourself in need of fine-grained control over the order of emitted elements in fan-in -scenarios, consider using `MergePreferred` or `GraphStage`, which give you full control over how the -merge is performed. +../../scala/stream/stream-flows-and-basics.md \ No newline at end of file diff --git a/akka-docs/src/main/paradox/scala/stream/stages-overview.md b/akka-docs/src/main/paradox/scala/stream/stages-overview.md index 48105789a4..0ebafc4a2a 100644 --- a/akka-docs/src/main/paradox/scala/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/scala/stream/stages-overview.md @@ -4,14 +4,14 @@ ## Source stages -These built-in sources are available from `akka.stream.scaladsl.Source`: +These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] @java[`akka.stream.javadsl.Source`]: --------------------------------------------------------------- ### fromIterator Stream the values from an `Iterator`, requesting the next value when there is demand. The iterator will be created anew -for each materialization, which is the reason the method takes a function rather than an iterator directly. +for each materialization, which is the reason the @scala[`method`] @java[`factory`] takes a @scala[`function`] @java[`Creator`] rather than an `Iterator` directly. If the iterator performs blocking operations, make sure to run it on a separate dispatcher. @@ -21,6 +21,8 @@ If the iterator perform blocking operations, make sure to run it on a separate d --------------------------------------------------------------- +@@@ div { .group-scala } + ### apply Stream the values of an `immutable.Seq`. @@ -29,6 +31,17 @@ Stream the values of an `immutable.Seq`. **completes** when the last element of the seq has been emitted +@@@ + +@@@ div { .group-java } + +### from + +Stream the values of an `Iterable`. Make sure the `Iterable` is immutable or at least not modified after being used +as a source. + +@@@ + --------------------------------------------------------------- ### single @@ -88,7 +101,7 @@ If the future fails the stream is failed with that exception. ### fromCompletionStage -Send the single value of the Java `CompletionStage` when it completes and there is demand. +Send the single value of the `CompletionStage` when it completes and there is demand. If the future fails the stream is failed with that exception. **emits** the future completes @@ -121,8 +134,8 @@ If the *completion* fails the stream is failed with that exception. ### unfold -Stream the result of a function as long as it returns a `Some`, the value inside the option -consists of a tuple where the first value is a state passed back into the next call to the function allowing +Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`], the value inside the option +consists of a @scala[tuple] @java[pair] where the first value is a state passed back into the next call to the function allowing to pass a state. The first invocation of the provided fold function will receive the `zero` state. Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API. @@ -135,14 +148,14 @@ Can be used to implement many stateful sources without having to touch the more ### unfoldAsync -Just like `unfold` but the fold function returns a `Future` which will cause the source to +Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`] which will cause the source to complete or emit when it completes.
Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API. **emits** when there is demand and unfold state returned future completes with some value -**completes** when the future returned by the unfold function completes with an empty value +**completes** when the @scala[future] @java[CompletionStage] returned by the unfold function completes with an empty value --------------------------------------------------------------- @@ -159,8 +172,8 @@ an API but there are no elements to emit. ### maybe -Materialize a `Promise[Option[T]]` that if completed with a `Some[T]` will emit that *T* and then complete -the stream, or if completed with `None` complete the stream right away. +Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] +will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away. **emits** when the returned promise is completed with some value @@ -206,7 +219,17 @@ elements or failing the stream, the strategy is chosen by the user. **emits** when there is demand and there are messages in the buffer or a message is sent to the actorref -**completes** when the actorref is sent `akka.actor.Status.Success` or `PoisonPill` +**completes** when the `ActorRef` is sent `akka.actor.Status.Success` or `PoisonPill` + +--------------------------------------------------------------- + +### range + +Emit each integer in a range, with an option to take bigger steps than 1. + +**emits** when there is demand, the next value + +**completes** when the end of the range has been reached --------------------------------------------------------------- @@ -224,7 +247,7 @@ Combine several sources, using a given strategy such as merge or concat, into on Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. -**emits** when there is demand and read function returns value +**emits** when there is demand and read @scala[function] @java[method] returns value **completes** when read function returns `None` @@ -233,11 +256,11 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa ### unfoldResourceAsync Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. -Functions return `Future` to achieve asynchronous processing +Functions return @scala[`Future`] @java[`CompletionStage`] to achieve asynchronous processing -**emits** when there is demand and `Future` from read function returns value +**emits** when there is demand and @scala[`Future`] @java[`CompletionStage`] from read function returns value -**completes** when `Future` from read function returns `None` +**completes** when @scala[`Future`] @java[`CompletionStage`] from read function returns `None` --------------------------------------------------------------- @@ -290,14 +313,14 @@ Combine the elements of multiple streams into a stream of sequences using a comb ## Sink stages -These built-in sinks are available from `akka.stream.scaladsl.Sink`: +These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @java[`akka.stream.javadsl.Sink`]: --------------------------------------------------------------- ### head -Materializes into a `Future` which completes with the first value arriving, -after this the stream is canceled. 
If no element is emitted, the future is be failed. +Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving, +after this the stream is canceled. If no element is emitted, the @scala[`Future`] @java[`CompletionStage`] is failed. **cancels** after receiving one element @@ -307,8 +330,8 @@ after this the stream is canceled. ### headOption -Materializes into a `Future[Option[T]]` which completes with the first value arriving wrapped in a `Some`, -or a `None` if the stream completes without any elements emitted. +Materializes into a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the first value arriving wrapped in @scala[`Some`] @java[`Optional`], +or @scala[a `None`] @java[an empty Optional] if the stream completes without any elements emitted. **cancels** after receiving one element @@ -318,8 +341,8 @@ or a `None` if the stream completes without any elements emitted. ### last -Materializes into a `Future` which will complete with the last value emitted when the stream -completes. If the stream completes with no elements the future is failed. +Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream +completes. If the stream completes with no elements the @scala[`Future`] @java[`CompletionStage`] is failed. **cancels** never @@ -329,9 +352,9 @@ completes. If the stream completes with no elements the future is failed. ### lastOption -Materialize a `Future[Option[T]]` which completes with the last value -emitted wrapped in an `Some` when the stream completes. if the stream completes with no elements the future is -completed with `None`. +Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the last value +emitted wrapped in a @scala[`Some`] @java[`Optional`] when the stream completes. If the stream completes with no elements the @scala[`Future`] @java[`CompletionStage`] is +completed with @scala[`None`] @java[an empty `Optional`]. **cancels** never @@ -360,8 +383,8 @@ Immediately cancel the stream ### seq -Collect values emitted from the stream into a collection, the collection is available through a `Future` or -which completes when the stream completes. Note that the collection is bounded to `Int.MaxValue`, +Collect values emitted from the stream into a collection, the collection is available through a @scala[`Future`] @java[`CompletionStage`] +which completes when the stream completes. Note that the collection is bounded to @scala[`Int.MaxValue`] @java[`Integer.MAX_VALUE`], if more elements are emitted the sink will cancel the stream **cancels** If too many values are collected @@ -372,7 +395,7 @@ if more element are emitted the sink will cancel the stream Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure. -The sink materializes into a `Future[Option[Done]]` which completes when the +The sink materializes into a @scala[`Future[Option[Done]]`] @java[`CompletionStage`] which completes when the stream completes, or fails if the stream fails. Note that it is not safe to mutate state from the procedure. @@ -430,7 +453,7 @@ a buffer in case stream emitting elements faster than queue pulling them. Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation. The first invocation will be provided the `zero` value.
-Materializes into a future that will complete with the last state when the stream has completed. +Materializes into a @scala[`Future`] @java[`CompletionStage`] that will complete with the last state when the stream has completed. This stage allows combining values into a result without a global mutable state by instead passing the state along between invocations. @@ -446,7 +469,7 @@ between invocations. Apply a reduction function on the incoming elements and pass the result to the next invocation. The first invocation receives the two first elements of the flow. -Materializes into a future that will be completed by the last result of the reduction function. +Materializes into a @scala[`Future`] @java[`CompletionStage`] that will be completed by the last result of the reduction function. **cancels** never @@ -525,7 +548,7 @@ dispatcher configured through the `akka.stream.blocking-io-dispatcher`. Create a sink that wraps an `OutputStream`. Takes a function that produces an `OutputStream`, when the sink is materialized the function will be called and bytes sent to the sink will be written to the returned `OutputStream`. -Materializes into a `Future` which will complete with a `IOResult` when the stream +Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with an `IOResult` when the stream completes. Note that a flow can be materialized multiple times, so the function producing the `OutputStream` must be able @@ -551,7 +574,7 @@ The `InputStream` will be ended when the stream flowing into this `Sink` completes. Create a source that wraps an `InputStream`. Takes a function that produces an `InputStream`, when the source is materialized the function will be called and bytes from the `InputStream` will be emitted into the stream. -Materializes into a `Future` which will complete with a `IOResult` when the stream +Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with an `IOResult` when the stream completes. Note that a flow can be materialized multiple times, so the function producing the `InputStream` must be able to handle multiple invocations. @@ -593,7 +616,7 @@ downstream on demand. ### javaCollector -Create a sink which materializes into a `Future` which will be completed with a result of the Java 8 `Collector` +Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. The `Collector` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable result container, optionally transformed into a final representation after all input elements have been processed. @@ -606,7 +629,7 @@ to handle multiple invocations. ### javaCollectorParallelUnordered -Create a sink which materializes into a `Future` which will be completed with a result of the Java 8 `Collector` +Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. The `Collector` is triggering demand downstream. Elements emitted through the stream will be accumulated into a mutable result container, optionally transformed into a final representation after all input elements have been processed.
@@ -627,7 +650,7 @@ Sources and sinks for reading and writing files can be found on `FileIO`.

### fromPath

-Emit the contents of a file, as `ByteString` s, materializes into a `Future` which will be completed with
+Emit the contents of a file, as `ByteString` s, materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with
an `IOResult` upon reaching the end of the file or if there is a failure.

---------------------------------------------------------------

@@ -786,13 +809,13 @@ the second element is required from downstream.

### scanAsync

-Just like `scan` but receiving a function that results in a `Future` to the next value.
+Just like `scan` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] of the next value.

-**emits** when the `Future` resulting from the function scanning the element resolves to the next value
+**emits** when the @scala[`Future`] @java[`CompletionStage`] resulting from the function scanning the element resolves to the next value

**backpressures** when downstream backpressures

-**completes** when upstream completes and the last `Future` is resolved
+**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved

---------------------------------------------------------------

@@ -811,13 +834,13 @@ complete the current value is emitted downstream.

### foldAsync

-Just like `fold` but receiving a function that results in a `Future` to the next value.
+Just like `fold` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] of the next value.

-**emits** when upstream completes and the last `Future` is resolved
+**emits** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved

**backpressures** when downstream backpressures

-**completes** when upstream completes and the last `Future` is resolved
+**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved

---------------------------------------------------------------

@@ -1012,7 +1035,7 @@ Evaluated cost of each element defines how many elements will be allowed to trav

Log elements flowing through the stream as well as completion and erroring. By default element and
completion signals are logged on debug level, and errors are logged on error level.
-This can be changed by calling `Attributes.logLevels(...)` on the given Flow.
+This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow.

**emits** when upstream emits

**backpressures** when downstream backpressures

**completes** when upstream completes

@@ -1078,38 +1101,38 @@ The order in which the *in* and *out* sides receive their respective completion

## Asynchronous processing stages

These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous
-operation at the same time (usually handling the completion of a Future).
+operation at the same time (usually handling the completion of a @scala[`Future`] @java[`CompletionStage`]).

---------------------------------------------------------------

### mapAsync

-Pass incoming elements to a function that return a `Future` result. When the future arrives the result is passed
+Pass incoming elements to a function that returns a @scala[`Future`] @java[`CompletionStage`] result. When the @scala[`Future`] @java[`CompletionStage`] completes, the result is passed
downstream.
Up to `n` elements can be processed concurrently, but regardless of their completion time the incoming order will be kept
when results complete. For use cases where order does not matter `mapAsyncUnordered` can be used.

-If a Future fails, the stream also fails (unless a different supervision strategy is applied)
+If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)

-**emits** when the Future returned by the provided function finishes for the next element in sequence
+**emits** when the @scala[`Future`] @java[`CompletionStage`] returned by the provided function finishes for the next element in sequence

-**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
+**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures

-**completes** when upstream completes and all futures has been completed and all elements has been emitted
+**completes** when upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] have been completed and all elements have been emitted

---------------------------------------------------------------

### mapAsyncUnordered

-Like `mapAsync` but `Future` results are passed downstream as they arrive regardless of the order of the elements
+Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements
that triggered them.

-If a Future fails, the stream also fails (unless a different supervision strategy is applied)
+If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)

-**emits** any of the Futures returned by the provided function complete
+**emits** when any of the @scala[`Future` s] @java[`CompletionStage` s] returned by the provided function completes

-**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
+**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures

-**completes** upstream completes and all futures has been completed and all elements has been emitted
+**completes** when upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] have been completed and all elements have been emitted

---------------------------------------------------------------

@@ -1560,7 +1583,7 @@ Merge multiple sources. Prefer one source if all sources has elements ready.

### zip

-Combines elements from each of multiple sources into tuples and passes the tuples downstream.
+Combines elements from each of multiple sources into @scala[tuples] @java[`Pair` s] and passes the @scala[tuples] @java[pairs] downstream.

**emits** when all of the inputs have an element available

@@ -1744,7 +1767,7 @@ partitioner function applied to the element.

### watchTermination

-Materializes to a `Future` that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed.
+Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be completed with `Done` or failed depending on whether the upstream of the stage has been completed or failed.
The stage otherwise passes through elements unchanged.
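As a hedged Scala sketch of `watchTermination` (an editor's illustration, not one of the referenced snippets; it assumes an implicit materializer):

```scala
import akka.Done
import akka.stream.scaladsl.{ Keep, Sink, Source }
import scala.concurrent.Future

// The Future[Done] (CompletionStage<Done> in the Java DSL) completes when the
// upstream completes, or fails if it fails; elements pass through unchanged.
val done: Future[Done] =
  Source(1 to 3)
    .watchTermination()(Keep.right)
    .toMat(Sink.ignore)(Keep.left)
    .run()
```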
**emits** when input has an element available

diff --git a/akka-docs/src/main/paradox/scala/stream/stream-composition.md b/akka-docs/src/main/paradox/scala/stream/stream-composition.md
index d7bf780916..1c55761394 100644
--- a/akka-docs/src/main/paradox/scala/stream/stream-composition.md
+++ b/akka-docs/src/main/paradox/scala/stream/stream-composition.md
@@ -35,7 +35,7 @@ hiding them behind a *shape* that looks like a `Source`, `Flow`, etc.

One interesting example above is a `Flow` which is composed of a disconnected `Sink` and `Source`.
This can be achieved by using the `fromSinkAndSource()` constructor method on `Flow` which takes the two parts as
-parameters.
+parameters. Please note that when combining a `Flow` using that method, the termination signals are not carried
"through" as the `Sink` and `Source` are assumed to be fully independent. If however you want to construct
@@ -66,7 +66,12 @@ with the rest of the graph), but this demonstrates the uniform underlying model.

If we try to build a code snippet that corresponds to the above diagram, our first try might look like this:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #non-nested-flow }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #non-nested-flow }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #non-nested-flow }
+

It is clear however that there is no nesting present in our first attempt; since the library cannot figure out
where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the
@@ -75,7 +80,11 @@ methods `withAttributes()` or `named()` (where the latter is just a shorthand fo

The following code demonstrates how to achieve the desired nesting:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #nested-flow }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #nested-flow }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #nested-flow }

Once we have hidden the internals of our components, they act like any other built-in component of similar shape. If
we hide some of the internals of our composites, the result looks just as if any other predefined component has been
@@ -86,7 +95,11 @@ used:

If we look at usage of built-in components, and our custom components, there is no difference in usage as the code
snippet below demonstrates.

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #reuse }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #reuse }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #reuse }

## Composing complex systems

@@ -106,13 +119,21 @@ can be materialized) that encapsulates a non-trivial stream processing network.

directed and non-directed cycles. The `runnable()` method of the `GraphDSL` object allows the creation of a general,
closed, and runnable graph.
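As a minimal hedged sketch of such a closed graph (an editor's illustration using `RunnableGraph.fromGraph` together with `GraphDSL.create`, much simpler than the diagram's network; it assumes an implicit materializer):

```scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source }

// A tiny closed graph: fan out via Broadcast, fan in again via Merge, drain into a sink.
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val bcast  = builder.add(Broadcast[Int](2))
  val merge  = builder.add(Merge[Int](2))
  val double = Flow[Int].map(_ * 2)

  Source(1 to 3) ~> bcast ~> double ~> merge ~> Sink.foreach(println)
                    bcast ~> double ~> merge
  ClosedShape
})

graph.run()
```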
For example the network on the diagram can be realized like this:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #complex-graph }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #complex-graph }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph }

In the code above we used the implicit port numbering feature (to make the graph more readable and similar to the diagram)
and we imported `Source` s, `Sink` s and `Flow` s explicitly. It is possible to refer to the ports explicitly, and it is
not necessary to import our linear stages via `add()`, so another version might look like this:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #complex-graph-alt }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #complex-graph-alt }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph-alt }

Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but
the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL.

@@ -123,7 +144,11 @@ from the previous example, what remains is a partial graph:

We can recreate a similar graph in code, using the DSL in a similar way as before:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-graph }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-graph }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-graph }

The only new addition is the return value of the builder block, which is a `Shape`. All graphs (including
`Source`, `BidiFlow`, etc) have a shape, which encodes the *typed* ports of the module. In our example

@@ -138,7 +163,11 @@ it is a good practice to give names to modules to help debugging.

Since our partial graph has the right shape, it can already be used in the simpler, linear DSL:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-use }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-use }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-use }

It is not possible to use it as a `Flow` yet, though (i.e. we cannot call `.filter()` on it), but `Flow` has
a `fromGraph()` method that just adds the DSL to a `FlowShape`. There are similar methods on `Source`,

@@ -150,7 +179,11 @@ To demonstrate this, we will create the following graph:

The code version of the above closed graph might look like this:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-flow-dsl }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-flow-dsl }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-flow-dsl }

@@@ note

@@ -162,7 +195,11 @@ throw an exception if this is violated.

We still need to demonstrate that `RunnableGraph` is a component just like any other, which can be
embedded in graphs.
In the following snippet we embed one closed graph in another:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #embed-closed }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #embed-closed }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #embed-closed }

The type of the imported module indicates that the imported module has a `ClosedShape`, and so we are not able
to wire it to anything else inside the enclosing closed graph. Nevertheless, this "island" is embedded properly,
@@ -197,38 +234,57 @@ needs to return a different object that provides the necessary interaction capab

Unlike actors though, each of the processing stages might provide a materialized value, so when we compose multiple
stages or modules, we need to combine the materialized value as well (there are default rules which make this easier,
for example *to()* and *via()* take care of the most common case of taking the materialized value to the left.
-See @ref:[Combining materialized values](stream-flows-and-basics.md#flow-combine-mat) for details). We demonstrate how this works by a code example and a diagram which
-graphically demonstrates what is happening.
+See @ref:[Combining materialized values](stream-flows-and-basics.md#flow-combine-mat) for details).
+We demonstrate how this works by a code example and a diagram which graphically shows what is happening.

The propagation of the individual materialized values from the enclosed modules towards the top will look like this:

![compose_mat.png](../../images/compose_mat.png)

To implement the above, first, we create a composite `Source`, where the enclosed `Source` has a
-materialized type of `Promise[[Option[Int]]`. By using the combiner function `Keep.left`, the resulting materialized
+materialized type of @scala[`Promise[Option[Int]]`] @java[`CompletableFuture<Optional<Integer>>`]. By using the combiner function `Keep.left`, the resulting materialized
type is of the nested module (indicated by the color *red* on the diagram):

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-1 }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-1 }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-1 }

Next, we create a composite `Flow` from two smaller components. Here, the second enclosed `Flow` has a
-materialized type of `Future[OutgoingConnection]`, and we propagate this to the parent by using `Keep.right`
+materialized type of @scala[`Future[OutgoingConnection]`] @java[`CompletionStage<OutgoingConnection>`], and we propagate this to the parent by using `Keep.right`
as the combiner function (indicated by the color *yellow* on the diagram):

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-2 }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-2 }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-2 }

As a third step, we create a composite `Sink`, using our `nestedFlow` as a building block.
In this snippet, both the enclosed `Flow` and the folding `Sink` have a materialized value that is interesting for
us, so we use `Keep.both` to get a `Pair` of them as the materialized type of `nestedSink` (indicated by the color
*blue* on the diagram)

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-3 }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-3 }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-3 }

As the last example, we wire together `nestedSource` and `nestedSink` and we use a custom combiner function to
create yet another materialized type of the resulting `RunnableGraph`. This combiner function just ignores
-the `Future[Sink]` part, and wraps the other two values in a custom case class `MyClass`
+the @scala[`Future[Sink]`] @java[`CompletionStage`] part, and wraps the other two values in a custom case class `MyClass`
(indicated by color *purple* on the diagram):

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-4 }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-4 }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4a }
+
+  @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4b }
+

@@@ note

@@ -250,7 +306,11 @@ by nested modules, unless they override them with a custom value.

The code below, a modification of an earlier example, sets the `inputBuffer` attribute on certain modules, but not
on others:

-@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #attributes-inheritance }
+Scala
+: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #attributes-inheritance }
+
+Java
+: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #attributes-inheritance }

The effect is, that each module inherits the `inputBuffer` attribute from its enclosing parent, unless it has
the same attribute explicitly set. `nestedSource` gets the default attributes from the materializer itself. `nestedSink`

diff --git a/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md b/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md
index 0cd2c6d167..2ee9025ea6 100644
--- a/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md
+++ b/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md
@@ -77,13 +77,20 @@ starting up Actors). Thanks to Flows being simply a description of the processin
thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
one actor prepare the work, and then have it be materialized at some completely different place in the code.

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materialization-in-steps }
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materialization-in-steps }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-in-steps }
+
+@@@ div { .group-scala }

After running (materializing) the `RunnableGraph[T]` we get back the materialized value of type T.
Every stream processing stage can produce a materialized value, and it is the responsibility of the user to combine
them into a new type. In the above example we used `toMat` to indicate that we want to transform the materialized
value of the source and sink, and we used the convenience function `Keep.right` to say that we are only interested in
the materialized value of the sink.
+
In our example the `FoldSink` materializes a value of type `Future` which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values,
but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
@@ -91,12 +98,36 @@ there is a convenience method called `runWith()` available for `Sink`, `Source`
a supplied `Source` (in order to run a `Sink`), a `Sink` (in order to run a `Source`) or
both a `Source` and a `Sink` (in order to run a `Flow`, since it has neither attached yet).

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materialization-runWith }
+@@@
+
+@@@ div { .group-java }
+
+After running (materializing) the `RunnableGraph` we get a special container object, the `MaterializedMap`. Both
+sources and sinks are able to put specific objects into this map. Whether they put something in or not is
+implementation-dependent.
+
+For example a `FoldSink` will make a `CompletionStage` available in this map which will represent the result
+of the folding process over the stream. In general, a stream can expose multiple materialized values,
+but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
+there is a convenience method called `runWith()` available for `Sink`, `Source` or `Flow` requiring, respectively,
+a supplied `Source` (in order to run a `Sink`), a `Sink` (in order to run a `Source`) or
+both a `Source` and a `Sink` (in order to run a `Flow`, since it has neither attached yet).
+@@@
+
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materialization-runWith }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-runWith }

It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage,
instead of modifying the existing instance, so while constructing long flows, remember to assign the new value to a
variable or run it:

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #source-immutable }
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #source-immutable }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-immutable }

@@@ note

@@ -110,32 +141,43 @@ by providing named fan-out elements such as broadcast (signals all down-stream e

In the above example we used the `runWith` method, which both materializes the stream and returns the materialized value
of the given sink or source.

-Since a stream can be materialized multiple times, the materialized value will also be calculated anew for each such
+Since a stream can be materialized multiple times, the @scala[materialized value will also be calculated anew] @java[`MaterializedMap` returned is different] for each such
materialization, usually leading to different values being returned each time.
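For instance, a hedged sketch in the Scala DSL (separate from the `#stream-reuse` snippet referenced below; it assumes an implicit materializer):

```scala
import akka.stream.scaladsl.{ Keep, RunnableGraph, Sink, Source }
import scala.concurrent.Future

val runnable: RunnableGraph[Future[Int]] =
  Source(1 to 10).toMat(Sink.fold(0)(_ + _))(Keep.right)

// Each run() is a fresh materialization and yields a distinct Future.
val sum1: Future[Int] = runnable.run()
val sum2: Future[Int] = runnable.run()
```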
In the example below we create two running materialized instances of the stream that we described in the `runnable`
-variable, and both materializations give us a different `Future` from the map even though we used the same `sink`
+variable, and both materializations give us a different @scala[`Future`] @java[`CompletionStage`] from the map even though we used the same `sink`
to refer to the future:

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #stream-reuse }
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #stream-reuse }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #stream-reuse }

### Defining sources, sinks and flows

The objects `Source` and `Sink` define various ways to create sources and sinks of elements. The following
examples show some of the most useful constructs (refer to the API documentation for more details):

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #source-sink }
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #source-sink }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-sink }

There are various ways to wire up different parts of a stream, the following examples show some of the available options:

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-connecting }
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-connecting }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-connecting }

### Illegal stream elements

In accordance with the Reactive Streams specification ([Rule 2.13](https://github.com/reactive-streams/reactive-streams-jvm#2.13))
Akka Streams do not allow `null` to be passed through the stream as an element. In case you want to model the concept
-of absence of a value we recommend using `scala.Option` or `scala.util.Either`.
+of absence of a value we recommend using @scala[`scala.Option` or `scala.util.Either`] @java[`java.util.Optional` which is available since Java 8].
-

## Back-pressure explained

Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the [Reactive Streams](http://reactive-streams.org/)
@@ -178,7 +220,7 @@ slower than the Publisher. In order to safeguard from these situations, the back
during such situations, however we do not want to pay a high penalty for this safety net being enabled.

The Reactive Streams protocol solves this by asynchronously signalling from the Subscriber to the Publisher
-`Request(n:Int)` signals. The protocol guarantees that the Publisher will never signal *more* elements than the
+@scala[`Request(n: Int)`] @java[`Request(int n)`] signals. The protocol guarantees that the Publisher will never signal *more* elements than the
signalled demand. Since the Subscriber however is currently faster, it will be signalling these Request messages at a higher
rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means
that the Publisher should not ever have to wait (be back-pressured) with publishing its incoming elements.

@@ -212,7 +254,7 @@ but is not restricted to that—it could also mean opening files or socket conne

Materialization is triggered at so-called "terminal operations".
Most notably this includes the various forms of the `run()` and `runWith()` methods defined on `Source` and
`Flow` elements as well as a small number of special syntactic sugars for running with
-well-known sinks, such as `runForeach(el => ...)` (being an alias to `runWith(Sink.foreach(el => ...))`.
+well-known sinks, such as @scala[`runForeach(el => ...)`] @java[`runForeach(el -> ...)`] (being an alias to @scala[`runWith(Sink.foreach(el => ...))`] @java[`runWith(Sink.foreach(el -> ...))`]).

Materialization is currently performed synchronously on the materializing thread.
The actual stream processing is handled by actors started up during the stream's materialization,
@@ -241,7 +283,11 @@ To allow for parallel processing you will have to insert asynchronous boundaries
graphs by way of adding `Attributes.asyncBoundary` using the method `async` on `Source`, `Sink` and `Flow` to pieces that shall
communicate with the rest of the graph in an asynchronous fashion.

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-async }
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-async }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-async }

In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding
and multiplying integers is an extremely costly operation this will lead to a performance gain since two CPUs can
@@ -278,7 +324,7 @@ to somehow express how these values should be composed to a final value when we
many combinator methods have variants that take an additional argument, a function, that will be used to combine the
resulting values. Some examples of using these combiners are illustrated in the example below.

-@@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-mat-combine }
+Scala
+: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-mat-combine }
+
+Java
+: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-mat-combine }

@@@ note