diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java index 0e66e463d3..8957040b4e 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java @@ -42,11 +42,10 @@ public class RecipeSeq extends RecipeTest { public void drainSourceToList() throws Exception { new JavaTestKit(system) { { + final Source mySource = Source.from(Arrays.asList("1", "2", "3")); //#draining-to-list-unsafe - final Source myData = Source.from(Arrays.asList("1", "2", "3")); - final int MAX_ALLOWED_SIZE = 100; - - final CompletionStage> strings = myData.runWith(Sink.seq(), mat); // dangerous! + // Dangerous: might produce a collection with 2 billion elements! + final CompletionStage> strings = mySource.runWith(Sink.seq(), mat); //#draining-to-list-unsafe strings.toCompletableFuture().get(3, TimeUnit.SECONDS); @@ -58,14 +57,14 @@ public class RecipeSeq extends RecipeTest { public void drainSourceToListWithLimit() throws Exception { new JavaTestKit(system) { { + final Source mySource = Source.from(Arrays.asList("1", "2", "3")); //#draining-to-list-safe - final Source myData = Source.from(Arrays.asList("1", "2", "3")); final int MAX_ALLOWED_SIZE = 100; // OK. Future will fail with a `StreamLimitReachedException` // if the number of incoming elements is larger than max final CompletionStage> strings = - myData.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat); + mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat); //#draining-to-list-safe strings.toCompletableFuture().get(1, TimeUnit.SECONDS); @@ -76,13 +75,14 @@ public class RecipeSeq extends RecipeTest { public void drainSourceToListWithTake() throws Exception { new JavaTestKit(system) { { - final Source myData = Source.from(Arrays.asList("1", "2", "3")); + final Source mySource = Source.from(Arrays.asList("1", "2", "3")); final int MAX_ALLOWED_SIZE = 100; //#draining-to-list-safe + // OK. Collect up until max-th elements only, then cancel upstream final CompletionStage> strings = - myData.take(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat); + mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat); //#draining-to-list-safe strings.toCompletableFuture().get(1, TimeUnit.SECONDS); diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index e39bc5cb61..f634a29436 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -17,53 +17,53 @@ each materialization of the source which is the reason the factory takes a ``Cre If the iterator perform blocking operations, make sure to run it on a separate dispatcher. -*emits* the next value returned from the iterator +**emits** the next value returned from the iterator -*completes* when the iterator reaches its end +**completes** when the iterator reaches its end from ^^^^ Stream the values of an ``Iterable``. Make sure the ``Iterable`` is immutable or at least not modified after being used as a source. -*emits* the next value of the iterable +**emits** the next value of the iterable -*completes* after the last element of the iterable has been emitted +**completes** after the last element of the iterable has been emitted single ^^^^^^ Stream a single object -*emits* the value once +**emits** the value once -*completes* when the single value has been emitted +**completes** when the single value has been emitted repeat ^^^^^^ Stream a single object repeatedly -*emits* the same value repeatedly when there is demand +**emits** the same value repeatedly when there is demand -*completes* never +**completes** never tick ^^^^ A periodical repetition of an arbitrary object. Delay of first tick is specified separately from interval of the following ticks. -*emits* periodically, if there is downstream backpressure ticks are skipped +**emits** periodically, if there is downstream backpressure ticks are skipped -*completes* never +**completes** never fromCompletionStage ^^^^^^^^^^^^^^^^^^^ Send the single value of the ``CompletionStage`` when it completes and there is demand. If the ``CompletionStage`` fails the stream is failed with that exception. -*emits* when the ``CompletionStage`` completes +**emits** when the ``CompletionStage`` completes -*completes* after the ``CompletionStage`` has completed or when it fails +**completes** after the ``CompletionStage`` has completed or when it fails fromFuture @@ -71,9 +71,9 @@ fromFuture Send the single value of the Scala ``Future`` when it completes and there is demand. If the future fails the stream is failed with that exception. -*emits* the future completes +**emits** the future completes -*completes* after the future has completed +**completes** after the future has completed unfold ^^^^^^ @@ -83,9 +83,9 @@ to pass a state. The first invocation of the provided fold function will receive Can be used to implement many stateful sources without having to touch the more low level ``GraphStage`` API. -*emits* when there is demand and the unfold function over the previous state returns non empty value +**emits** when there is demand and the unfold function over the previous state returns non empty value -*completes* when the unfold function returns an empty value +**completes** when the unfold function returns an empty value unfoldAsync ^^^^^^^^^^^ @@ -94,18 +94,18 @@ complete or emit when it completes. Can be used to implement many stateful sources without having to touch the more low level ``GraphStage`` API. -*emits* when there is demand and unfold state returned CompletionStage completes with some value +**emits** when there is demand and unfold state returned CompletionStage completes with some value -*completes* when the CompletionStage returned by the unfold function completes with an empty value +**completes** when the CompletionStage returned by the unfold function completes with an empty value empty ^^^^^ Complete right away without ever emitting any elements. Useful when you have to provide a source to an API but there are no elements to emit. -*emits* never +**emits** never -*completes* directly +**completes** directly maybe ^^^^^ @@ -113,25 +113,25 @@ Materialize a ``CompletionStage`` that can be completed with an ``Optional``. If it is completed with a value it will be eimitted from the source if it is an empty ``Optional`` it will complete directly. -*emits* when the returned promise is completed with some value +**emits** when the returned promise is completed with some value -*completes* after emitting some value, or directly if the promise is completed with no value +**completes** after emitting some value, or directly if the promise is completed with no value failed ^^^^^^ Fail directly with a user specified exception. -*emits* never +**emits** never -*completes* fails the stream directly with the given exception +**completes** fails the stream directly with the given exception actorPublisher ^^^^^^^^^^^^^^ Wrap an actor extending ``ActorPublisher`` as a source. -*emits* depends on the actor implementation +**emits** depends on the actor implementation -*completes* when the actor stops +**completes** when the actor stops actorRef ^^^^^^^^ @@ -139,26 +139,26 @@ Materialize an ``ActorRef``, sending messages to it will emit them on the stream a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping elements or failing the stream, the strategy is chosen by the user. -*emits* when there is demand and there are messages in the buffer or a message is sent to the actorref +**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref -*completes* when the ``ActorRef`` is sent ``akka.actor.Status.Success`` or ``PoisonPill`` +**completes** when the ``ActorRef`` is sent ``akka.actor.Status.Success`` or ``PoisonPill`` combine ^^^^^^^ Combine several sources, using a given strategy such as merge or concat, into one source. -*emits* when there is demand, but depending on the strategy +**emits** when there is demand, but depending on the strategy -*completes* when all sources has completed +**completes** when all sources has completed range ^^^^^ Emit each integer in a range, with an option to take bigger steps than 1. -*emits* when there is demand, the next value +**emits** when there is demand, the next value -*completes* when the end of the range has been reached +**completes** when the end of the range has been reached queue ^^^^^ @@ -167,9 +167,9 @@ a buffer, if elements are pushed onto the queue faster than the source is consum a strategy specified by the user. Functionality for tracking when an element has been emitted is available through ``SourceQueue.offer``. -*emits* when there is demand and the queue contains elements +**emits** when there is demand and the queue contains elements -*completes* when downstream completes +**completes** when downstream completes asSubscriber ^^^^^^^^^^^^ @@ -193,27 +193,27 @@ head Materializes into a ``CompletionStage`` which completes with the first value arriving, after this the stream is canceled. If no element is emitted, the CompletionStage is be failed. -*cancels* after receiving one element +**cancels** after receiving one element -*backpressures* never +**backpressures** never headOption ^^^^^^^^^^ Materializes into a ``CompletionStage>`` which completes with the first value arriving wrapped in optional, or an empty optional if the stream completes without any elements emitted. -*cancels* after receiving one element +**cancels** after receiving one element -*backpressures* never +**backpressures** never last ^^^^ Materializes into a ``CompletionStage`` which will complete with the last value emitted when the stream completes. If the stream completes with no elements the CompletionStage is failed. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never lastOption ^^^^^^^^^^ @@ -221,24 +221,24 @@ Materialize a ``CompletionStage>`` which completes with the last val emitted wrapped in an optional when the stream completes. if the stream completes with no elements the ``CompletionStage`` is completed with an empty optional. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never ignore ^^^^^^ Consume all elements but discards them. Useful when a stream has to be consumed but there is no use to actually do anything with the elements. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never cancelled ^^^^^^^^^ Immediately cancel the stream -*cancels* immediately +**cancels** immediately seq ^^^ @@ -246,7 +246,7 @@ Collect values emitted from the stream into a collection, the collection is avai which completes when the stream completes. Note that the collection is bounded to ``Integer.MAX_VALUE``, if more element are emitted the sink will cancel the stream -*cancels* If too many values are collected +**cancels** If too many values are collected foreach ^^^^^^^ @@ -257,27 +257,27 @@ stream completes, or fails if the stream fails. Note that it is not safe to mutate state from the procedure. -*cancels* never +**cancels** never -*backpressures* when the previous procedure invocation has not yet completed +**backpressures** when the previous procedure invocation has not yet completed foreachParallel ^^^^^^^^^^^^^^^ Like ``foreach`` but allows up to ``parallellism`` procedure calls to happen in parallel. -*cancels* never +**cancels** never -*backpressures* when the previous parallel procedure invocations has not yet completed +**backpressures** when the previous parallel procedure invocations has not yet completed onComplete ^^^^^^^^^^ Invoke a callback when the stream has completed or failed. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never fold @@ -290,9 +290,9 @@ Materializes into a CompletionStage that will complete with the last state when This stage allows combining values into a result without a global mutable state by instead passing the state along between invocations. -*cancels* never +**cancels** never -*backpressures* when the previous fold function invocation has not yet completed +**backpressures** when the previous fold function invocation has not yet completed reduce ^^^^^^ @@ -301,27 +301,27 @@ receives the two first elements of the flow. Materializes into a CompletionStage that will be completed by the last result of the reduction function. -*cancels* never +**cancels** never -*backpressures* when the previous reduction function invocation has not yet completed +**backpressures** when the previous reduction function invocation has not yet completed combine ^^^^^^^ Combine several sinks into one using a user specified strategy -*cancels* depends on the strategy +**cancels** depends on the strategy -*backpressures* depends on the strategy +**backpressures** depends on the strategy actorRef ^^^^^^^^ Send the elements from the stream to an ``ActorRef``. No backpressure so care must be taken to not overflow the inbox. -*cancels* when the actor terminates +**cancels** when the actor terminates -*backpressures* never +**backpressures** never actorRefWithAck @@ -329,9 +329,9 @@ actorRefWithAck Send the elements from the stream to an ``ActorRef`` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. -*cancels* when the actor terminates +**cancels** when the actor terminates -*backpressures* when the actor acknowledgement has not arrived +**backpressures** when the actor acknowledgement has not arrived actorSubscriber @@ -341,9 +341,9 @@ receive the elements from the stream. Materializes into an ``ActorRef`` to the created actor. -*cancels* when the actor terminates +**cancels** when the actor terminates -*backpressures* depends on the actor implementation +**backpressures** depends on the actor implementation asPublisher @@ -434,61 +434,72 @@ Simple processing stages These stages can transform the rate of incoming elements since there are stages that emit multiple elements for a single input (e.g. `mapConcat') or consume multiple elements before emitting one output (e.g. ``filter``). However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the -rate is affected. This is in contrast with :ref:`detached-stages-overview` which can change their processing behavior +rate is affected. This is in contrast with :ref:`detached-stages-overview_java` which can change their processing behavior depending on being backpressured by downstream or not. map ^^^ Transform each element in the stream by calling a mapping function with it and passing the returned value downstream. -*emits* when the mapping function returns an element +**emits** when the mapping function returns an element -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes mapConcat ^^^^^^^^^ Transform each element into zero or more elements that are individually passed downstream. -*emits* when the mapping function returns an element or there are still remaining elements from the previously calculated collection +**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection -*backpressures* when downstream backpressures or there are still available elements from the previously calculated collection +**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection -*completes* when upstream completes and all remaining elements has been emitted +**completes** when upstream completes and all remaining elements has been emitted + +statefulMapConcat +^^^^^^^^^^^^^^^^^ +Transform each element into zero or more elements that are individually passed downstream. The difference to ``mapConcat`` is that +the transformation function is created from a factory for every materialization of the flow. + +**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection + +**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection + +**completes** when upstream completes and all remaining elements has been emitted filter ^^^^^^ Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if it returns false the element is discarded. -*emits* when the given predicate returns true for the element +**emits** when the given predicate returns true for the element -*backpressures* when the given predicate returns true for the element and downstream backpressures +**backpressures** when the given predicate returns true for the element and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes collect ^^^^^^^ Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream. Can often replace ``filter`` followed by ``map`` to achieve the same in one single stage. -*emits* when the provided partial function is defined for the element +**emits** when the provided partial function is defined for the element -*backpressures* the partial function is defined for the element and downstream backpressures +**backpressures** the partial function is defined for the element and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes grouped ^^^^^^^ Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream. -*emits* when the specified number of elements has been accumulated or upstream completed +**emits** when the specified number of elements has been accumulated or upstream completed -*backpressures* when a group has been assembled and downstream backpressures +**backpressures** when a group has been assembled and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes sliding ^^^^^^^ @@ -496,11 +507,11 @@ Provide a sliding window over the incoming stream and pass the windows as groups Note: the last window might be smaller than the requested size due to end of stream. -*emits* the specified number of elements has been accumulated or upstream completed +**emits** the specified number of elements has been accumulated or upstream completed -*backpressures* when a group has been assembled and downstream backpressures +**backpressures** when a group has been assembled and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes scan @@ -511,42 +522,42 @@ emitting the next current value. Note that this means that scan emits one element downstream before and upstream elements will not be requested until the second element is required from downstream. -*emits* when the function scanning the element returns a new element +**emits** when the function scanning the element returns a new element -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes fold ^^^^ Start with current value ``zero`` and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream. -*emits* when upstream completes +**emits** when upstream completes -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes drop ^^^^ Drop ``n`` elements and then pass any subsequent element downstream. -*emits* when the specified number of elements has been dropped already +**emits** when the specified number of elements has been dropped already -*backpressures* when the specified number of elements has been dropped and downstream backpressures +**backpressures** when the specified number of elements has been dropped and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes take ^^^^ Pass ``n`` incoming elements downstream and then complete -*emits* while the specified number of elements to take has not yet been reached +**emits** while the specified number of elements to take has not yet been reached -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when the defined number of elements has been taken or upstream completes +**completes** when the defined number of elements has been taken or upstream completes takeWhile @@ -554,62 +565,62 @@ takeWhile Pass elements downstream as long as a predicate function return true for the element include the element when the predicate first return false and then complete. -*emits* while the predicate is true and until the first false result +**emits** while the predicate is true and until the first false result -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when predicate returned false or upstream completes +**completes** when predicate returned false or upstream completes dropWhile ^^^^^^^^^ Drop elements as long as a predicate function return true for the element -*emits* when the predicate returned false and for all following stream elements +**emits** when the predicate returned false and for all following stream elements -*backpressures* predicate returned false and downstream backpressures +**backpressures** predicate returned false and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes recover ^^^^^^^ Allow sending of one last element downstream when a failure has happened upstream. -*emits* when the element is available from the upstream or upstream is failed and pf returns an element +**emits** when the element is available from the upstream or upstream is failed and pf returns an element -*backpressures* when downstream backpressures, not when failure happened +**backpressures** when downstream backpressures, not when failure happened -*completes* when upstream completes or upstream failed with exception pf can handle +**completes** when upstream completes or upstream failed with exception pf can handle recoverWith ^^^^^^^^^^^ Allow switching to alternative Source when a failure has happened upstream. -*emits* the element is available from the upstream or upstream is failed and pf returns alternative Source +**emits** the element is available from the upstream or upstream is failed and pf returns alternative Source -*backpressures* downstream backpressures, after failure happened it backprssures to alternative Source +**backpressures** downstream backpressures, after failure happened it backprssures to alternative Source -*completes* upstream completes or upstream failed with exception pf can handle +**completes** upstream completes or upstream failed with exception pf can handle detach ^^^^^^ Detach upstream demand from downstream demand without detaching the stream rates. -*emits* when the upstream stage has emitted and there is demand +**emits** when the upstream stage has emitted and there is demand -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes throttle ^^^^^^^^ Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element. -*emits* when upstream emits an element and configured time per each element elapsed +**emits** when upstream emits an element and configured time per each element elapsed -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes @@ -628,11 +639,11 @@ order will be kept when results complete. For use cases where order does not mat If a ``CompletionStage`` fails, the stream also fails (unless a different supervision strategy is applied) -*emits* when the CompletionStage returned by the provided function finishes for the next element in sequence +**emits** when the CompletionStage returned by the provided function finishes for the next element in sequence -*backpressures* when the number of ``CompletionStage`` s reaches the configured parallelism and the downstream backpressures +**backpressures** when the number of ``CompletionStage`` s reaches the configured parallelism and the downstream backpressures -*completes* when upstream completes and all ``CompletionStage`` s has been completed and all elements has been emitted +**completes** when upstream completes and all ``CompletionStage`` s has been completed and all elements has been emitted mapAsyncUnordered ^^^^^^^^^^^^^^^^^ @@ -641,11 +652,11 @@ that triggered them. If a CompletionStage fails, the stream also fails (unless a different supervision strategy is applied) -*emits* any of the ``CompletionStage` s returned by the provided function complete +**emits** any of the ``CompletionStage`` s returned by the provided function complete -*backpressures* when the number of ``CompletionStage`` s reaches the configured parallelism and the downstream backpressures +**backpressures** when the number of ``CompletionStage`` s reaches the configured parallelism and the downstream backpressures -*completes* upstream completes and all CompletionStages has been completed and all elements has been emitted +**completes** upstream completes and all CompletionStages has been completed and all elements has been emitted Timer driven stages @@ -657,57 +668,57 @@ takeWithin ^^^^^^^^^^ Pass elements downstream within a timeout and then complete. -*emits* when an upstream element arrives +**emits** when an upstream element arrives -*backpressures* downstream backpressures +**backpressures** downstream backpressures -*completes* upstream completes or timer fires +**completes** upstream completes or timer fires dropWithin ^^^^^^^^^^ Drop elements until a timeout has fired -*emits* after the timer fired and a new upstream element arrives +**emits** after the timer fired and a new upstream element arrives -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* upstream completes +**completes** upstream completes groupedWithin ^^^^^^^^^^^^^ Chunk up the stream into groups of elements received within a time window, or limited by the given number of elements, whichever happens first. -*emits* when the configured time elapses since the last group has been emitted +**emits** when the configured time elapses since the last group has been emitted -*backpressures* when the group has been assembled (the duration elapsed) and downstream backpressures +**backpressures** when the group has been assembled (the duration elapsed) and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes initialDelay ^^^^^^^^^^^^ Delay the initial element by a user specified duration from stream materialization. -*emits* upstream emits an element if the initial delay already elapsed +**emits** upstream emits an element if the initial delay already elapsed -*backpressures* downstream backpressures or initial delay not yet elapsed +**backpressures** downstream backpressures or initial delay not yet elapsed -*completes* when upstream completes +**completes** when upstream completes delay ^^^^^ Delay every element passed through with a specific duration. -*emits* there is a pending element in the buffer and configured time for this element elapsed +**emits** there is a pending element in the buffer and configured time for this element elapsed -*backpressures* differs, depends on ``OverflowStrategy`` set +**backpressures** differs, depends on ``OverflowStrategy`` set -*completes* when upstream completes and buffered elements has been drained +**completes** when upstream completes and buffered elements has been drained -.. _detached-stages-overview: +.. _detached-stages-overview_java: Backpressure aware stages ------------------------- @@ -720,11 +731,11 @@ Allow for a slower downstream by passing incoming elements and a summary into an there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or average of incoming numbers, if aggregation should lead to a different type ``conflateWithSeed`` can be used: -*emits* when downstream stops backpressuring and there is a conflated element available +**emits** when downstream stops backpressuring and there is a conflated element available -*backpressures* when the aggregate function cannot keep up with incoming elements +**backpressures** when the aggregate function cannot keep up with incoming elements -*completes* when upstream completes +**completes** when upstream completes conflateWithSeed ^^^^^^^^^^^^^^^^ @@ -732,11 +743,11 @@ Allow for a slower downstream by passing incoming elements and a summary into an is backpressure. When backpressure starts or there is no backpressure element is passed into a ``seed`` function to transform it to the summary type. -*emits* when downstream stops backpressuring and there is a conflated element available +**emits** when downstream stops backpressuring and there is a conflated element available -*backpressures* when the aggregate or seed functions cannot keep up with incoming elements +**backpressures** when the aggregate or seed functions cannot keep up with incoming elements -*completes* when upstream completes +**completes** when upstream completes batch ^^^^^ @@ -750,11 +761,11 @@ to the summary type. Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be aggregated to the batched value. -*emits* when downstream stops backpressuring and there is a batched element available +**emits** when downstream stops backpressuring and there is a batched element available -*backpressures* when batched elements reached the max limit of allowed batched elements & downstream backpressures +**backpressures** when batched elements reached the max limit of allowed batched elements & downstream backpressures -*completes* when upstream completes and a "possibly pending" element was drained +**completes** when upstream completes and a "possibly pending" element was drained batchWeighted @@ -767,33 +778,33 @@ backpressure. Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be aggregated to the batched value. -*emits* downstream stops backpressuring and there is a batched element available +**emits** downstream stops backpressuring and there is a batched element available -*backpressures* batched elements reached the max weight limit of allowed batched elements & downstream backpressures +**backpressures** batched elements reached the max weight limit of allowed batched elements & downstream backpressures -*completes* upstream completes and a "possibly pending" element was drained +**completes** upstream completes and a "possibly pending" element was drained expand ^^^^^^ Allow for a faster downstream by expanding the last incoming element to an ``Iterator``. For example ``Iterator.continually(element)`` to keep repating the last incoming element. -*emits* when downstream stops backpressuring +**emits** when downstream stops backpressuring -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes buffer (Backpressure) ^^^^^^^^^^^^^^^^^^^^^ Allow for a temporarily faster upstream events by buffering ``size`` elements. When the buffer is full backpressure is applied. -*emits* when downstream stops backpressuring and there is a pending element in the buffer +**emits** when downstream stops backpressuring and there is a pending element in the buffer -*backpressures* when buffer is full +**backpressures** when buffer is full -*completes* when upstream completes and buffered elements has been drained +**completes** when upstream completes and buffered elements has been drained buffer (Drop) ^^^^^^^^^^^^^ @@ -805,22 +816,22 @@ dropped according to the specified ``OverflowStrategy``: * ``dropBuffer()`` drops the entire buffer and buffers the new element * ``dropNew()`` drops the new element -*emits* when downstream stops backpressuring and there is a pending element in the buffer +**emits** when downstream stops backpressuring and there is a pending element in the buffer -*backpressures* never (when dropping cannot keep up with incoming elements) +**backpressures** never (when dropping cannot keep up with incoming elements) -*completes* upstream completes and buffered elements has been drained +**completes** upstream completes and buffered elements has been drained buffer (Fail) ^^^^^^^^^^^^^ Allow for a temporarily faster upstream events by buffering ``size`` elements. When the buffer is full the stage fails the flow with a ``BufferOverflowException``. -*emits* when downstream stops backpressuring and there is a pending element in the buffer +**emits** when downstream stops backpressuring and there is a pending element in the buffer -*backpressures* never, fails the stream instead of backpressuring when buffer is full +**backpressures** never, fails the stream instead of backpressuring when buffer is full -*completes* when upstream completes and buffered elements has been drained +**completes** when upstream completes and buffered elements has been drained Nesting and flattening stages @@ -834,52 +845,52 @@ prefixAndTail Take up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements. -*emits* when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream +**emits** when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream -*backpressures* when downstream backpressures or substream backpressures +**backpressures** when downstream backpressures or substream backpressures -*completes* when prefix elements has been consumed and substream has been consumed +**completes** when prefix elements has been consumed and substream has been consumed groupBy ^^^^^^^ Demultiplex the incoming stream into separate output streams. -*emits* an element for which the grouping function returns a group that has not yet been created. Emits the new group +**emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures -*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) splitWhen ^^^^^^^^^ Split off elements into a new substream whenever a predicate function return ``true``. -*emits* an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements +**emits** an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements -*backpressures* when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures +**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures -*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) splitAfter ^^^^^^^^^^ End the current substream whenever a predicate returns ``true``, starting a new substream for the next element. -*emits* when an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element +**emits** when an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element -*backpressures* when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures +**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures -*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) flatMapConcat ^^^^^^^^^^^^^ Transform each input element into a ``Source`` whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts. -*emits* when the current consumed substream has an element available +**emits** when the current consumed substream has an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes and all consumed substreams complete +**completes** when upstream completes and all consumed substreams complete flatMapMerge @@ -887,11 +898,11 @@ flatMapMerge Transform each input element into a ``Source`` whose elements are then flattened into the output stream through merging. The maximum number of merged sources has to be specified. -*emits* when one of the currently consumed substreams has an element available +**emits** when one of the currently consumed substreams has an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes and all consumed substreams complete +**completes** when upstream completes and all consumed substreams complete Fan-in stages @@ -904,63 +915,63 @@ merge ^^^^^ Merge multiple sources. Picks elements randomly if all sources has elements ready. -*emits* when one of the inputs has an element available +**emits** when one of the inputs has an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) mergeSorted ^^^^^^^^^^^ Merge multiple sources. Waits for one element to be ready from each input stream and emits the smallest element. -*emits* when all of the inputs have an element available +**emits** when all of the inputs have an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete +**completes** when all upstreams complete mergePreferred ^^^^^^^^^^^^^^ Merge multiple sources. Prefer one source if all sources has elements ready. -*emits* when one of the inputs has an element available, preferring a defined input if multiple have elements available +**emits** when one of the inputs has an element available, preferring a defined input if multiple have elements available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) zip ^^^ Combines elements from each of multiple sources into `Pair` s and passes the pairs downstream. -*emits* when all of the inputs have an element available +**emits** when all of the inputs have an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when any upstream completes +**completes** when any upstream completes zipWith ^^^^^^^ Combines elements from multiple sources through a ``combine`` function and passes the returned value downstream. -*emits* when all of the inputs have an element available +**emits** when all of the inputs have an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when any upstream completes +**completes** when any upstream completes concat ^^^^^^ After completion of the original upstream the elements of the given source will be emitted. -*emits* when the current stream has an element available; if the current input completes, it tries the next one +**emits** when the current stream has an element available; if the current input completes, it tries the next one -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete +**completes** when all upstreams complete prepend ^^^^^^^ @@ -968,22 +979,22 @@ Prepends the given source to the flow, consuming it until completion before the If materialized values needs to be collected ``prependMat`` is available. -*emits* when the given stream has an element available; if the given input completes, it tries the current one +**emits** when the given stream has an element available; if the given input completes, it tries the current one -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete +**completes** when all upstreams complete interleave ^^^^^^^^^^ Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one source completes the rest of the other stream will be emitted. -*emits* when element is available from the currently consumed upstream +**emits** when element is available from the currently consumed upstream -*backpressures* when upstream backpressures +**backpressures** when upstream backpressures -*completes* when both upstreams have completed +**completes** when both upstreams have completed Fan-out stages -------------- @@ -995,41 +1006,41 @@ unzip ^^^^^ Takes a stream of two element tuples and unzips the two elements ino two different downstreams. -*emits* when all of the outputs stops backpressuring and there is an input element available +**emits** when all of the outputs stops backpressuring and there is an input element available -*backpressures* when any of the outputs backpressures +**backpressures** when any of the outputs backpressures -*completes* when upstream completes +**completes** when upstream completes unzipWith ^^^^^^^^^ Splits each element of input into multiple downstreams using a function -*emits* when all of the outputs stops backpressuring and there is an input element available +**emits** when all of the outputs stops backpressuring and there is an input element available -*backpressures* when any of the outputs backpressures +**backpressures** when any of the outputs backpressures -*completes* when upstream completes +**completes** when upstream completes broadcast ^^^^^^^^^ Emit each incoming element each of ``n`` outputs. -*emits* when all of the outputs stops backpressuring and there is an input element available +**emits** when all of the outputs stops backpressuring and there is an input element available -*backpressures* when any of the outputs backpressures +**backpressures** when any of the outputs backpressures -*completes* when upstream completes +**completes** when upstream completes balance ^^^^^^^ Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer. -*emits* when any of the outputs stops backpressuring; emits the element to the first available output +**emits** when any of the outputs stops backpressuring; emits the element to the first available output -*backpressures* when all of the outputs backpressure +**backpressures** when all of the outputs backpressure -*completes* when upstream completes +**completes** when upstream completes Watching status stages @@ -1040,9 +1051,9 @@ watchTermination Materializes to a ``CompletionStage`` that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. The stage otherwise passes through elements unchanged. -*emits* when input has an element available +**emits** when input has an element available -*backpressures* when output backpressures +**backpressures** when output backpressures -*completes* when upstream completes +**completes** when upstream completes diff --git a/akka-docs/rst/java/stream/stream-cookbook.rst b/akka-docs/rst/java/stream/stream-cookbook.rst index 0fb3a475a8..3775870f89 100644 --- a/akka-docs/rst/java/stream/stream-cookbook.rst +++ b/akka-docs/rst/java/stream/stream-cookbook.rst @@ -16,7 +16,7 @@ This part also serves as supplementary material for the main body of documentati open while reading the manual and look for examples demonstrating various streaming concepts as they appear in the main body of documentation. -If you need a quick reference of the available processing stages used in the recipes see :ref:`stages-overview`. +If you need a quick reference of the available processing stages used in the recipes see :ref:`stages-overview_java`. Working with Flows ================== @@ -63,11 +63,11 @@ The function ``limit`` or ``take`` should always be used in conjunction in order For example, this is best avoided: -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/cookbook/RecipeSeq.java#draining-to-list-unsafe +.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeSeq.java#draining-to-list-unsafe Rather, use ``limit`` or ``take`` to ensure that the resulting ``List`` will contain only up to ``MAX_ALLOWED_SIZE`` elements: -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/cookbook/RecipeSeq.java#draining-to-list-safe +.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeSeq.java#draining-to-list-safe Calculating the digest of a ByteString stream --------------------------------------------- diff --git a/akka-docs/rst/java/stream/stream-flows-and-basics.rst b/akka-docs/rst/java/stream/stream-flows-and-basics.rst index 13b1c6cc2b..aadc3b3bc6 100644 --- a/akka-docs/rst/java/stream/stream-flows-and-basics.rst +++ b/akka-docs/rst/java/stream/stream-flows-and-basics.rst @@ -38,7 +38,7 @@ Processing Stage The common name for all building blocks that build up a Graph. Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like :class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``. - For the full list of built-in processing stages see :ref:`stages-overview` + For the full list of built-in processing stages see :ref:`stages-overview_java` When we talk about *asynchronous, non-blocking backpressure* we mean that the processing stages available in Akka Streams will not use blocking calls but asynchronous message passing to exchange messages between each other, and they diff --git a/akka-docs/rst/java/stream/stream-graphs.rst b/akka-docs/rst/java/stream/stream-graphs.rst index aca91f52f2..6ab59fe9fb 100644 --- a/akka-docs/rst/java/stream/stream-graphs.rst +++ b/akka-docs/rst/java/stream/stream-graphs.rst @@ -24,7 +24,7 @@ Graphs are built from simple Flows which serve as the linear connections within which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour and making them explicit elements these elements should be rather straightforward to use. -Akka Streams currently provide these junctions (for a detailed list see :ref:`stages-overview`): +Akka Streams currently provide these junctions (for a detailed list see :ref:`stages-overview_java`): * **Fan-out** diff --git a/akka-docs/rst/java/stream/stream-introduction.rst b/akka-docs/rst/java/stream/stream-introduction.rst index 46ba7ef5f7..6605e2668a 100644 --- a/akka-docs/rst/java/stream/stream-introduction.rst +++ b/akka-docs/rst/java/stream/stream-introduction.rst @@ -79,7 +79,7 @@ and for best results we recommend the following approach: * The bottom-up learners may feel more at home rummaging through the :ref:`stream-cookbook-java`. * For a complete overview of the built-in processing stages you can look at the - table in :ref:`stages-overview` + table in :ref:`stages-overview_java` * The other sections can be read sequentially or as needed during the previous steps, each digging deeper into specific topics. diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSeq.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSeq.scala new file mode 100644 index 0000000000..06e54ec133 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSeq.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package docs.stream.cookbook + +import akka.stream.scaladsl._ +import scala.concurrent.Future +import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.duration._ + +class RecipeSeq extends RecipeSpec with ScalaFutures { + implicit val patience = PatienceConfig(3.seconds) + + "Draining to a strict sequence" must { + + "not be done unsafely" in { + val mySource = Source(1 to 3).map(_.toString) + //#draining-to-seq-unsafe + // Dangerous: might produce a collection with 2 billion elements! + val f: Future[Seq[String]] = mySource.runWith(Sink.seq) + //#draining-to-seq-unsafe + f.futureValue should ===(Seq("1", "2", "3")) + } + + "be done safely" in { + val mySource = Source(1 to 3).map(_.toString) + //#draining-to-seq-safe + val MAX_ALLOWED_SIZE = 100 + + // OK. Future will fail with a `StreamLimitReachedException` + // if the number of incoming elements is larger than max + val limited: Future[Seq[String]] = + mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq) + + // OK. Collect up until max-th elements only, then cancel upstream + val ignoreOverflow: Future[Seq[String]] = + mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq) + //#draining-to-seq-safe + limited.futureValue should ===(Seq("1", "2", "3")) + ignoreOverflow.futureValue should ===(Seq("1", "2", "3")) + } + + } + +} diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index b46a7aaa7e..b74e092fdf 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -1,4 +1,4 @@ -.. _stages-overview: +.. _stages-overview_scala: Overview of built-in stages and their semantics =============================================== @@ -17,61 +17,61 @@ for each materialization, which is the reason the method takes a function rather If the iterator perform blocking operations, make sure to run it on a separate dispatcher. -*emits* the next value returned from the iterator +**emits** the next value returned from the iterator -*completes* when the iterator reaches its end +**completes** when the iterator reaches its end apply ^^^^^ Stream the values of an ``immutable.Seq``. -*emits* the next value of the seq +**emits** the next value of the seq -*completes* when the last element of the seq has been emitted +**completes** when the last element of the seq has been emitted single ^^^^^^ Stream a single object -*emits* the value once +**emits** the value once -*completes* when the single value has been emitted +**completes** when the single value has been emitted repeat ^^^^^^ Stream a single object repeatedly -*emits* the same value repeatedly when there is demand +**emits** the same value repeatedly when there is demand -*completes* never +**completes** never tick ^^^^ A periodical repetition of an arbitrary object. Delay of first tick is specified separately from interval of the following ticks. -*emits* periodically, if there is downstream backpressure ticks are skipped +**emits** periodically, if there is downstream backpressure ticks are skipped -*completes* never +**completes** never fromFuture ^^^^^^^^^^ Send the single value of the ``Future`` when it completes and there is demand. If the future fails the stream is failed with that exception. -*emits* the future completes +**emits** the future completes -*completes* after the future has completed +**completes** after the future has completed fromCompletionStage ^^^^^^^^^^^^^^^^^^^ Send the single value of the Java ``CompletionStage`` when it completes and there is demand. If the future fails the stream is failed with that exception. -*emits* the future completes +**emits** the future completes -*completes* after the future has completed +**completes** after the future has completed unfold @@ -82,9 +82,9 @@ to pass a state. The first invocation of the provided fold function will receive Can be used to implement many stateful sources without having to touch the more low level ``GraphStage`` API. -*emits* when there is demand and the unfold function over the previous state returns non empty value +**emits** when there is demand and the unfold function over the previous state returns non empty value -*completes* when the unfold function returns an empty value +**completes** when the unfold function returns an empty value unfoldAsync ^^^^^^^^^^^ @@ -93,43 +93,43 @@ complete or emit when it completes. Can be used to implement many stateful sources without having to touch the more low level ``GraphStage`` API. -*emits* when there is demand and unfold state returned future completes with some value +**emits** when there is demand and unfold state returned future completes with some value -*completes* when the future returned by the unfold function completes with an empty value +**completes** when the future returned by the unfold function completes with an empty value empty ^^^^^ Complete right away without ever emitting any elements. Useful when you have to provide a source to an API but there are no elements to emit. -*emits* never +**emits** never -*completes* directly +**completes** directly maybe ^^^^^ Materialize a ``Promise[Option[T]]`` that if completed with a ``Some[T]`` will emit that `T` and then complete the stream, or if completed with ``None`` complete the stream right away. -*emits* when the returned promise is completed with some value +**emits** when the returned promise is completed with some value -*completes* after emitting some value, or directly if the promise is completed with no value +**completes** after emitting some value, or directly if the promise is completed with no value failed ^^^^^^ Fail directly with a user specified exception. -*emits* never +**emits** never -*completes* fails the stream directly with the given exception +**completes** fails the stream directly with the given exception actorPublisher ^^^^^^^^^^^^^^ Wrap an actor extending ``ActorPublisher`` as a source. -*emits* depends on the actor implementation +**emits** depends on the actor implementation -*completes* when the actor stops +**completes** when the actor stops actorRef ^^^^^^^^ @@ -137,17 +137,17 @@ Materialize an ``ActorRef``, sending messages to it will emit them on the stream a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping elements or failing the stream, the strategy is chosen by the user. -*emits* when there is demand and there are messages in the buffer or a message is sent to the actorref +**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref -*completes* when the actorref is sent ``akka.actor.Status.Success`` or ``PoisonPill`` +**completes** when the actorref is sent ``akka.actor.Status.Success`` or ``PoisonPill`` combine ^^^^^^^ Combine several sources, using a given strategy such as merge or concat, into one source. -*emits* when there is demand, but depending on the strategy +**emits** when there is demand, but depending on the strategy -*completes* when all sources has completed +**completes** when all sources has completed queue ^^^^^ @@ -156,9 +156,9 @@ a buffer, if elements are pushed onto the queue faster than the source is consum a strategy specified by the user. Functionality for tracking when an element has been emitted is available through ``SourceQueue.offer``. -*emits* when there is demand and the queue contains elements +**emits** when there is demand and the queue contains elements -*completes* when downstream completes +**completes** when downstream completes asSubscriber ^^^^^^^^^^^^ @@ -182,27 +182,27 @@ head Materializes into a ``Future`` which completes with the first value arriving, after this the stream is canceled. If no element is emitted, the future is be failed. -*cancels* after receiving one element +**cancels** after receiving one element -*backpressures* never +**backpressures** never headOption ^^^^^^^^^^ Materializes into a ``Future[Option[T]]`` which completes with the first value arriving wrapped in a ``Some``, or a ``None`` if the stream completes without any elements emitted. -*cancels* after receiving one element +**cancels** after receiving one element -*backpressures* never +**backpressures** never last ^^^^ Materializes into a ``Future`` which will complete with the last value emitted when the stream completes. If the stream completes with no elements the future is failed. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never lastOption ^^^^^^^^^^ @@ -210,24 +210,24 @@ Materialize a ``Future[Option[T]]`` which completes with the last value emitted wrapped in an ``Some`` when the stream completes. if the stream completes with no elements the future is completed with ``None``. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never ignore ^^^^^^ Consume all elements but discards them. Useful when a stream has to be consumed but there is no use to actually do anything with the elements. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never cancelled ^^^^^^^^^ Immediately cancel the stream -*cancels* immediately +**cancels** immediately seq ^^^ @@ -235,7 +235,7 @@ Collect values emitted from the stream into a collection, the collection is avai which completes when the stream completes. Note that the collection is bounded to ``Int.MaxValue``, if more element are emitted the sink will cancel the stream -*cancels* If too many values are collected +**cancels** If too many values are collected foreach ^^^^^^^ @@ -246,27 +246,27 @@ stream completes, or fails if the stream fails. Note that it is not safe to mutate state from the procedure. -*cancels* never +**cancels** never -*backpressures* when the previous procedure invocation has not yet completed +**backpressures** when the previous procedure invocation has not yet completed foreachParallel ^^^^^^^^^^^^^^^ Like ``foreach`` but allows up to ``parallellism`` procedure calls to happen in parallel. -*cancels* never +**cancels** never -*backpressures* when the previous parallel procedure invocations has not yet completed +**backpressures** when the previous parallel procedure invocations has not yet completed onComplete ^^^^^^^^^^ Invoke a callback when the stream has completed or failed. -*cancels* never +**cancels** never -*backpressures* never +**backpressures** never fold @@ -279,9 +279,9 @@ Materializes into a future that will complete with the last state when the strea This stage allows combining values into a result without a global mutable state by instead passing the state along between invocations. -*cancels* never +**cancels** never -*backpressures* when the previous fold function invocation has not yet completed +**backpressures** when the previous fold function invocation has not yet completed reduce ^^^^^^ @@ -290,27 +290,27 @@ receives the two first elements of the flow. Materializes into a future that will be completed by the last result of the reduction function. -*cancels* never +**cancels** never -*backpressures* when the previous reduction function invocation has not yet completed +**backpressures** when the previous reduction function invocation has not yet completed combine ^^^^^^^ Combine several sinks into one using a user specified strategy -*cancels* depends on the strategy +**cancels** depends on the strategy -*backpressures* depends on the strategy +**backpressures** depends on the strategy actorRef ^^^^^^^^ Send the elements from the stream to an ``ActorRef``. No backpressure so care must be taken to not overflow the inbox. -*cancels* when the actor terminates +**cancels** when the actor terminates -*backpressures* never +**backpressures** never actorRefWithAck @@ -318,9 +318,9 @@ actorRefWithAck Send the elements from the stream to an ``ActorRef`` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. -*cancels* when the actor terminates +**cancels** when the actor terminates -*backpressures* when the actor acknowledgement has not arrived +**backpressures** when the actor acknowledgement has not arrived actorSubscriber @@ -330,9 +330,9 @@ receive the elements from the stream. Materializes into an ``ActorRef`` to the created actor. -*cancels* when the actor terminates +**cancels** when the actor terminates -*backpressures* depends on the actor implementation +**backpressures** depends on the actor implementation asPublisher @@ -423,61 +423,72 @@ Simple processing stages These stages can transform the rate of incoming elements since there are stages that emit multiple elements for a single input (e.g. `mapConcat') or consume multiple elements before emitting one output (e.g. ``filter``). However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the -rate is affected. This is in contrast with :ref:`detached-stages-overview` which can change their processing behavior +rate is affected. This is in contrast with :ref:`detached-stages-overview_scala` which can change their processing behavior depending on being backpressured by downstream or not. map ^^^ Transform each element in the stream by calling a mapping function with it and passing the returned value downstream. -*emits* when the mapping function returns an element +**emits** when the mapping function returns an element -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes mapConcat ^^^^^^^^^ Transform each element into zero or more elements that are individually passed downstream. -*emits* when the mapping function returns an element or there are still remaining elements from the previously calculated collection +**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection -*backpressures* when downstream backpressures or there are still available elements from the previously calculated collection +**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection -*completes* when upstream completes and all remaining elements has been emitted +**completes** when upstream completes and all remaining elements has been emitted + +statefulMapConcat +^^^^^^^^^^^^^^^^^ +Transform each element into zero or more elements that are individually passed downstream. The difference to ``mapConcat`` is that +the transformation function is created from a factory for every materialization of the flow. + +**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection + +**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection + +**completes** when upstream completes and all remaining elements has been emitted filter ^^^^^^ Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if it returns false the element is discarded. -*emits* when the given predicate returns true for the element +**emits** when the given predicate returns true for the element -*backpressures* when the given predicate returns true for the element and downstream backpressures +**backpressures** when the given predicate returns true for the element and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes collect ^^^^^^^ Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream. Can often replace ``filter`` followed by ``map`` to achieve the same in one single stage. -*emits* when the provided partial function is defined for the element +**emits** when the provided partial function is defined for the element -*backpressures* the partial function is defined for the element and downstream backpressures +**backpressures** the partial function is defined for the element and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes grouped ^^^^^^^ Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream. -*emits* when the specified number of elements has been accumulated or upstream completed +**emits** when the specified number of elements has been accumulated or upstream completed -*backpressures* when a group has been assembled and downstream backpressures +**backpressures** when a group has been assembled and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes sliding ^^^^^^^ @@ -485,11 +496,11 @@ Provide a sliding window over the incoming stream and pass the windows as groups Note: the last window might be smaller than the requested size due to end of stream. -*emits* the specified number of elements has been accumulated or upstream completed +**emits** the specified number of elements has been accumulated or upstream completed -*backpressures* when a group has been assembled and downstream backpressures +**backpressures** when a group has been assembled and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes scan @@ -500,42 +511,42 @@ emitting the next current value. Note that this means that scan emits one element downstream before and upstream elements will not be requested until the second element is required from downstream. -*emits* when the function scanning the element returns a new element +**emits** when the function scanning the element returns a new element -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes fold ^^^^ Start with current value ``zero`` and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream. -*emits* when upstream completes +**emits** when upstream completes -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes drop ^^^^ Drop ``n`` elements and then pass any subsequent element downstream. -*emits* when the specified number of elements has been dropped already +**emits** when the specified number of elements has been dropped already -*backpressures* when the specified number of elements has been dropped and downstream backpressures +**backpressures** when the specified number of elements has been dropped and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes take ^^^^ Pass ``n`` incoming elements downstream and then complete -*emits* while the specified number of elements to take has not yet been reached +**emits** while the specified number of elements to take has not yet been reached -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when the defined number of elements has been taken or upstream completes +**completes** when the defined number of elements has been taken or upstream completes takeWhile @@ -543,51 +554,51 @@ takeWhile Pass elements downstream as long as a predicate function return true for the element include the element when the predicate first return false and then complete. -*emits* while the predicate is true and until the first false result +**emits** while the predicate is true and until the first false result -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when predicate returned false or upstream completes +**completes** when predicate returned false or upstream completes dropWhile ^^^^^^^^^ Drop elements as long as a predicate function return true for the element -*emits* when the predicate returned false and for all following stream elements +**emits** when the predicate returned false and for all following stream elements -*backpressures* predicate returned false and downstream backpressures +**backpressures** predicate returned false and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes recover ^^^^^^^ Allow sending of one last element downstream when a failure has happened upstream. -*emits* when the element is available from the upstream or upstream is failed and pf returns an element +**emits** when the element is available from the upstream or upstream is failed and pf returns an element -*backpressures* when downstream backpressures, not when failure happened +**backpressures** when downstream backpressures, not when failure happened -*completes* when upstream completes or upstream failed with exception pf can handle +**completes** when upstream completes or upstream failed with exception pf can handle recoverWith ^^^^^^^^^^^ Allow switching to alternative Source when a failure has happened upstream. -*emits* the element is available from the upstream or upstream is failed and pf returns alternative Source +**emits** the element is available from the upstream or upstream is failed and pf returns alternative Source -*backpressures* downstream backpressures, after failure happened it backprssures to alternative Source +**backpressures** downstream backpressures, after failure happened it backprssures to alternative Source -*completes* upstream completes or upstream failed with exception pf can handle +**completes** upstream completes or upstream failed with exception pf can handle detach ^^^^^^ Detach upstream demand from downstream demand without detaching the stream rates. -*emits* when the upstream stage has emitted and there is demand +**emits** when the upstream stage has emitted and there is demand -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes throttle @@ -595,11 +606,11 @@ throttle Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element. -*emits* when upstream emits an element and configured time per each element elapsed +**emits** when upstream emits an element and configured time per each element elapsed -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes Asynchronous processing stages @@ -617,11 +628,11 @@ order will be kept when results complete. For use cases where order does not mat If a Future fails, the stream also fails (unless a different supervision strategy is applied) -*emits* when the Future returned by the provided function finishes for the next element in sequence +**emits** when the Future returned by the provided function finishes for the next element in sequence -*backpressures* when the number of futures reaches the configured parallelism and the downstream backpressures +**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures -*completes* when upstream completes and all futures has been completed and all elements has been emitted +**completes** when upstream completes and all futures has been completed and all elements has been emitted mapAsyncUnordered ^^^^^^^^^^^^^^^^^ @@ -630,11 +641,11 @@ that triggered them. If a Future fails, the stream also fails (unless a different supervision strategy is applied) -*emits* any of the Futures returned by the provided function complete +**emits** any of the Futures returned by the provided function complete -*backpressures* when the number of futures reaches the configured parallelism and the downstream backpressures +**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures -*completes* upstream completes and all futures has been completed and all elements has been emitted +**completes** upstream completes and all futures has been completed and all elements has been emitted Timer driven stages @@ -646,59 +657,59 @@ takeWithin ^^^^^^^^^^ Pass elements downstream within a timeout and then complete. -*emits* when an upstream element arrives +**emits** when an upstream element arrives -*backpressures* downstream backpressures +**backpressures** downstream backpressures -*completes* upstream completes or timer fires +**completes** upstream completes or timer fires dropWithin ^^^^^^^^^^ Drop elements until a timeout has fired -*emits* after the timer fired and a new upstream element arrives +**emits** after the timer fired and a new upstream element arrives -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* upstream completes +**completes** upstream completes groupedWithin ^^^^^^^^^^^^^ Chunk up the stream into groups of elements received within a time window, or limited by the given number of elements, whichever happens first. -*emits* when the configured time elapses since the last group has been emitted +**emits** when the configured time elapses since the last group has been emitted -*backpressures* when the group has been assembled (the duration elapsed) and downstream backpressures +**backpressures** when the group has been assembled (the duration elapsed) and downstream backpressures -*completes* when upstream completes +**completes** when upstream completes initialDelay ^^^^^^^^^^^^ Delay the initial element by a user specified duration from stream materialization. -*emits* upstream emits an element if the initial delay already elapsed +**emits** upstream emits an element if the initial delay already elapsed -*backpressures* downstream backpressures or initial delay not yet elapsed +**backpressures** downstream backpressures or initial delay not yet elapsed -*completes* when upstream completes +**completes** when upstream completes delay ^^^^^ Delay every element passed through with a specific duration. -*emits* there is a pending element in the buffer and configured time for this element elapsed +**emits** there is a pending element in the buffer and configured time for this element elapsed -*backpressures* differs, depends on ``OverflowStrategy`` set +**backpressures** differs, depends on ``OverflowStrategy`` set -*completes* when upstream completes and buffered elements has been drained +**completes** when upstream completes and buffered elements has been drained -.. _detached-stages-overview: +.. _detached-stages-overview_scala: Backpressure aware stages ------------------------- @@ -711,11 +722,11 @@ Allow for a slower downstream by passing incoming elements and a summary into an there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or average of incoming numbers, if aggregation should lead to a different type ``conflateWithSeed`` can be used: -*emits* when downstream stops backpressuring and there is a conflated element available +**emits** when downstream stops backpressuring and there is a conflated element available -*backpressures* when the aggregate function cannot keep up with incoming elements +**backpressures** when the aggregate function cannot keep up with incoming elements -*completes* when upstream completes +**completes** when upstream completes conflateWithSeed ^^^^^^^^^^^^^^^^ @@ -723,11 +734,11 @@ Allow for a slower downstream by passing incoming elements and a summary into an is backpressure. When backpressure starts or there is no backpressure element is passed into a ``seed`` function to transform it to the summary type. -*emits* when downstream stops backpressuring and there is a conflated element available +**emits** when downstream stops backpressuring and there is a conflated element available -*backpressures* when the aggregate or seed functions cannot keep up with incoming elements +**backpressures** when the aggregate or seed functions cannot keep up with incoming elements -*completes* when upstream completes +**completes** when upstream completes batch ^^^^^ @@ -741,11 +752,11 @@ to the summary type. Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be aggregated to the batched value. -*emits* when downstream stops backpressuring and there is a batched element available +**emits** when downstream stops backpressuring and there is a batched element available -*backpressures* when batched elements reached the max limit of allowed batched elements & downstream backpressures +**backpressures** when batched elements reached the max limit of allowed batched elements & downstream backpressures -*completes* when upstream completes and a "possibly pending" element was drained +**completes** when upstream completes and a "possibly pending" element was drained batchWeighted @@ -758,33 +769,33 @@ backpressure. Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be aggregated to the batched value. -*emits* downstream stops backpressuring and there is a batched element available +**emits** downstream stops backpressuring and there is a batched element available -*backpressures* batched elements reached the max weight limit of allowed batched elements & downstream backpressures +**backpressures** batched elements reached the max weight limit of allowed batched elements & downstream backpressures -*completes* upstream completes and a "possibly pending" element was drained +**completes** upstream completes and a "possibly pending" element was drained expand ^^^^^^ Allow for a faster downstream by expanding the last incoming element to an ``Iterator``. For example ``Iterator.continually(element)`` to keep repating the last incoming element. -*emits* when downstream stops backpressuring +**emits** when downstream stops backpressuring -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes +**completes** when upstream completes buffer (Backpressure) ^^^^^^^^^^^^^^^^^^^^^ Allow for a temporarily faster upstream events by buffering ``size`` elements. When the buffer is full backpressure is applied. -*emits* when downstream stops backpressuring and there is a pending element in the buffer +**emits** when downstream stops backpressuring and there is a pending element in the buffer -*backpressures* when buffer is full +**backpressures** when buffer is full -*completes* when upstream completes and buffered elements has been drained +**completes** when upstream completes and buffered elements has been drained buffer (Drop) ^^^^^^^^^^^^^ @@ -796,22 +807,22 @@ dropped according to the specified ``OverflowStrategy``: * ``dropBuffer`` drops the entire buffer and buffers the new element * ``dropNew`` drops the new element -*emits* when downstream stops backpressuring and there is a pending element in the buffer +**emits** when downstream stops backpressuring and there is a pending element in the buffer -*backpressures* never (when dropping cannot keep up with incoming elements) +**backpressures** never (when dropping cannot keep up with incoming elements) -*completes* upstream completes and buffered elements has been drained +**completes** upstream completes and buffered elements has been drained buffer (Fail) ^^^^^^^^^^^^^ Allow for a temporarily faster upstream events by buffering ``size`` elements. When the buffer is full the stage fails the flow with a ``BufferOverflowException``. -*emits* when downstream stops backpressuring and there is a pending element in the buffer +**emits** when downstream stops backpressuring and there is a pending element in the buffer -*backpressures* never, fails the stream instead of backpressuring when buffer is full +**backpressures** never, fails the stream instead of backpressuring when buffer is full -*completes* when upstream completes and buffered elements has been drained +**completes** when upstream completes and buffered elements has been drained Nesting and flattening stages @@ -825,52 +836,52 @@ prefixAndTail Take up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements. -*emits* when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream +**emits** when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream -*backpressures* when downstream backpressures or substream backpressures +**backpressures** when downstream backpressures or substream backpressures -*completes* when prefix elements has been consumed and substream has been consumed +**completes** when prefix elements has been consumed and substream has been consumed groupBy ^^^^^^^ Demultiplex the incoming stream into separate output streams. -*emits* an element for which the grouping function returns a group that has not yet been created. Emits the new group +**emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures -*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) splitWhen ^^^^^^^^^ Split off elements into a new substream whenever a predicate function return ``true``. -*emits* an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements +**emits** an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements -*backpressures* when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures +**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures -*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) splitAfter ^^^^^^^^^^ End the current substream whenever a predicate returns ``true``, starting a new substream for the next element. -*emits* when an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element +**emits** when an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element -*backpressures* when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures +**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures -*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) flatMapConcat ^^^^^^^^^^^^^ Transform each input element into a ``Source`` whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts. -*emits* when the current consumed substream has an element available +**emits** when the current consumed substream has an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes and all consumed substreams complete +**completes** when upstream completes and all consumed substreams complete flatMapMerge @@ -878,11 +889,11 @@ flatMapMerge Transform each input element into a ``Source`` whose elements are then flattened into the output stream through merging. The maximum number of merged sources has to be specified. -*emits* when one of the currently consumed substreams has an element available +**emits** when one of the currently consumed substreams has an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when upstream completes and all consumed substreams complete +**completes** when upstream completes and all consumed substreams complete Fan-in stages @@ -895,63 +906,63 @@ merge ^^^^^ Merge multiple sources. Picks elements randomly if all sources has elements ready. -*emits* when one of the inputs has an element available +**emits** when one of the inputs has an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) mergeSorted ^^^^^^^^^^^ Merge multiple sources. Waits for one element to be ready from each input stream and emits the smallest element. -*emits* when all of the inputs have an element available +**emits** when all of the inputs have an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete +**completes** when all upstreams complete mergePreferred ^^^^^^^^^^^^^^ Merge multiple sources. Prefer one source if all sources has elements ready. -*emits* when one of the inputs has an element available, preferring a defined input if multiple have elements available +**emits** when one of the inputs has an element available, preferring a defined input if multiple have elements available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) zip ^^^ Combines elements from each of multiple sources into tuples and passes the tuples downstream. -*emits* when all of the inputs have an element available +**emits** when all of the inputs have an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when any upstream completes +**completes** when any upstream completes zipWith ^^^^^^^ Combines elements from multiple sources through a ``combine`` function and passes the returned value downstream. -*emits* when all of the inputs have an element available +**emits** when all of the inputs have an element available -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when any upstream completes +**completes** when any upstream completes concat ^^^^^^ After completion of the original upstream the elements of the given source will be emitted. -*emits* when the current stream has an element available; if the current input completes, it tries the next one +**emits** when the current stream has an element available; if the current input completes, it tries the next one -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete +**completes** when all upstreams complete prepend ^^^^^^^ @@ -959,22 +970,22 @@ Prepends the given source to the flow, consuming it until completion before the If materialized values needs to be collected ``prependMat`` is available. -*emits* when the given stream has an element available; if the given input completes, it tries the current one +**emits** when the given stream has an element available; if the given input completes, it tries the current one -*backpressures* when downstream backpressures +**backpressures** when downstream backpressures -*completes* when all upstreams complete +**completes** when all upstreams complete interleave ^^^^^^^^^^ Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one source completes the rest of the other stream will be emitted. -*emits* when element is available from the currently consumed upstream +**emits** when element is available from the currently consumed upstream -*backpressures* when upstream backpressures +**backpressures** when upstream backpressures -*completes* when both upstreams have completed +**completes** when both upstreams have completed Fan-out stages -------------- @@ -986,41 +997,41 @@ unzip ^^^^^ Takes a stream of two element tuples and unzips the two elements ino two different downstreams. -*emits* when all of the outputs stops backpressuring and there is an input element available +**emits** when all of the outputs stops backpressuring and there is an input element available -*backpressures* when any of the outputs backpressures +**backpressures** when any of the outputs backpressures -*completes* when upstream completes +**completes** when upstream completes unzipWith ^^^^^^^^^ Splits each element of input into multiple downstreams using a function -*emits* when all of the outputs stops backpressuring and there is an input element available +**emits** when all of the outputs stops backpressuring and there is an input element available -*backpressures* when any of the outputs backpressures +**backpressures** when any of the outputs backpressures -*completes* when upstream completes +**completes** when upstream completes broadcast ^^^^^^^^^ Emit each incoming element each of ``n`` outputs. -*emits* when all of the outputs stops backpressuring and there is an input element available +**emits** when all of the outputs stops backpressuring and there is an input element available -*backpressures* when any of the outputs backpressures +**backpressures** when any of the outputs backpressures -*completes* when upstream completes +**completes** when upstream completes balance ^^^^^^^ Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer. -*emits* when any of the outputs stops backpressuring; emits the element to the first available output +**emits** when any of the outputs stops backpressuring; emits the element to the first available output -*backpressures* when all of the outputs backpressure +**backpressures** when all of the outputs backpressure -*completes* when upstream completes +**completes** when upstream completes Watching status stages @@ -1031,9 +1042,9 @@ watchTermination Materializes to a ``Future`` that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. The stage otherwise passes through elements unchanged. -*emits* when input has an element available +**emits** when input has an element available -*backpressures* when output backpressures +**backpressures** when output backpressures -*completes* when upstream completes +**completes** when upstream completes diff --git a/akka-docs/rst/scala/stream/stream-cookbook.rst b/akka-docs/rst/scala/stream/stream-cookbook.rst index 12a22a1de7..139f993be3 100644 --- a/akka-docs/rst/scala/stream/stream-cookbook.rst +++ b/akka-docs/rst/scala/stream/stream-cookbook.rst @@ -16,7 +16,7 @@ This part also serves as supplementary material for the main body of documentati open while reading the manual and look for examples demonstrating various streaming concepts as they appear in the main body of documentation. -If you need a quick reference of the available processing stages used in the recipes see :ref:`stages-overview`. +If you need a quick reference of the available processing stages used in the recipes see :ref:`stages-overview_scala`. Working with Flows ================== @@ -63,11 +63,11 @@ The function ``limit`` or ``take`` should always be used in conjunction in order For example, this is best avoided: -.. includecode:: code/docs/stream/cookbook/RecipeSeq.scala#draining-to-seq-unsafe +.. includecode:: ../code/docs/stream/cookbook/RecipeSeq.scala#draining-to-seq-unsafe Rather, use ``limit`` or ``take`` to ensure that the resulting ``Seq`` will contain only up to ``max`` elements: -.. includecode:: code/docs/stream/cookbook/RecipeSeq.scala#draining-to-seq-safe +.. includecode:: ../code/docs/stream/cookbook/RecipeSeq.scala#draining-to-seq-safe Calculating the digest of a ByteString stream --------------------------------------------- diff --git a/akka-docs/rst/scala/stream/stream-flows-and-basics.rst b/akka-docs/rst/scala/stream/stream-flows-and-basics.rst index 21a8bc2f20..d32b4dceef 100644 --- a/akka-docs/rst/scala/stream/stream-flows-and-basics.rst +++ b/akka-docs/rst/scala/stream/stream-flows-and-basics.rst @@ -38,7 +38,7 @@ Processing Stage The common name for all building blocks that build up a Graph. Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like :class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``. - For the full list of built-in processing stages see :ref:`stages-overview` + For the full list of built-in processing stages see :ref:`stages-overview_scala` When we talk about *asynchronous, non-blocking backpressure* we mean that the processing stages available in Akka Streams will not use blocking calls but asynchronous message passing to exchange messages between each other, and they diff --git a/akka-docs/rst/scala/stream/stream-graphs.rst b/akka-docs/rst/scala/stream/stream-graphs.rst index 314298904b..37b6dcf8be 100644 --- a/akka-docs/rst/scala/stream/stream-graphs.rst +++ b/akka-docs/rst/scala/stream/stream-graphs.rst @@ -24,7 +24,7 @@ Graphs are built from simple Flows which serve as the linear connections within which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour and making them explicit elements these elements should be rather straightforward to use. -Akka Streams currently provide these junctions (for a detailed list see :ref:`stages-overview`): +Akka Streams currently provide these junctions (for a detailed list see :ref:`stages-overview_scala`): * **Fan-out** diff --git a/akka-docs/rst/scala/stream/stream-introduction.rst b/akka-docs/rst/scala/stream/stream-introduction.rst index dacecf187d..064cd46890 100644 --- a/akka-docs/rst/scala/stream/stream-introduction.rst +++ b/akka-docs/rst/scala/stream/stream-introduction.rst @@ -79,7 +79,7 @@ and for best results we recommend the following approach: * The bottom-up learners may feel more at home rummaging through the :ref:`stream-cookbook-scala`. * For a complete overview of the built-in processing stages you can look at the - table in :ref:`stages-overview` + table in :ref:`stages-overview_scala` * The other sections can be read sequentially or as needed during the previous steps, each digging deeper into specific topics. diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 4e4bf7393a..8252f3838c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -362,12 +362,12 @@ object StreamLayout { } final case class CompositeModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes) extends Module { override def replaceShape(s: Shape): Module = { shape.requireSamePortsAs(s) @@ -392,13 +392,13 @@ object StreamLayout { } final case class FusedModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes, - info: Fusing.StructuralInfo) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes, + info: Fusing.StructuralInfo) extends Module { override def isFused: Boolean = true