Stream supervision doc clarification #23376

This commit is contained in:
Johan Andrén 2017-07-26 16:23:46 +02:00 committed by GitHub
parent 8eabc685dc
commit 407131cb4e
34 changed files with 509 additions and 193 deletions

View file

@@ -1,4 +1,4 @@
# Contents
-* @ref[Java Documentation](java/index.md)
-* @ref[Scala Documentation](scala/index.md)
+* @ref:[Java Documentation](java/index.md)
+* @ref:[Scala Documentation](scala/index.md)

View file

@@ -921,7 +921,7 @@ e.g. scheduled tick messages.
## Timers, scheduled messages
-Messages can be scheduled to be sent at a later point by using the @ref[Scheduler](scheduler.md) directly,
+Messages can be scheduled to be sent at a later point by using the @ref:[Scheduler](scheduler.md) directly,
but when scheduling periodic or single messages in an actor to itself it's more convenient and safe
to use the support for named timers. The lifecycle of scheduled messages can be difficult to manage
when the actor is restarted and that is taken care of by the timers.
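For illustration, a minimal sketch of the named-timer support (the actor and message names here are invented for the example):

```scala
import scala.concurrent.duration._
import akka.actor.{ Actor, Timers }

object ChargerActor {
  private case object TickKey // starting a timer with an existing key replaces the old one
  case object Tick
}

class ChargerActor extends Actor with Timers {
  import ChargerActor._

  // the timer is cancelled automatically when the actor is restarted or stopped
  timers.startPeriodicTimer(TickKey, Tick, 1.second)

  def receive = {
    case Tick => // perform the periodic work here
  }
}
```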

View file

@@ -70,7 +70,7 @@ in an application composed of multiple JARs to reside under a single package nam
might scan all classes from `com.example.plugins` for specific service implementations with that package existing in
several contributed JARs.
While it is possible to support overlapping packages with complex manifest headers, it's much better to use non-overlapping
-package spaces and facilities such as @ref[Akka Cluster](../common/cluster.md)
+package spaces and facilities such as @ref:[Akka Cluster](../common/cluster.md)
for service discovery. Stylistically, many organizations opt to use the root package path as the name of the bundle
distribution file.

View file

@@ -307,7 +307,7 @@ sent to the parent of the entity actor, otherwise the entity will be automatical
restarted after the entity restart backoff specified in the configuration.
When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
-stored in @ref[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
+stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since
the default directory contains the remote port of the actor system. If using a dynamically
assigned port (0) it will be different each time and the previously stored data will not
@@ -319,7 +319,7 @@ for that entity has been received in the `Shard`. Entities will not be restarted
using a `Passivate`.
Note that the state of the entities themselves will not be restored unless they have been made persistent,
-e.g. with @ref[Persistence](persistence.md).
+e.g. with @ref:[Persistence](persistence.md).
The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
shards are rebalanced. This cost increases with number of entities per shard and we currently don't

View file

@@ -869,7 +869,7 @@ the actor system for a specific role. This can also be used to grab the `akka.ac
## How to Test
Currently testing with the `sbt-multi-jvm` plugin is only documented for Scala.
-Go to the corresponding @ref[Scala page](../scala/cluster-usage.md#how-to-test) for details.
+Go to the corresponding @ref:[Scala page](../scala/cluster-usage.md#how-to-test) for details.
@@@

View file

@@ -36,7 +36,7 @@ instructions on downloading and running the Hello World example. The *Quickstart
This *Getting Started* guide provides the next level of information. It covers why the actor model fits the needs of modern distributed systems and includes a tutorial that will help further your knowledge of Akka. Topics include:
-* @ref[Why modern systems need a new programming model](actors-motivation.md)
-* @ref[How the actor model meets the needs of concurrent, distributed systems](actors-intro.md)
-* @ref[Overview of Akka libraries and modules](modules.md)
-* A @ref[more complex example](tutorial.md) that builds on the Hello World example to illustrate common Akka patterns.
+* @ref:[Why modern systems need a new programming model](actors-motivation.md)
+* @ref:[How the actor model meets the needs of concurrent, distributed systems](actors-intro.md)
+* @ref:[Overview of Akka libraries and modules](modules.md)
+* A @ref:[more complex example](tutorial.md) that builds on the Hello World example to illustrate common Akka patterns.

View file

@@ -92,7 +92,7 @@ first stopped
When we stopped actor `first`, it stopped its child actor, `second`, before stopping itself. This ordering is strict: _all_ `postStop()` hooks of the children are called before the `postStop()` hook of the parent
is called.
-The @ref[Actor Lifecycle](../actors.md#actor-lifecycle) section of the Akka reference manual provides details on the full set of lifecycle hooks.
+The @ref:[Actor Lifecycle](../actors.md#actor-lifecycle) section of the Akka reference manual provides details on the full set of lifecycle hooks.
### Failure handling
@@ -143,7 +143,7 @@ We see that after failure the supervised actor is stopped and immediately restar
which are the default to be called after and before restarts, so we cannot distinguish from inside the actor whether it was started for the first time or restarted. This is usually the right thing to do: the purpose of the restart is to set the actor in a known-good state, which usually means a clean starting stage. **What actually happens though is
that the `preRestart()` and `postRestart()` methods are called which, if not overridden, by default delegate to `postStop()` and `preStart()` respectively**. You can experiment with overriding these additional methods and see how the output changes.
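As a sketch of that experiment (the actor mirrors the tutorial's `SupervisedActor`; the extra overrides are what you would add):

```scala
import akka.actor.Actor

class SupervisedActor extends Actor {
  override def preStart(): Unit = println("supervised actor started")
  override def postStop(): Unit = println("supervised actor stopped")

  // overriding these changes what is printed around a restart, since they
  // replace the default delegation to postStop()/preStart()
  override def preRestart(reason: Throwable, message: Option[Any]): Unit =
    println(s"supervised actor about to restart because of: $reason")
  override def postRestart(reason: Throwable): Unit =
    println("supervised actor restarted")

  def receive = {
    case "fail" => throw new Exception("I failed!")
  }
}
```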
-For the impatient, we also recommend looking into the @ref[supervision reference page](../general/supervision.md) for more in-depth
+For the impatient, we also recommend looking into the @ref:[supervision reference page](../general/supervision.md) for more in-depth
details.
# Summary

View file

@@ -103,7 +103,7 @@ This means that, for Akka messages:
These guarantees strike a good balance: having messages from one actor arrive in-order is convenient for building systems that can be easily reasoned about, while on the other hand allowing messages from different actors to arrive interleaved provides sufficient freedom for an efficient implementation of the actor system.
-For the full details on delivery guarantees please refer to the @ref[reference page](../general/message-delivery-reliability.md).
+For the full details on delivery guarantees please refer to the @ref:[reference page](../general/message-delivery-reliability.md).
## Adding flexibility to device messages

View file

@@ -241,13 +241,13 @@ Java
## Summary
In the context of the IoT system, this guide introduced the following concepts, among others. You can follow the links to review them if necessary:
-* @ref[The hierarchy of actors and their lifecycle](tutorial_1.md)
-* @ref[The importance of designing messages for flexibility](tutorial_3.md)
-* @ref[How to watch and stop actors, if necessary](tutorial_4.md#keeping-track-of-the-device-actors-in-the-group)
+* @ref:[The hierarchy of actors and their lifecycle](tutorial_1.md)
+* @ref:[The importance of designing messages for flexibility](tutorial_3.md)
+* @ref:[How to watch and stop actors, if necessary](tutorial_4.md#keeping-track-of-the-device-actors-in-the-group)
## What's Next?
To continue your journey with Akka, we recommend:
* Start building your own applications with Akka, and make sure you [get involved in our amazing community](http://akka.io/get-involved) for help if you get stuck.
-* If you'd like some additional background, read the rest of the reference documentation and check out some of the @ref[books and videos](../additional/books.md) on Akka.
+* If you'd like some additional background, read the rest of the reference documentation and check out some of the @ref:[books and videos](../additional/books.md) on Akka.

View file

@@ -35,7 +35,7 @@ class MyActor extends Actor with akka.actor.ActorLogging {
The first parameter to @scala[`Logging`] @java[`Logging.getLogger`] could also be any
`LoggingBus`, specifically @scala[`system.eventStream`] @java[`system.eventStream()`]; in the demonstrated
case, the actor system's address is included in the `akkaSource`
-representation of the log source (see @ref[Logging Thread, Akka Source and Actor System in MDC](#logging-thread-akka-source-and-actor-system-in-mdc))
+representation of the log source (see @ref:[Logging Thread, Akka Source and Actor System in MDC](#logging-thread-akka-source-and-actor-system-in-mdc))
while in the second case this is not automatically done.
The second parameter to @scala[`Logging`] @java[`Logging.getLogger`] is the source of this logging channel.
The source object is translated to a String according to the following rules:
@@ -235,7 +235,7 @@ akka {
}
```
-Also see the @ref[logging options for TestKit](testing.md#actor-logging).
+Also see the @ref:[logging options for TestKit](testing.md#actor-logging).
@@@ div { .group-scala }
@@ -297,7 +297,7 @@ using an async logging backend though. (See [Using the SLF4J API directly](#slf4
@@@
You can configure which event handlers are created at system start-up and listen to logging events. That is done using the
-`loggers` element in the @ref[configuration](general/configuration.md).
+`loggers` element in the @ref:[configuration](general/configuration.md).
Here you can also define the log level. More fine grained filtering based on the log source
can be implemented in a custom `LoggingFilter`, which can be defined in the `logging-filter`
configuration property.
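As an illustrative sketch (the class and package names are invented; the two-argument constructor is what Akka expects when it instantiates the configured filter):

```scala
import akka.actor.ActorSystem.Settings
import akka.event.{ EventStream, LoggingFilter }

// configured with: akka.logging-filter = "com.example.MyLoggingFilter"
class MyLoggingFilter(settings: Settings, eventStream: EventStream) extends LoggingFilter {
  def isErrorEnabled(logClass: Class[_], logSource: String)   = true
  def isWarningEnabled(logClass: Class[_], logSource: String) = true
  def isInfoEnabled(logClass: Class[_], logSource: String)    = true
  // only let DEBUG through for log sources under our own package
  def isDebugEnabled(logClass: Class[_], logSource: String)   =
    logSource.startsWith("com.example")
}
```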
@@ -313,7 +313,7 @@ akka {
```
The default one logs to STDOUT and is registered by default. It is not intended
-to be used for production. There is also an @ref[SLF4J](#slf4j)
+to be used for production. There is also an @ref:[SLF4J](#slf4j)
logger available in the 'akka-slf4j' module.
Example of creating a listener:
@@ -575,7 +575,7 @@ relay more information using them rather than just a single string.
Akka includes a logger for [java.util.logging](https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html#package.description).
You need to enable the `akka.event.jul.JavaLogger` in the `loggers` element in
-the @ref[configuration](general/configuration.md). Here you can also define the log level of the event bus.
+the @ref:[configuration](general/configuration.md). Here you can also define the log level of the event bus.
More fine grained log levels can be defined in the configuration of the logging backend.
You should also define `akka.event.jul.JavaLoggingFilter` in
the `logging-filter` configuration property. It will filter the log events using the backend
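A sketch of the configuration this describes, loaded programmatically here purely for illustration:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.parseString("""
  akka {
    loggers = ["akka.event.jul.JavaLogger"]
    loglevel = "DEBUG" # level for the event bus; the j.u.l. backend filters further
    logging-filter = "akka.event.jul.JavaLoggingFilter"
  }
""").withFallback(ConfigFactory.load())

val system = ActorSystem("julSample", config)
```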

View file

@@ -327,7 +327,7 @@ so if network security is not considered as enough protection the classic remoti
Best practice is that Akka remoting nodes should only be accessible from the adjacent network.
-It is also security best practice to @ref[disable the Java serializer](#disabling-the-java-serializer) because of
+It is also security best practice to @ref:[disable the Java serializer](#disabling-the-java-serializer) because of
its multiple [known attack surfaces](https://community.hpe.com/t5/Security-Research/The-perils-of-Java-deserialization/ba-p/6838995).
### Untrusted Mode
@@ -354,7 +354,7 @@ as a marker trait to user-defined messages.
Untrusted mode does not give full protection against attacks by itself.
It makes it slightly harder to perform malicious or unintended actions but
-it should be complemented with @ref[disabled Java serializer](#disabling-the-java-serializer).
+it should be complemented with @ref:[disabled Java serializer](#disabling-the-java-serializer).
Additional protection can be achieved when running in an untrusted network by
network security (e.g. firewalls).
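A sketch combining the two hardening measures discussed above (the key names below match the classic remoting docs of this era; verify them against your Akka version):

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

val hardened = ConfigFactory.parseString("""
  akka {
    actor.allow-java-serialization = off // assumes Akka 2.5+
    remote.untrusted-mode = on
  }
""").withFallback(ConfigFactory.load())

val system = ActorSystem("secured", hardened)
```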

View file

@@ -1,6 +1,6 @@
# Remoting
-For an introduction to the remoting capabilities of Akka please see @ref[Location Transparency](general/remoting.md).
+For an introduction to the remoting capabilities of Akka please see @ref:[Location Transparency](general/remoting.md).
@@@ note
@@ -596,7 +596,7 @@ the other (the "server").
Note that if TLS is enabled with mutual authentication there is still a risk that an attacker can gain access to a valid certificate
by compromising any node with certificates issued by the same internal PKI tree.
-See also a description of the settings in the @ref[Remote Configuration](remoting.md#remote-configuration) section.
+See also a description of the settings in the @ref:[Remote Configuration](remoting.md#remote-configuration) section.
@@@ note

View file

@@ -1,17 +1,135 @@
-# Error Handling
+# Error Handling in Streams
-Strategies for how to handle exceptions from processing stream elements can be defined when
-materializing the stream. The error handling strategies are inspired by actor supervision
-strategies, but the semantics have been adapted to the domain of stream processing.
+When a stage in a stream fails this will normally lead to the entire stream being torn down.
+Each of the stages downstream gets informed about the failure and each upstream stage sees a cancellation.
-@@@ warning
+In many cases you may want to avoid complete stream failure; this can be done in a few different ways:
-*ZipWith*, *GraphStage* junction, *ActorPublisher* source and *ActorSubscriber* sink
-components do not honour the supervision strategy attribute yet.
+* `recover` to emit a final element then complete the stream normally on upstream failure
+* `recoverWithRetries` to create a new upstream and start consuming from that on failure
+* Restarting sections of the stream after a backoff
+* Using a supervision strategy for stages that support it
+In addition to these built-in tools for error handling, a common pattern is to wrap the stream
+inside an actor, and have the actor restart the entire stream on failure.
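A rough sketch of that pattern, assuming default supervision (restart on failure); names like `StreamSupervisor` are invented for the example:

```scala
import akka.Done
import akka.actor.{ Actor, Status }
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

class StreamSupervisor extends Actor {
  import context.dispatcher
  // a fresh materializer is created for every incarnation of the actor
  implicit val mat = ActorMaterializer()(context)

  override def preStart(): Unit =
    Source(1 to 100)
      .map(n => if (n == 13) throw new RuntimeException("boom") else n)
      .runWith(Sink.ignore)
      .pipeTo(self) // stream completion or failure arrives here as a message

  def receive = {
    case Status.Failure(cause) => throw cause // rethrow: the restarted actor re-runs the stream
    case Done                  => context.stop(self)
  }
}
```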
## Recover
`recover` allows you to emit a final element and then complete the stream on an upstream failure.
Deciding which exceptions should be recovered is done through a `PartialFunction`. If an exception
does not have a @scala[matching case] @java[match defined] the stream is failed.
Recovering can be useful if you want to gracefully complete a stream on failure while letting
downstream know that there was a failure.
Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #recover }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #recover }
This will output:
Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #recover-output }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #recover-output }
## Recover with retries
`recoverWithRetries` allows you to put a new upstream in place of the failed one, recovering
stream failures up to a specified maximum number of times.
Deciding which exceptions should be recovered is done through a `PartialFunction`. If an exception
does not have a @scala[matching case] @java[match defined] the stream is failed.
Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #recoverWithRetries }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #recoverWithRetries }
This will output:
Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #recoverWithRetries-output }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #recoverWithRetries-output }
<a id="restart-with-backoff"></a>
## Delayed restarts with a backoff stage
Just as Akka provides the @ref:[backoff supervision pattern for actors](../general/supervision.md#backoff-supervisor), Akka Streams
also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff
supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.
This pattern is useful when the stage fails or completes because some external resource is not available
and we need to give it some time to start up again. One of the prime examples when this is useful is
when a WebSocket connection fails due to the HTTP server it's running on going down, perhaps because it is overloaded.
By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time
to recover and avoids wasting resources on the client side.
The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`]
@java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a
stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will
be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due
to the `maxBackoff` parameter):
Scala
: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #restart-with-backoff-source }
Java
: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #restart-with-backoff-source }
Using a `randomFactor` to add a little bit of additional variance to the backoff intervals
is highly recommended, in order to avoid multiple streams restarting at the exact same point in time,
for example because they were stopped due to a shared resource such as the same server going down
and re-starting after the same configured interval. By adding additional randomness to the
re-start intervals the streams will start at slightly different points in time, thus avoiding
large spikes of traffic hitting the recovering server or other resource that they all need to contact.
The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use
it in combination with a @ref:[`KillSwitch`](stream-dynamic.md#kill-switch), so that you can terminate it when needed:
Scala
: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #with-kill-switch }
Java
: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch }
Sinks and flows can also be supervised, using @scala[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]
@java[`akka.stream.javadsl.RestartSink` and `akka.stream.javadsl.RestartFlow`]. The `RestartSink` is restarted when
it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out
port sends an error.
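For instance, a flaky flow can be wrapped as in this sketch (`connectionFlow` is a placeholder for e.g. an HTTP or database flow):

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, RestartFlow }

def connectionFlow: Flow[String, String, NotUsed] = ??? // stand-in for some fragile flow

val resilientFlow: Flow[String, String, NotUsed] =
  RestartFlow.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2 // adds up to 20% jitter to the intervals
  ) { () =>
    connectionFlow // the factory is invoked anew for every (re)start
  }
```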
## Supervision Strategies
@@@ note
The stages that support supervision strategies are explicitly documented to do so; if there is
nothing in the documentation of a stage saying that it adheres to the supervision strategy it
means it fails rather than applying supervision.
@@@
-## Supervision Strategies
The error handling strategies are inspired by actor supervision strategies, but the semantics
have been adapted to the domain of stream processing. The most important difference is that
supervision is not automatically applied to stream stages but instead something that each stage
has to implement explicitly.
For many stages it may not even make sense to implement support for supervision strategies;
this is especially true for stages connecting to external technologies where, for example, a
failed connection will likely still fail if a new connection is tried immediately (see
@ref:[Restart with back off](#restart-with-backoff) for such scenarios).
For stages that do implement supervision, the strategies for how to handle exceptions from
processing stream elements can be selected when materializing the stream through use of an attribute.
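As a self-contained sketch of selecting a strategy via an attribute (in the spirit of the snippets referenced below):

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("supervision-sketch")
implicit val mat = ActorMaterializer()

// resume (drop the offending element) on arithmetic errors, stop otherwise
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}

Source(List(1, 2, 0, 4))
  .map(100 / _) // throws ArithmeticException for the 0 element
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println) // prints 100, 50, 25 -- the failing element is dropped
```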
There are three ways to handle exceptions from application code:
@@ -65,9 +183,10 @@ Scala
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #restart-section }
-## Errors from mapAsync
+### Errors from mapAsync
-Stream supervision can also be applied to the futures of `mapAsync`.
+Stream supervision can also be applied to the futures of `mapAsync` and `mapAsyncUnordered` even if such
+failures happen in the future rather than inside the stage itself.
Let's say that we use an external service to lookup email addresses and we would like to
discard those that cannot be found.
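A sketch of that scenario (`lookupEmail` is a stand-in for the external service; the elided snippets show the real example):

```scala
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.ActorMaterializer
import akka.stream.Supervision.resumingDecider
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("mapasync-sketch")
implicit val mat = ActorMaterializer()

// hypothetical service call; fails the Future when no address is found
def lookupEmail(handle: String): Future[String] = ???

Source(List("alice", "bob", "unknown"))
  .mapAsync(4)(lookupEmail)
  // resume = drop the element whose Future failed, keep going
  .withAttributes(supervisionStrategy(resumingDecider))
  .runForeach(println)
```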
@@ -103,47 +222,3 @@ Java
If we did not use `Resume`, the default stopping strategy would complete the stream
with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally].
-## Delayed restarts with a backoff stage
-Just as Akka provides the @ref:[backoff supervision pattern for actors](../general/supervision.md#backoff-supervisor), Akka streams
-also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff
-supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.
-This pattern is useful when the stage fails or completes because some external resource is not available
-and we need to give it some time to start-up again. One of the prime examples when this is useful is
-when a WebSocket connection fails due to the HTTP server it's running on going down, perhaps because it is overloaded.
-By using an exponential backoff, we avoid going into a tight reconnect look, which both gives the HTTP server some time
-to recover, and it avoids using needless resources on the client side.
-The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`]
-@java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a
-stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will
-be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due
-to the `maxBackoff` parameter):
-Scala
-: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #restart-with-backoff-source }
-Java
-: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #restart-with-backoff-source }
-Using a `randomFactor` to add a little bit of additional variance to the backoff intervals
-is highly recommended, in order to avoid multiple streams re-start at the exact same point in time,
-for example because they were stopped due to a shared resource such as the same server going down
-and re-starting after the same configured interval. By adding additional randomness to the
-re-start intervals the streams will start in slightly different points in time, thus avoiding
-large spikes of traffic hitting the recovering server or other resource that they all need to contact.
-The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use
-it in combination with a @ref:[`KillSwitch`](stream-dynamic.md#kill-switch), so that you can terminate it when needed:
-Scala
-: @@snip [RestartDocSpec.scala]($code$/scala/docs/stream/RestartDocSpec.scala) { #with-kill-switch }
-Java
-: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch }
-Sinks and flows can also be supervised, using @scala[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]
-@java[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]. The `RestartSink` is restarted when
-it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out
-port sends an error.

View file

@@ -49,7 +49,7 @@ If you don't care about the reply values and only use them as back-pressure sign
can use `Sink.ignore` after the `mapAsync` stage and then the actor is effectively a sink
of the stream.
-The same pattern can be used with @ref[Actor routers](../routing.md). Then you
+The same pattern can be used with @ref:[Actor routers](../routing.md). Then you
can use `mapAsyncUnordered` for better efficiency if you don't care about the
order of the emitted downstream elements (the replies).
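A sketch of this back-pressure-only usage (the worker pool is hypothetical; replies are discarded by `Sink.ignore`):

```scala
import scala.concurrent.duration._
import akka.actor.{ ActorRef, ActorSystem }
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout

implicit val system = ActorSystem("router-sketch")
implicit val mat = ActorMaterializer()
implicit val timeout: Timeout = 3.seconds

def workerRouter: ActorRef = ??? // e.g. a round-robin pool of worker actors

Source(1 to 1000)
  .mapAsyncUnordered(parallelism = 8)(job => workerRouter ? job)
  .runWith(Sink.ignore) // the replies only serve as back-pressure signals
```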
@@ -493,7 +493,7 @@ Please note that a factory is necessary to achieve reusability of the resulting
As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and
any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you
-implement Reactive Streams integrations with built-in stages or @ref[custom stages](stream-customize.md).
+implement Reactive Streams integrations with built-in stages or @ref:[custom stages](stream-customize.md).
For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are
provided to support implementing Reactive Streams `Publisher` and `Subscriber` with
@@ -524,7 +524,7 @@ type-safe and safe to implement `akka.stream.stage.GraphStage`. It can also
expose a "stage actor ref" if it needs to be addressed as if it were an Actor.
Custom stages implemented using `GraphStage` are also automatically fusable.
-To learn more about implementing custom stages using it refer to @ref[Custom processing with GraphStage](stream-customize.md#graphstage).
+To learn more about implementing custom stages using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage).
@@@
@@ -590,7 +590,7 @@ type-safe and safe to implement `akka.stream.stage.GraphStage`. It can also
expose a "stage actor ref" if it needs to be addressed as if it were an Actor.
Custom stages implemented using `GraphStage` are also automatically fusable.
-To learn more about implementing custom stages using it refer to @ref[Custom processing with GraphStage](stream-customize.md#graphstage).
+To learn more about implementing custom stages using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage).
@@@

View file

@@ -63,13 +63,13 @@ composition, therefore it may take some careful study of this subject until you
feel familiar with the tools and techniques. The documentation is here to help
and for best results we recommend the following approach:
-* Read the @ref[Quick Start Guide](stream-quickstart.md#stream-quickstart) to get a feel for what streams
+* Read the @ref:[Quick Start Guide](stream-quickstart.md#stream-quickstart) to get a feel for what streams
look like and what they can do.
-* The top-down learners may want to peruse the @ref[Design Principles behind Akka Streams](../general/stream/stream-design.md) at this
+* The top-down learners may want to peruse the @ref:[Design Principles behind Akka Streams](../general/stream/stream-design.md) at this
point.
* The bottom-up learners may feel more at home rummaging through the
-@ref[Streams Cookbook](stream-cookbook.md).
+@ref:[Streams Cookbook](stream-cookbook.md).
* For a complete overview of the built-in processing stages you can look at the
-table in @ref[stages overview](stages-overview.md).
+table in @ref:[stages overview](stages-overview.md).
* The other sections can be read sequentially or as needed during the previous
steps, each digging deeper into specific topics.

View file

@@ -1,7 +1,7 @@
# Working with streaming IO
Akka Streams provides a way of handling File IO and TCP connections with Streams.
-While the general approach is very similar to the @ref[Actor based TCP handling](../io-tcp.md) using Akka IO,
+While the general approach is very similar to the @ref:[Actor based TCP handling](../io-tcp.md) using Akka IO,
by using Akka Streams you are freed of having to manually react to back-pressure signals,
as the library does it transparently for you.
@@ -86,7 +86,7 @@ When writing such end-to-end back-pressured systems you may sometimes end up in
in which *either side is waiting for the other one to start the conversation*. One does not need to look far
to find examples of such back-pressure loops. In the two examples shown previously, we always assumed that the side we
are connecting to would start the conversation, which effectively means both sides are back-pressured and cannot get
-the conversation started. There are multiple ways of dealing with this which are explained in depth in @ref[Graph cycles, liveness and deadlocks](stream-graphs.md#graph-cycles),
+the conversation started. There are multiple ways of dealing with this which are explained in depth in @ref:[Graph cycles, liveness and deadlocks](stream-graphs.md#graph-cycles),
however in client-server scenarios it is often simplest to make either side send an initial message, as sketched below.
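For example, a server-side protocol flow might kick off the conversation by prepending a greeting to its output; a sketch (the framing details are illustrative):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, Framing, Source }
import akka.util.ByteString

val serverLogic: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
    .map(line => line ++ ByteString("\n")) // echo each line back
    // emit one greeting before anything is read, breaking the "who talks first" deadlock
    .prepend(Source.single(ByteString("Hello, say something!\n")))
```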
@@@ note

View file

@@ -47,7 +47,7 @@ not be able to operate at full capacity <a id="^1" href="#1">[1]</a>.
@@@ note
Asynchronous stream processing stages have internal buffers to make communication between them more efficient.
-For more details about the behavior of these and how to add additional buffers refer to @ref[Buffers and working with rate](stream-rate.md).
+For more details about the behavior of these and how to add additional buffers refer to @ref:[Buffers and working with rate](stream-rate.md).
@@@
@@ -72,7 +72,7 @@ One drawback of the example code above is that it does not preserve the ordering of
if children like to track their "own" pancakes. In those cases the `Balance` and `Merge` stages should be replaced
by strict round-robin balancing and merging stages that put in and take out pancakes in a strict order.
-A more detailed example of creating a worker pool can be found in the cookbook: @ref[Balancing jobs to a fixed pool of workers](stream-cookbook.md#cookbook-balance)
+A more detailed example of creating a worker pool can be found in the cookbook: @ref:[Balancing jobs to a fixed pool of workers](stream-cookbook.md#cookbook-balance)
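A sketch of that cookbook pattern, adapted here (`worker` is any flow doing the per-element work):

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{ Balance, Flow, GraphDSL, Merge }

def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
    val merge   = b.add(Merge[Out](workerCount))

    for (_ <- 1 to workerCount)
      balance ~> worker.async ~> merge // `.async` runs each worker concurrently

    FlowShape(balance.in, merge.out)
  })
```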
## Combining pipelining and parallel processing

View file

@@ -194,7 +194,7 @@ second the throttle combinator will assert *back-pressure* upstream.
This is basically all there is to Akka Streams in a nutshell—glossing over the
fact that there are dozens of sources and sinks and many more stream
-transformation combinators to choose from, see also @ref[stages overview](stages-overview.md).
+transformation combinators to choose from, see also @ref:[stages overview](stages-overview.md).
# Reactive Tweets
@@ -219,7 +219,7 @@ Java
@@@ note
If you would like to get an overview of the used vocabulary first instead of diving head-first
-into an actual example you can have a look at the @ref[Core concepts](stream-flows-and-basics.md#core-concepts) and @ref[Defining and running streams](stream-flows-and-basics.md#defining-and-running-streams)
+into an actual example you can have a look at the @ref:[Core concepts](stream-flows-and-basics.md#core-concepts) and @ref:[Defining and running streams](stream-flows-and-basics.md#defining-and-running-streams)
sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application.
@@@
@@ -239,7 +239,7 @@ Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #materializer-setup }
The `ActorMaterializer` can optionally take `ActorMaterializerSettings` which can be used to define
-materialization properties, such as default buffer sizes (see also @ref[Buffers for asynchronous stages](stream-rate.md#async-stream-buffers)), the dispatcher to
+materialization properties, such as default buffer sizes (see also @ref:[Buffers for asynchronous stages](stream-rate.md#async-stream-buffers)), the dispatcher to
be used by the pipeline etc. These can be overridden with `withAttributes` on `Flow`, `Source`, `Sink` and `Graph`.
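For instance, a sketch of tuning the default input buffers through the settings (the numbers are purely illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }

implicit val system = ActorSystem("reactive-tweets")

implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system)
    .withInputBuffer(initialSize = 4, maxSize = 16) // per-stage internal buffers
)
```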
Let's assume we have a stream of tweets readily available. In Akka this is expressed as a @scala[`Source[Out, M]`]@java[`Source<Out, M>`]:
@@ -269,7 +269,7 @@ Scala
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-filter-map }
-Finally in order to @ref[materialize](stream-flows-and-basics.md#stream-materialization) and run the stream computation we need to attach
+Finally in order to @ref:[materialize](stream-flows-and-basics.md#stream-materialization) and run the stream computation we need to attach
the Flow to a @scala[`Sink`]@java[`Sink<T, M>`] that will get the Flow running. The simplest way to do this is to call
`runWith(sink)` on a @scala[`Source`]@java[`Source<Out, M>`]. For convenience a number of common Sinks are predefined and collected as @scala[]@java[static] methods on
the @scala[`Sink` companion object]@java[`Sink` class].
@@ -360,16 +360,16 @@ Both `Graph` and `RunnableGraph` are *immutable, thread-safe, and freely shareab
A graph can also have one of several other shapes, with one or more unconnected ports. Having unconnected ports
expresses a graph that is a *partial graph*. Concepts around composing and nesting graphs in large structures are
-explained in detail in @ref[Modularity, Composition and Hierarchy](stream-composition.md). It is also possible to wrap complex computation graphs
+explained in detail in @ref:[Modularity, Composition and Hierarchy](stream-composition.md). It is also possible to wrap complex computation graphs
as Flows, Sinks or Sources, which will be explained in detail in
-@scala[@ref[Constructing Sources, Sinks and Flows from Partial Graphs](stream-graphs.md#constructing-sources-sinks-flows-from-partial-graphs)]@java[@ref:[Constructing and combining Partial Graphs](stream-graphs.md#partial-graph-dsl)].
+@scala[@ref:[Constructing Sources, Sinks and Flows from Partial Graphs](stream-graphs.md#constructing-sources-sinks-flows-from-partial-graphs)]@java[@ref:[Constructing and combining Partial Graphs](stream-graphs.md#partial-graph-dsl)].
## Back-pressure in action
One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
-@ref[Back-pressure explained](stream-flows-and-basics.md#back-pressure-explained).
+@ref:[Back-pressure explained](stream-flows-and-basics.md#back-pressure-explained).
A typical problem that applications like this (ones not using Akka Streams) often face is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
@@ -438,7 +438,7 @@ Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweets-runnable-flow-materialized-twice }
Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
-steering these elements which will be discussed in detail in @ref[Stream Materialization](stream-flows-and-basics.md#stream-materialization). Summing up this section, now we know
+steering these elements which will be discussed in detail in @ref:[Stream Materialization](stream-flows-and-basics.md#stream-materialization). Summing up this section, now we know
what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
Scala

View file

@@ -11,6 +11,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import akka.japi.pf.PFBuilder;
+import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
@@ -138,4 +139,64 @@ public class FlowErrorDocTest extends AbstractJavaTest {
result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
  @Test
  public void demonstrateRecover() {
    //#recover
    final Materializer mat = ActorMaterializer.create(system);
    Source.from(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).map(n -> {
      if (n < 5) return n.toString();
      else throw new RuntimeException("Boom!");
    }).recover(new PFBuilder<Throwable, String>()
      .match(RuntimeException.class, ex -> "stream truncated")
      .build()
    ).runForeach(System.out::println, mat);
    //#recover
    /*
    Output:
    //#recover-output
    0
    1
    2
    3
    4
    stream truncated
    //#recover-output
    */
  }
  @Test
  public void demonstrateRecoverWithRetries() {
    //#recoverWithRetries
    final Materializer mat = ActorMaterializer.create(system);
    Source<String, NotUsed> planB = Source.from(Arrays.asList("five", "six", "seven", "eight"));
    Source.from(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).map(n -> {
      if (n < 5) return n.toString();
      else throw new RuntimeException("Boom!");
    }).recoverWithRetries(
      1, // max attempts
      new PFBuilder<Throwable, Source<String, NotUsed>>()
        .match(RuntimeException.class, ex -> planB)
        .build()
    ).runForeach(System.out::println, mat);
    //#recoverWithRetries
    /*
    Output:
    //#recoverWithRetries-output
    0
    1
    2
    3
    4
    five
    six
    seven
    eight
    //#recoverWithRetries-output
    */
  }
}

View file

@@ -89,4 +89,57 @@ class FlowErrorDocSpec extends AkkaSpec {
Await.result(result, 3.seconds) should be(Vector(0, 1, 4, 0, 5, 12))
}
"demonstrate recover" in {
implicit val materializer = ActorMaterializer()
//#recover
Source(0 to 6).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")
).recover {
case _: RuntimeException => "stream truncated"
}.runForeach(println)
//#recover
/*
Output:
//#recover-output
0
1
2
3
4
stream truncated
//#recover-output
*/
}
"demonstrate recoverWithRetries" in {
implicit val materializer = ActorMaterializer()
//#recoverWithRetries
val planB = Source(List("five", "six", "seven", "eight"))
Source(0 to 10).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")
).recoverWithRetries(attempts = 1, {
case _: RuntimeException => planB
}).runForeach(println)
//#recoverWithRetries
/*
Output:
//#recoverWithRetries-output
0
1
2
3
4
five
six
seven
eight
//#recoverWithRetries-output
*/
}
}