diff --git a/akka-docs/src/main/paradox/java/actors.md b/akka-docs/src/main/paradox/java/actors.md index fc28505da4..02b91cee91 100644 --- a/akka-docs/src/main/paradox/java/actors.md +++ b/akka-docs/src/main/paradox/java/actors.md @@ -408,7 +408,6 @@ result: It is always preferable to communicate with other Actors using their ActorRef instead of relying upon ActorSelection. Exceptions are -> * sending messages using the @ref:[At-Least-Once Delivery](persistence.md#at-least-once-delivery) facility * initiating first contact with a remote system @@ -465,9 +464,12 @@ An example demonstrating actor look-up is given in @ref:[Remoting Sample](remoti ## Messages and immutability -**IMPORTANT**: Messages can be any kind of object but have to be -immutable. Akka can’t enforce immutability (yet) so this has to be by -convention. +@@@ warning { title=IMPORTANT } + +Messages can be any kind of object but have to be immutable. Akka can’t enforce +immutability (yet) so this has to be by convention. + +@@@ Here is an example of an immutable message: @@ -1029,4 +1031,4 @@ the potential issues is that messages might be lost when sent to remote actors. an uninitialized state might lead to the condition that it receives a user message before the initialization has been done. -@@@ \ No newline at end of file +@@@ diff --git a/akka-docs/src/main/paradox/java/agents.md b/akka-docs/src/main/paradox/java/agents.md index bf39c5bf02..521f9c5d27 100644 --- a/akka-docs/src/main/paradox/java/agents.md +++ b/akka-docs/src/main/paradox/java/agents.md @@ -2,9 +2,9 @@ Agents in Akka are inspired by [agents in Clojure](http://clojure.org/agents). -@@@ warning +@@@ warning { title="Deprecation warning" } -**Deprecation warning** - Agents have been deprecated and are scheduled for removal +Agents have been deprecated and are scheduled for removal in the next major version. We have found that their leaky abstraction (they do not work over the network) make them inferior to pure Actors, and in face of the soon inclusion of Akka Typed we see little value in maintaining the current Agents. @@ -90,4 +90,4 @@ Agents participating in enclosing STM transaction is a deprecated feature in 2.3 If an Agent is used within an enclosing `Scala STM transaction`, then it will participate in that transaction. If you send to an Agent within a transaction then the dispatch to the Agent will be held until that transaction commits, and discarded if the -transaction is aborted. \ No newline at end of file +transaction is aborted. diff --git a/akka-docs/src/main/paradox/java/cluster-client.md b/akka-docs/src/main/paradox/java/cluster-client.md index fde0e0ead0..ddb95352d8 100644 --- a/akka-docs/src/main/paradox/java/cluster-client.md +++ b/akka-docs/src/main/paradox/java/cluster-client.md @@ -42,23 +42,23 @@ The `ClusterClientReceptionist` sends out notifications in relation to having re from a `ClusterClient`. This notification enables the server containing the receptionist to become aware of what clients are connected. -**1. ClusterClient.Send** +1. **ClusterClient.Send** -The message will be delivered to one recipient with a matching path, if any such -exists. If several entries match the path the message will be delivered -to one random destination. The `sender` of the message can specify that local -affinity is preferred, i.e. the message is sent to an actor in the same local actor -system as the used receptionist actor, if any such exists, otherwise random to any other -matching entry. 
+ The message will be delivered to one recipient with a matching path, if any such + exists. If several entries match the path the message will be delivered + to one random destination. The `sender` of the message can specify that local + affinity is preferred, i.e. the message is sent to an actor in the same local actor + system as the used receptionist actor, if any such exists, otherwise random to any other + matching entry. -**2. ClusterClient.SendToAll** +2. **ClusterClient.SendToAll** -The message will be delivered to all recipients with a matching path. + The message will be delivered to all recipients with a matching path. -**3. ClusterClient.Publish** +3. **ClusterClient.Publish** -The message will be delivered to all recipients Actors that have been registered as subscribers -to the named topic. + The message will be delivered to all recipient Actors that have been registered as subscribers + to the named topic. Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client, i.e. @@ -193,4 +193,4 @@ within a configurable interval. This is configured with the `reconnect-timeout`, This can be useful when initial contacts are provided from some kind of service registry, cluster node addresses are entirely dynamic and the entire cluster might shut down or crash, be restarted on new addresses. Since the client will be stopped in that case a monitoring actor can watch it and upon `Terminate` a new set of initial -contacts can be fetched and a new cluster client started. \ No newline at end of file +contacts can be fetched and a new cluster client started. diff --git a/akka-docs/src/main/paradox/java/cluster-sharding.md b/akka-docs/src/main/paradox/java/cluster-sharding.md index 28f7575aca..d67de743f8 100644 --- a/akka-docs/src/main/paradox/java/cluster-sharding.md +++ b/akka-docs/src/main/paradox/java/cluster-sharding.md @@ -57,7 +57,6 @@ identifier and the shard identifier from incoming messages. This example illustrates two different ways to define the entity identifier in the messages: -> * The `Get` message includes the identifier itself. * The `EntityEnvelope` holds the identifier, and the actual message that is sent to the entity actor is wrapped in the envelope. @@ -418,4 +417,4 @@ a `ShardRegion.ClusterShardingStats` containing the identifiers of the shards ru of entities that are alive in each shard. The purpose of these messages is testing and monitoring, they are not provided to give access to -directly sending messages to the individual entities. \ No newline at end of file +directly sending messages to the individual entities. diff --git a/akka-docs/src/main/paradox/java/dispatchers.md b/akka-docs/src/main/paradox/java/dispatchers.md index 0d0e21fae4..e5228f15b8 100644 --- a/akka-docs/src/main/paradox/java/dispatchers.md +++ b/akka-docs/src/main/paradox/java/dispatchers.md @@ -74,33 +74,37 @@ where you'd use periods to denote sub-sections, like this: `"foo.bar.my-dispatch There are 3 different types of message dispatchers: - * Dispatcher - * This is an event-based dispatcher that binds a set of Actors to a thread pool. It is the default dispatcher -used if one is not specified. +* **Dispatcher** + + This is an event-based dispatcher that binds a set of Actors to a thread + pool. It is the default dispatcher used if one is not specified.
+ * Sharability: Unlimited * Mailboxes: Any, creates one per Actor * Use cases: Default dispatcher, Bulkheading - * - Driven by: - `java.util.concurrent.ExecutorService` - : specify using "executor" using "fork-join-executor", -"thread-pool-executor" or the FQCN of -an `akka.dispatcher.ExecutorServiceConfigurator` - - * PinnedDispatcher - * This dispatcher dedicates a unique thread for each actor using it; i.e. each actor will have its own thread pool with only one thread in the pool. + * Driven by: `java.util.concurrent.ExecutorService`. + Specify using the "executor" setting: "fork-join-executor", "thread-pool-executor" or the FQCN of + an `akka.dispatch.ExecutorServiceConfigurator`. + +* **PinnedDispatcher** + + This dispatcher dedicates a unique thread for each actor using it; i.e. + each actor will have its own thread pool with only one thread in the pool. + * Sharability: None * Mailboxes: Any, creates one per Actor * Use cases: Bulkheading - * - Driven by: Any - `akka.dispatch.ThreadPoolExecutorConfigurator` - : by default a "thread-pool-executor" + * Driven by: Any `akka.dispatch.ThreadPoolExecutorConfigurator`. + By default a "thread-pool-executor". - * CallingThreadDispatcher - * This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, -but it can be used from different threads concurrently for the same actor. See @ref:[Java-CallingThreadDispatcher](testing.md#java-callingthreaddispatcher) -for details and restrictions. +* **CallingThreadDispatcher** + + This dispatcher runs invocations on the current thread only. This + dispatcher does not create any new threads, but it can be used from + different threads concurrently for the same actor. + See @ref:[Java-CallingThreadDispatcher](testing.md#java-callingthreaddispatcher) + for details and restrictions. + * Sharability: Unlimited * Mailboxes: Any, creates one per Actor per Thread (on demand) * Use cases: Testing @@ -135,4 +139,4 @@ and that pool will have only one thread. Note that it's not guaranteed that the *same* thread is used over time, since the core pool timeout is used for `PinnedDispatcher` to keep resource usage down in case of idle actors. To use the same thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the -configuration of the `PinnedDispatcher`. \ No newline at end of file +configuration of the `PinnedDispatcher`. diff --git a/akka-docs/src/main/paradox/java/fault-tolerance.md b/akka-docs/src/main/paradox/java/fault-tolerance.md index d1cc029cfa..4908305d66 100644 --- a/akka-docs/src/main/paradox/java/fault-tolerance.md +++ b/akka-docs/src/main/paradox/java/fault-tolerance.md @@ -16,7 +16,7 @@ Read the following source code. The inlined comments explain the different piece the fault handling and why they are added. It is also highly recommended to run this sample as it is easy to follow the log output to understand what is happening in runtime. -@@toc +@@toc { depth=1 } @@@ index @@ -157,4 +157,4 @@ different supervisor which overrides this behavior.
With this parent, the child survives the escalated restart, as demonstrated in the last test: -@@snip [FaultHandlingTest.java]($code$/java/jdocs/actor/FaultHandlingTest.java) { #escalate-restart } \ No newline at end of file +@@snip [FaultHandlingTest.java]($code$/java/jdocs/actor/FaultHandlingTest.java) { #escalate-restart } diff --git a/akka-docs/src/main/paradox/java/fsm.md b/akka-docs/src/main/paradox/java/fsm.md index 0c16ca0a0f..a1eafc4a04 100644 --- a/akka-docs/src/main/paradox/java/fsm.md +++ b/akka-docs/src/main/paradox/java/fsm.md @@ -7,13 +7,11 @@ an Akka Actor and is best described in the [Erlang design principles](http://www A FSM can be described as a set of relations of the form: -> -**State(S) x Event(E) -> Actions (A), State(S')** +> **State(S) x Event(E) -> Actions (A), State(S')** These relations are interpreted as meaning: -> -*If we are in state S and the event E occurs, we should perform the actions A +> *If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S'.* ## A Simple Example @@ -52,7 +50,6 @@ The basic strategy is to declare the actor, by inheriting the `AbstractFSM` class and specifying the possible states and data values as type parameters. Within the body of the actor a DSL is used for declaring the state machine: -> * `startWith` defines the initial state and initial data * then there is one `when() { ... }` declaration per state to be handled (could potentially be multiple ones, the passed @@ -121,7 +118,6 @@ the FSM logic. The `AbstractFSM` class takes two type parameters: -> 1. the supertype of all state names, usually an enum, 2. the type of the state data which are tracked by the `AbstractFSM` module itself. @@ -139,8 +135,9 @@ internal state explicit in a few well-known places. A state is defined by one or more invocations of the method -> -`when([, stateTimeout = ])(stateFunction)`. +``` +when(<name>[, stateTimeout = <timeout>])(stateFunction) +``` The given name must be an object which is type-compatible with the first type parameter given to the `AbstractFSM` class. This object is used as a hash key, @@ -179,8 +176,9 @@ It is recommended practice to declare the states as an enum and then verify that Each FSM needs a starting point, which is declared using -> -`startWith(state, data[, timeout])` +``` +startWith(state, data[, timeout]) +``` The optionally given timeout argument overrides any specification given for the desired initial state. If you want to cancel a default timeout, use @@ -260,8 +258,9 @@ Up to this point, the FSM DSL has been centered on states and events. The dual view is to describe it as a series of transitions. This is enabled by the method -> -`onTransition(handler)` +``` +onTransition(handler) +``` which associates actions with a transition instead of with a state and event. The handler is a partial function which takes a pair of states as input; no @@ -310,8 +309,9 @@ the listener. Besides state timeouts, FSM manages timers identified by `String` names. You may set a timer using -> -`setTimer(name, msg, interval, repeat)` +``` +setTimer(name, msg, interval, repeat) +``` where `msg` is the message object which will be sent after the duration `interval` has elapsed. If `repeat` is `true`, then the timer is @@ -321,15 +321,17 @@ adding the new timer.
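+For illustration, registering a repeating timer from within a state handler could look
+like the following sketch (the `Tick` message class and the timer name are invented for
+the example, they are not part of the sample above):
+
+```java
+// inside an AbstractFSM<StateType, DataType> subclass, e.g. in a when() handler;
+// assumes scala.concurrent.duration.Duration and java.util.concurrent.TimeUnit
+// are imported. A previously registered timer with the same name is cancelled
+// automatically before this one is added.
+setTimer("tick-timer", new Tick(), Duration.create(1, TimeUnit.SECONDS), true);
+```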
Timers may be canceled using -> -`cancelTimer(name)` +``` +cancelTimer(name) +``` which is guaranteed to work immediately, meaning that the scheduled message will not be processed after this call even if the timer already fired and queued it. The status of any timer may be inquired with -> -`isTimerActive(name)` +``` +isTimerActive(name) +``` These named timers complement state timeouts because they are not affected by intervening reception of other messages. @@ -338,8 +340,9 @@ intervening reception of other messages. The FSM is stopped by specifying the result state as -> -`stop([reason[, data]])` +``` +stop([reason[, data]]) +``` The reason must be one of `Normal` (which is the default), `Shutdown` or `Failure(reason)`, and the second argument may be given to change the @@ -396,7 +399,6 @@ event trace by `LoggingFSM` instances: This FSM will log at DEBUG level: -> * all processed events, including `StateTimeout` and scheduled timer messages * every setting and cancellation of named timers @@ -434,4 +436,4 @@ zero. A bigger FSM example contrasted with Actor's `become`/`unbecome` can be downloaded as a ready to run @extref[Akka FSM sample](ecs:akka-samples-fsm-java) together with a tutorial. The source code of this sample can be found in the -@extref[Akka Samples Repository](samples:akka-sample-fsm-java). \ No newline at end of file +@extref[Akka Samples Repository](samples:akka-sample-fsm-java). diff --git a/akka-docs/src/main/paradox/java/io-udp.md b/akka-docs/src/main/paradox/java/io-udp.md index 494e6a1069..a580d33e5a 100644 --- a/akka-docs/src/main/paradox/java/io-udp.md +++ b/akka-docs/src/main/paradox/java/io-udp.md @@ -3,7 +3,6 @@ UDP is a connectionless datagram protocol which offers two different ways of communication on the JDK level: -> * sockets which are free to send datagrams to any destination and receive datagrams from any origin * sockets which are restricted to communication with one specific remote @@ -98,4 +97,4 @@ Another socket option will be needed to join a multicast group. Socket options must be provided to `UdpMessage.bind` command. -@@snip [JavaUdpMulticast.java]($code$/java/jdocs/io/JavaUdpMulticast.java) { #bind } \ No newline at end of file +@@snip [JavaUdpMulticast.java]($code$/java/jdocs/io/JavaUdpMulticast.java) { #bind } diff --git a/akka-docs/src/main/paradox/java/logging.md b/akka-docs/src/main/paradox/java/logging.md index ec60a1b439..0272dfe71d 100644 --- a/akka-docs/src/main/paradox/java/logging.md +++ b/akka-docs/src/main/paradox/java/logging.md @@ -263,7 +263,6 @@ stdout logger is `WARNING` and it can be silenced completely by setting Akka provides a logger for [SL4FJ](http://www.slf4j.org/). This module is available in the 'akka-slf4j.jar'. It has a single dependency: the slf4j-api jar. In your runtime, you also need a SLF4J backend. We recommend [Logback](http://logback.qos.ch/): -> ```xml ch.qos.logback @@ -499,4 +498,4 @@ shown below: ```scala final LoggingAdapter log = Logging.getLogger(system.eventStream(), "my.string"); -``` \ No newline at end of file +``` diff --git a/akka-docs/src/main/paradox/java/persistence-schema-evolution.md b/akka-docs/src/main/paradox/java/persistence-schema-evolution.md index a47a69257a..5f48cb9572 100644 --- a/akka-docs/src/main/paradox/java/persistence-schema-evolution.md +++ b/akka-docs/src/main/paradox/java/persistence-schema-evolution.md @@ -37,7 +37,7 @@ type to `PersistentActor` s and @ref:[persistence queries](persistence-query.md) instead of having to explicitly deal with different schemas. 
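+As a rough sketch of such an adapter (the `UserRegisteredV1` and `UserRegisteredV2`
+event classes are invented for the example), an `EventAdapter` can promote an old event
+to its newer representation while reading from the journal:
+
+```java
+import akka.persistence.journal.EventAdapter;
+import akka.persistence.journal.EventSeq;
+
+public class UserEventAdapter implements EventAdapter {
+  @Override
+  public String manifest(Object event) {
+    return ""; // no manifest needed for this sketch
+  }
+
+  @Override
+  public Object toJournal(Object event) {
+    return event; // write side left unchanged
+  }
+
+  @Override
+  public EventSeq fromJournal(Object event, String manifest) {
+    if (event instanceof UserRegisteredV1) {
+      UserRegisteredV1 v1 = (UserRegisteredV1) event;
+      // promote the old event to the new schema, filling the added field with a default
+      return EventSeq.single(new UserRegisteredV2(v1.name, "unknown"));
+    }
+    return EventSeq.single(event); // everything else passes through untouched
+  }
+}
+```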
In summary, schema evolution in event sourced systems exposes the following characteristics: -: + * Allow the system to continue operating without large scale migrations to be applied, * Allow the system to read "old" events from the underlying storage, however present them in a "new" view to the application logic, * Transparently promote events to the latest versions during recovery (or queries) such that the business logic need not consider multiple versions of events @@ -111,7 +111,7 @@ The below figure explains how the default serialization scheme works, and how it user provided message itself, which we will from here on refer to as the `payload` (highlighted in yellow): ![persistent-message-envelope.png](../images/persistent-message-envelope.png) -> + Akka Persistence provided serializers wrap the user payload in an envelope containing all persistence-relevant information. **If the Journal uses provided Protobuf serializers for the wrapper types (e.g. PersistentRepr), then the payload will be serialized using the user configured serializer, and if none is provided explicitly, Java serialization will be used for it.** @@ -234,7 +234,7 @@ add the overhead of having to maintain the schema. When using serializers like t (except renaming the field and method used during serialization) is needed to perform such evolution: ![persistence-serializer-rename.png](../images/persistence-serializer-rename.png) -> + This is how such a rename would look in protobuf: @@snip [PersistenceSchemaEvolutionDocSpec.scala]($code$/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala) { #protobuf-rename-proto } @@ -262,7 +262,7 @@ automatically by the serializer. You can do these kinds of "promotions" either m or using a library like [Stamina](https://github.com/javapenos/stamina) which helps to create those `V1->V2->V3->...->Vn` promotion chains without much boilerplate. ![persistence-manual-rename.png](../images/persistence-manual-rename.png) -> + The following snippet showcases how one could apply renames if working with plain JSON (using a `JsObject` as an example JSON representation): @@ -296,7 +296,7 @@ for the recovery mechanisms that this entails. For example, a naive way of filte being delivered to a recovering `PersistentActor` is pretty simple, as one can simply filter them out in an @ref:[EventAdapter](persistence.md#event-adapters): ![persistence-drop-event.png](../images/persistence-drop-event.png) -> + The `EventAdapter` can drop old events (**O**) by emitting an empty `EventSeq`. Other events can simply be passed through (**E**). @@ -311,7 +311,6 @@ In the just described technique we have saved the PersistentActor from receiving out in the `EventAdapter`, however the event itself still was deserialized and loaded into memory. This has two notable *downsides*: -> * first, that the deserialization was actually performed, so we spent some of out time budget on the deserialization, even though the event does not contribute anything to the persistent actors state. 
* second, that we are *unable to remove the event class* from the system – since the serializer still needs to create @@ -326,7 +325,7 @@ This can for example be implemented by using an `SerializerWithStringManifest` that the type is no longer needed, and skip the deserialization all-together: ![persistence-drop-event-serializer.png](../images/persistence-drop-event-serializer.png) -> + The serializer is aware of the old event types that need to be skipped (**O**), and can skip deserializing them alltogether by simply returning a "tombstone" (**T**), which the EventAdapter converts into an empty EventSeq. Other events (**E**) can simply be passed through. @@ -360,7 +359,7 @@ classes which very often may be less user-friendly yet highly optimised for thro these types in a 1:1 style as illustrated below: ![persistence-detach-models.png](../images/persistence-detach-models.png) -> + Domain events (**A**) are adapted to the data model events (**D**) by the `EventAdapter`. The data model can be a format natively understood by the journal, such that it can store it more efficiently or include additional data for the event (e.g. tags), for ease of later querying. @@ -442,7 +441,7 @@ on what the user actually intended to change (instead of the composite `UserDeta of our model). ![persistence-event-adapter-1-n.png](../images/persistence-event-adapter-1-n.png) -> + The `EventAdapter` splits the incoming event into smaller more fine grained events during recovery. During recovery however, we now need to convert the old `V1` model into the `V2` representation of the change. @@ -452,4 +451,4 @@ and the address change is handled similarily: @@snip [PersistenceSchemaEvolutionDocTest.java]($code$/java/jdocs/persistence/PersistenceSchemaEvolutionDocTest.java) { #split-events-during-recovery } By returning an `EventSeq` from the event adapter, the recovered event can be converted to multiple events before -being delivered to the persistent actor. \ No newline at end of file +being delivered to the persistent actor. diff --git a/akka-docs/src/main/paradox/java/remoting-artery.md b/akka-docs/src/main/paradox/java/remoting-artery.md index c576b0b44e..8f639ead2e 100644 --- a/akka-docs/src/main/paradox/java/remoting-artery.md +++ b/akka-docs/src/main/paradox/java/remoting-artery.md @@ -703,10 +703,11 @@ Artery has been designed for low latency and as a result it can be CPU hungry wh This is not always desirable. 
It is possible to tune the tradeoff between CPU usage and latency with the following configuration: -> +``` # Values can be from 1 to 10, where 10 strongly prefers low latency # and 1 strongly prefers less CPU usage akka.remote.artery.advanced.idle-cpu-level = 1 +``` By setting this value to a lower number, it tells Akka to do longer "sleeping" periods on its thread dedicated for [spin-waiting](https://en.wikipedia.org/wiki/Busy_waiting) and hence reducing CPU load when there is no @@ -789,4 +790,4 @@ akka { } } } -``` \ No newline at end of file +``` diff --git a/akka-docs/src/main/paradox/java/remoting.md b/akka-docs/src/main/paradox/java/remoting.md index 8a4c6f9dc4..79c7175d74 100644 --- a/akka-docs/src/main/paradox/java/remoting.md +++ b/akka-docs/src/main/paradox/java/remoting.md @@ -633,4 +633,4 @@ Keep in mind that local.address will most likely be in one of private network ra * *172.16.0.0 - 172.31.255.255* (network class B) * *192.168.0.0 - 192.168.255.255* (network class C) -For further details see [RFC 1597]([https://tools.ietf.org/html/rfc1597](https://tools.ietf.org/html/rfc1597)) and [RFC 1918]([https://tools.ietf.org/html/rfc1918](https://tools.ietf.org/html/rfc1918)). \ No newline at end of file +For further details see [RFC 1597](https://tools.ietf.org/html/rfc1597) and [RFC 1918](https://tools.ietf.org/html/rfc1918). diff --git a/akka-docs/src/main/paradox/java/stream/stages-overview.md b/akka-docs/src/main/paradox/java/stream/stages-overview.md index 2d92c63a63..a91bf45a9d 100644 --- a/akka-docs/src/main/paradox/java/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/java/stream/stages-overview.md @@ -1,9 +1,13 @@ # Overview of built-in stages and their semantics +
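+As a minimal setup sketch (the class and system names are invented for the example), a
+built-in source can be run into a built-in sink like this:
+
+```java
+import akka.actor.ActorSystem;
+import akka.stream.ActorMaterializer;
+import akka.stream.Materializer;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+
+public class StagesDemo {
+  public static void main(String[] args) {
+    final ActorSystem system = ActorSystem.create("stages-demo");
+    final Materializer mat = ActorMaterializer.create(system);
+
+    // emit the integers 1 to 5, printing each one as it is requested downstream
+    Source.range(1, 5)
+      .runWith(Sink.foreach(System.out::println), mat)
+      .thenRun(system::terminate); // shut down once the stream completes
+  }
+}
+```
+
+The shorter sketches further down this page assume a materializer `mat` like this one is in scope.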
+ ## Source stages These built-in sources are available from `akka.stream.javadsl.Source`: +--------------------------------------------------------------- + ### fromIterator Stream the values from an `Iterator`, requesting the next value when there is demand. The iterator will be created anew on @@ -15,6 +19,8 @@ If the iterator perform blocking operations, make sure to run it on a separate d **completes** when the iterator reaches its end +--------------------------------------------------------------- + ### from Stream the values of an `Iterable`. Make sure the `Iterable` is immutable or at least not modified after being used @@ -24,6 +30,8 @@ as a source. **completes** after the last element of the iterable has been emitted +--------------------------------------------------------------- + ### single Stream a single object @@ -32,6 +40,8 @@ Stream a single object **completes** when the single value has been emitted +--------------------------------------------------------------- + ### repeat Stream a single object repeatedly @@ -40,6 +50,8 @@ Stream a single object repeatedly **completes** never +--------------------------------------------------------------- + ### cycle Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning @@ -51,6 +63,8 @@ exception. **completes** never +--------------------------------------------------------------- + ### tick A periodical repetition of an arbitrary object. Delay of first tick is specified @@ -60,6 +74,8 @@ separately from interval of the following ticks. **completes** never +--------------------------------------------------------------- + ### fromCompletionStage Send the single value of the `CompletionStage` when it completes and there is demand. @@ -69,6 +85,8 @@ If the `CompletionStage` fails the stream is failed with that exception. **completes** after the `CompletionStage` has completed or when it fails +--------------------------------------------------------------- + ### fromFuture Send the single value of the Scala `Future` when it completes and there is demand. @@ -78,6 +96,8 @@ If the future fails the stream is failed with that exception. **completes** after the future has completed +--------------------------------------------------------------- + ### fromFutureSource Streams the elements of the given future source once it successfully completes. @@ -87,6 +107,8 @@ If the future fails the stream is failed. **completes** after the *future* source completes +--------------------------------------------------------------- + ### fromSourceCompletionStage Streams the elements of an asynchronous source once its given *completion* stage completes. @@ -96,6 +118,8 @@ If the *completion* fails the stream is failed with that exception. 
**completes** after the asynchronous source completes +--------------------------------------------------------------- + ### unfold Stream the result of a function as long as it returns a `Optional`, the value inside the optional @@ -108,6 +132,8 @@ Can be used to implement many stateful sources without having to touch the more **completes** when the unfold function returns an empty value +--------------------------------------------------------------- + ### unfoldAsync Just like `unfold` but the fold function returns a `CompletionStage` which will cause the source to @@ -119,6 +145,8 @@ Can be used to implement many stateful sources without having to touch the more **completes** when the CompletionStage returned by the unfold function completes with an empty value +--------------------------------------------------------------- + ### empty Complete right away without ever emitting any elements. Useful when you have to provide a source to @@ -128,6 +156,8 @@ an API but there are no elements to emit. **completes** directly +--------------------------------------------------------------- + ### maybe Materialize a `CompletionStage` that can be completed with an `Optional`. @@ -138,6 +168,8 @@ complete directly. **completes** after emitting some value, or directly if the promise is completed with no value +--------------------------------------------------------------- + ### failed Fail directly with a user specified exception. @@ -146,7 +178,9 @@ Fail directly with a user specified exception. **completes** fails the stream directly with the given exception -#### lazily +--------------------------------------------------------------- + +### lazily Defers creation and materialization of a `Source` until there is demand. @@ -154,6 +188,8 @@ Defers creation and materialization of a `Source` until there is demand. **completes** depends on the wrapped `Source` +--------------------------------------------------------------- + ### actorPublisher Wrap an actor extending `ActorPublisher` as a source. @@ -162,6 +198,8 @@ Wrap an actor extending `ActorPublisher` as a source. **completes** when the actor stops +--------------------------------------------------------------- + ### actorRef Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain @@ -172,6 +210,8 @@ elements or failing the stream, the strategy is chosen by the user. **completes** when the `ActorRef` is sent `akka.actor.Status.Success` or `PoisonPill` +--------------------------------------------------------------- + ### combine Combine several sources, using a given strategy such as merge or concat, into one source. @@ -180,6 +220,8 @@ Combine several sources, using a given strategy such as merge or concat, into on **completes** when all sources has completed +--------------------------------------------------------------- + ### range Emit each integer in a range, with an option to take bigger steps than 1. @@ -188,6 +230,8 @@ Emit each integer in a range, with an option to take bigger steps than 1. **completes** when the end of the range has been reached +--------------------------------------------------------------- + ### unfoldResource Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. 
@@ -196,6 +240,8 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa **completes** when read function returns `None` +--------------------------------------------------------------- + ### unfoldAsyncResource Wrap any resource that can be opened, queried for next element and closed using three distinct functions into a source. @@ -205,6 +251,8 @@ Functions return `CompletionStage` result to achieve asynchronous processing **completes** when `CompletionStage` from read function returns `None` +--------------------------------------------------------------- + ### queue Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. The queue contains @@ -216,14 +264,20 @@ a strategy specified by the user. Functionality for tracking when an element has **completes** when downstream completes +--------------------------------------------------------------- + ### asSubscriber Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`. +--------------------------------------------------------------- + ### fromPublisher Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`. +--------------------------------------------------------------- + ### zipN Combine the elements of multiple streams into a stream of sequences. @@ -232,6 +286,8 @@ Combine the elements of multiple streams into a stream of sequences. **completes** when any upstream completes +--------------------------------------------------------------- + ### zipWithN Combine the elements of multiple streams into a stream of sequences using a combiner function. @@ -240,10 +296,16 @@ Combine the elements of multiple streams into a stream of sequences using a comb **completes** when any upstream completes +--------------------------------------------------------------- + +
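+A sketch of `unfold` (the countdown itself is invented; `mat` as in the setup sketch
+near the top of this page):
+
+```java
+import java.util.Optional;
+import akka.NotUsed;
+import akka.japi.Pair;
+import akka.stream.javadsl.Source;
+
+// count down from 5 to 1; the stream completes when the function returns an empty Optional
+Source<Integer, NotUsed> countdown =
+  Source.unfold(5, current ->
+    current == 0
+      ? Optional.empty()
+      : Optional.of(Pair.create(current - 1, current)));
+countdown.runForeach(System.out::println, mat);
+```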
+ ## Sink stages These built-in sinks are available from `akka.stream.javadsl.Sink`: +--------------------------------------------------------------- + ### head Materializes into a `CompletionStage` which completes with the first value arriving, @@ -253,6 +315,8 @@ after this the stream is canceled. If no element is emitted, the CompletionStage **backpressures** never +--------------------------------------------------------------- + ### headOption Materializes into a `CompletionStage>` which completes with the first value arriving wrapped in optional, @@ -262,6 +326,8 @@ or an empty optional if the stream completes without any elements emitted. **backpressures** never +--------------------------------------------------------------- + ### last Materializes into a `CompletionStage` which will complete with the last value emitted when the stream @@ -271,6 +337,8 @@ completes. If the stream completes with no elements the CompletionStage is faile **backpressures** never +--------------------------------------------------------------- + ### lastOption Materialize a `CompletionStage>` which completes with the last value @@ -281,6 +349,8 @@ completed with an empty optional. **backpressures** never +--------------------------------------------------------------- + ### ignore Consume all elements but discards them. Useful when a stream has to be consumed but there is no use to actually @@ -290,12 +360,16 @@ do anything with the elements. **backpressures** never +--------------------------------------------------------------- + ### cancelled Immediately cancel the stream **cancels** immediately +--------------------------------------------------------------- + ### seq Collect values emitted from the stream into a collection, the collection is available through a `CompletionStage` or @@ -304,6 +378,8 @@ if more element are emitted the sink will cancel the stream **cancels** If too many values are collected +--------------------------------------------------------------- + ### foreach Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure. @@ -317,6 +393,8 @@ Note that it is not safe to mutate state from the procedure. **backpressures** when the previous procedure invocation has not yet completed +--------------------------------------------------------------- + ### foreachParallel Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel. @@ -325,6 +403,8 @@ Like `foreach` but allows up to `parallellism` procedure calls to happen in para **backpressures** when the previous parallel procedure invocations has not yet completed +--------------------------------------------------------------- + ### onComplete Invoke a callback when the stream has completed or failed. @@ -333,6 +413,8 @@ Invoke a callback when the stream has completed or failed. **backpressures** never +--------------------------------------------------------------- + ### lazyInit Invoke sinkFactory function to create a real sink upon receiving the first element. Internal `Sink` will not be created if there are no elements, @@ -342,6 +424,8 @@ because of completion or error. *fallback* will be invoked if there was no eleme **backpressures** when initialized and when created sink backpressures +--------------------------------------------------------------- + ### queue Materialize a `SinkQueue` that can be pulled to trigger demand through the sink. 
The queue contains @@ -351,6 +435,8 @@ a buffer in case stream emitting elements faster than queue pulling them. **backpressures** when buffer has some space +--------------------------------------------------------------- + ### fold Fold over emitted element with a function, where each invocation will get the new element and the result from the @@ -365,6 +451,8 @@ between invocations. **backpressures** when the previous fold function invocation has not yet completed +--------------------------------------------------------------- + ### reduce Apply a reduction function on the incoming elements and pass the result to the next invocation. The first invocation @@ -376,6 +464,8 @@ Materializes into a CompletionStage that will be completed by the last result of **backpressures** when the previous reduction function invocation has not yet completed +--------------------------------------------------------------- + ### combine Combine several sinks into one using a user specified strategy @@ -384,6 +474,8 @@ Combine several sinks into one using a user specified strategy **backpressures** depends on the strategy +--------------------------------------------------------------- + ### actorRef Send the elements from the stream to an `ActorRef`. No backpressure so care must be taken to not overflow the inbox. @@ -392,6 +484,8 @@ Send the elements from the stream to an `ActorRef`. No backpressure so care must **backpressures** never +--------------------------------------------------------------- + ### actorRefWithAck Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, @@ -401,6 +495,8 @@ to provide back pressure onto the sink. **backpressures** when the actor acknowledgement has not arrived +--------------------------------------------------------------- + ### actorSubscriber Create an actor from a `Props` upon materialization, where the actor implements `ActorSubscriber`, which will @@ -412,20 +508,30 @@ Materializes into an `ActorRef` to the created actor. **backpressures** depends on the actor implementation +--------------------------------------------------------------- + ### asPublisher Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`. +--------------------------------------------------------------- + ### fromSubscriber Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink +--------------------------------------------------------------- + +
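+A sketch of a sink with a useful materialized value (assuming the `mat` materializer
+from the setup sketch near the top of this page):
+
+```java
+import java.util.concurrent.CompletionStage;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+
+// fold materializes into a CompletionStage holding the final accumulated value
+CompletionStage<Integer> sum =
+  Source.range(1, 100).runWith(Sink.fold(0, (acc, n) -> acc + n), mat);
+sum.thenAccept(System.out::println); // prints 5050
+```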
+ ## Additional Sink and Source converters Sources and sinks for integrating with `java.io.InputStream` and `java.io.OutputStream` can be found on `StreamConverters`. As they are blocking APIs the implementations of these stages are run on a separate dispatcher configured through the `akka.stream.blocking-io-dispatcher`. +--------------------------------------------------------------- + ### fromOutputStream Create a sink that wraps an `OutputStream`. Takes a function that produces an `OutputStream`, when the sink is @@ -440,6 +546,8 @@ to handle multiple invocations. The `OutputStream` will be closed when the stream that flows into the `Sink` is completed, and the `Sink` will cancel its inflow when the `OutputStream` is no longer writable. +--------------------------------------------------------------- + ### asInputStream Create a sink which materializes into an `InputStream` that can be read to trigger demand through the sink. @@ -448,6 +556,8 @@ Bytes emitted through the stream will be available for reading through the `Inpu The `InputStream` will be ended when the stream flowing into this `Sink` completes, and the closing the `InputStream` will cancel the inflow of this `Sink`. +--------------------------------------------------------------- + ### fromInputStream Create a source that wraps an `InputStream`. Takes a function that produces an `InputStream`, when the source is @@ -462,6 +572,8 @@ to handle multiple invocations. The `InputStream` will be closed when the `Source` is canceled from its downstream, and reaching the end of the `InputStream` will complete the `Source`. +--------------------------------------------------------------- + ### asOutputStream Create a source that materializes into an `OutputStream`. When bytes are written to the `OutputStream` they @@ -470,6 +582,8 @@ are emitted from the source. The `OutputStream` will no longer be writable when the `Source` has been canceled from its downstream, and closing the `OutputStream` will complete the `Source`. +--------------------------------------------------------------- + ### asJavaStream Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink. @@ -480,11 +594,15 @@ The Java 8 a `Stream` will be ended when the stream flowing into this `Sink` com Be aware that Java 8 `Stream` blocks current thread while waiting on next element from downstream. +--------------------------------------------------------------- + ### fromJavaStream Create a source that wraps Java 8 `Stream`. `Source` uses a stream iterator to get all its elements and send them downstream on demand. +--------------------------------------------------------------- + ### javaCollector Create a sink which materializes into a `CompletionStage` which will be completed with a result of the Java 8 `Collector` @@ -496,6 +614,8 @@ The `Collector` can also do reduction at the end. Reduction processing is perfor Note that a flow can be materialized multiple times, so the function producing the `Collector` must be able to handle multiple invocations. +--------------------------------------------------------------- + ### javaCollectorParallelUnordered Create a sink which materializes into a `CompletionStage` which will be completed with a result of the Java 8 Collector @@ -507,19 +627,31 @@ The `Collector` can also do reduction at the end. Reduction processing is perfor Note that a flow can be materialized multiple times, so the function producing the `Collector` must be able to handle multiple invocations. 
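+As a sketch (the file name is invented; `mat` as in the setup sketch near the top of
+this page), wrapping a blocking `InputStream` in a source looks like this:
+
+```java
+import java.io.FileInputStream;
+import java.util.concurrent.CompletionStage;
+import akka.Done;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.StreamConverters;
+
+// the factory is invoked per materialization, so the source can be reused
+CompletionStage<Done> done =
+  StreamConverters.fromInputStream(() -> new FileInputStream("data.bin"))
+    .runWith(Sink.foreach(chunk -> System.out.println(chunk.size())), mat);
+```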
+--------------------------------------------------------------- + +
+ ## File IO Sinks and Sources Sources and sinks for reading and writing files can be found on `FileIO`. +--------------------------------------------------------------- + ### fromPath Emit the contents of a file, as `ByteString` s, materializes into a `CompletionStage` which will be completed with a `IOResult` upon reaching the end of the file or if there is a failure. +--------------------------------------------------------------- + ### toPath Create a sink which will write incoming `ByteString` s to a given file path. +--------------------------------------------------------------- + +
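+A sketch of copying a file by combining `fromPath` and `toPath` (the paths are
+invented; `mat` as in the setup sketch near the top of this page):
+
+```java
+import java.nio.file.Paths;
+import java.util.concurrent.CompletionStage;
+import akka.stream.IOResult;
+import akka.stream.javadsl.FileIO;
+
+// materializes the sink's CompletionStage<IOResult>, completed when all bytes are written
+CompletionStage<IOResult> copied =
+  FileIO.fromPath(Paths.get("in.txt"))
+    .runWith(FileIO.toPath(Paths.get("out.txt")), mat);
+```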
+ ## Flow stages All flows by default backpressure if the computation they encapsulate is not fast enough to keep up with the rate of @@ -533,14 +665,18 @@ For in-band error handling of normal errors (dropping elements if a map fails fo supervision support, or explicitly wrap your element types in a proper container that can express error or success states. +
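+For example, a sketch of the supervision approach (the dividing flow is invented): a
+resuming decider drops the element that caused the failure and keeps the stream running:
+
+```java
+import akka.NotUsed;
+import akka.stream.ActorAttributes;
+import akka.stream.Supervision;
+import akka.stream.javadsl.Flow;
+
+// division by zero throws; with a resuming decider the offending element is dropped
+Flow<Integer, Integer, NotUsed> safeDivide =
+  Flow.of(Integer.class)
+    .map(n -> 100 / n)
+    .withAttributes(ActorAttributes.withSupervisionStrategy(
+        Supervision.getResumingDecider()));
+```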
+ ## Simple processing stages These stages can transform the rate of incoming elements since there are stages that emit multiple elements for a single input (e.g. `mapConcat') or consume multiple elements before emitting one output (e.g. `filter`). However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the -rate is affected. This is in contrast with [detached stages](#detached-stages-overview) which can change their processing behavior +rate is affected. This is in contrast with [detached stages](#backpressure-aware-stages) which can change their processing behavior depending on being backpressured by downstream or not. +--------------------------------------------------------------- + ### map Transform each element in the stream by calling a mapping function with it and passing the returned value downstream. @@ -551,6 +687,8 @@ Transform each element in the stream by calling a mapping function with it and p **completes** when upstream completes +--------------------------------------------------------------- + ### mapConcat Transform each element into zero or more elements that are individually passed downstream. @@ -561,6 +699,8 @@ Transform each element into zero or more elements that are individually passed d **completes** when upstream completes and all remaining elements has been emitted +--------------------------------------------------------------- + ### statefulMapConcat Transform each element into zero or more elements that are individually passed downstream. The difference to `mapConcat` is that @@ -572,6 +712,8 @@ the transformation function is created from a factory for every materialization **completes** when upstream completes and all remaining elements has been emitted +--------------------------------------------------------------- + ### filter Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if @@ -583,6 +725,8 @@ it returns false the element is discarded. **completes** when upstream completes +--------------------------------------------------------------- + ### filterNot Filter the incoming elements using a predicate. If the predicate returns false the element is passed downstream, if @@ -594,6 +738,8 @@ it returns true the element is discarded. **completes** when upstream completes +--------------------------------------------------------------- + ### collect Apply a partial function to each incoming element, if the partial function is defined for a value the returned @@ -605,6 +751,8 @@ value is passed downstream. Can often replace `filter` followed by `map` to achi **completes** when upstream completes +--------------------------------------------------------------- + ### grouped Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of @@ -616,6 +764,8 @@ elements downstream. **completes** when upstream completes +--------------------------------------------------------------- + ### sliding Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream. @@ -628,6 +778,8 @@ Note: the last window might be smaller than the requested size due to end of str **completes** when upstream completes +--------------------------------------------------------------- + ### scan Emit its current value which starts at `zero` and then applies the current and next value to the given function @@ -642,6 +794,8 @@ the second element is required from downstream. 
**completes** when upstream completes +--------------------------------------------------------------- + ### scanAsync Just like `scan` but receiving a function that results in a `CompletionStage` to the next value. @@ -652,6 +806,8 @@ Just like `scan` but receiving a function that results in a `CompletionStage` to **completes** when upstream completes and the last `CompletionStage` is resolved +--------------------------------------------------------------- + ### fold Start with current value `zero` and then apply the current and next value to the given function, when upstream @@ -663,6 +819,8 @@ complete the current value is emitted downstream. **completes** when upstream completes +--------------------------------------------------------------- + ### foldAsync Just like `fold` but receiving a function that results in a `CompletionStage` to the next value. @@ -673,6 +831,8 @@ Just like `fold` but receiving a function that results in a `CompletionStage` to **completes** when upstream completes and the last `CompletionStage` is resolved +--------------------------------------------------------------- + ### reduce Start with first element and then apply the current and next value to the given function, when upstream @@ -684,6 +844,8 @@ complete the current value is emitted downstream. Similar to `fold`. **completes** when upstream completes +--------------------------------------------------------------- + ### drop Drop `n` elements and then pass any subsequent element downstream. @@ -694,6 +856,8 @@ Drop `n` elements and then pass any subsequent element downstream. **completes** when upstream completes +--------------------------------------------------------------- + ### take Pass `n` incoming elements downstream and then complete @@ -704,6 +868,8 @@ Pass `n` incoming elements downstream and then complete **completes** when the defined number of elements has been taken or upstream completes +--------------------------------------------------------------- + ### takeWhile Pass elements downstream as long as a predicate function return true for the element include the element @@ -715,6 +881,8 @@ when the predicate first return false and then complete. **completes** when predicate returned false or upstream completes +--------------------------------------------------------------- + ### dropWhile Drop elements as long as a predicate function return true for the element @@ -725,6 +893,8 @@ Drop elements as long as a predicate function return true for the element **completes** when upstream completes +--------------------------------------------------------------- + ### recover Allow sending of one last element downstream when a failure has happened upstream. @@ -737,6 +907,8 @@ Throwing an exception inside `recover` _will_ be logged on ERROR level automatic **completes** when upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### recoverWith Allow switching to alternative Source when a failure has happened upstream. @@ -749,6 +921,8 @@ Throwing an exception inside `recoverWith` _will_ be logged on ERROR level autom **completes** upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### recoverWithRetries RecoverWithRetries allows to switch to alternative Source on flow failure. 
It will stay in effect after @@ -765,6 +939,8 @@ This stage can recover the failure signal, but not the skipped elements, which w **completes** when upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### mapError While similar to `recover` this stage can be used to transform an error signal to a different one *without* logging @@ -780,6 +956,8 @@ Similarily to `recover` throwing an exception inside `mapError` _will_ be logged **backpressures** when downstream backpressures **completes** when upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### detach Detach upstream demand from downstream demand without detaching the stream rates. @@ -790,6 +968,8 @@ Detach upstream demand from downstream demand without detaching the stream rates **completes** when upstream completes +--------------------------------------------------------------- + ### throttle Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where @@ -801,6 +981,8 @@ a function has to be provided to calculate the individual cost of each element. **completes** when upstream completes +--------------------------------------------------------------- + ### intersperse Intersperse stream with provided element similar to `List.mkString`. It can inject start and end marker elements to stream. @@ -811,6 +993,8 @@ Intersperse stream with provided element similar to `List.mkString`. It can inje **completes** when upstream completes +--------------------------------------------------------------- + ### limit Limit number of element from upstream to given `max` number. @@ -821,6 +1005,8 @@ Limit number of element from upstream to given `max` number. **completes** when upstream completes and the number of emitted elements has not reached max +--------------------------------------------------------------- + ### limitWeighted Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. @@ -832,6 +1018,8 @@ Evaluated cost of each element defines how many elements will be allowed to trav **completes** when upstream completes and the number of emitted elements has not reached max +--------------------------------------------------------------- + ### log Log elements flowing through the stream as well as completion and erroring. By default element and @@ -844,6 +1032,8 @@ This can be changed by calling `Attributes.createLogLevels(...)` on the given Fl **completes** when upstream completes +--------------------------------------------------------------- + ### recoverWithRetries Switch to alternative Source on flow failure. It stays in effect after a failure has been recovered up to `attempts` @@ -855,8 +1045,14 @@ number of times. Each time a failure is fed into the partial function and a new **completes** when upstream completes or upstream failed with exception partial function can handle +--------------------------------------------------------------- + +
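+Chaining a few of the simple stages above (a sketch, with `mat` as in the setup sketch
+near the top of this page):
+
+```java
+// double each number, keep the multiples of three: prints 6, 12, 18
+Source.range(1, 10)
+  .map(n -> n * 2)
+  .filter(n -> n % 3 == 0)
+  .runForeach(System.out::println, mat);
+```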
+ ## Flow stages composed of Sinks and Sources +--------------------------------------------------------------- + ### Flow.fromSinkAndSource Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sent to the `Sink` @@ -866,6 +1062,8 @@ Note that termination events, like completion and cancelation is not automatical of the such-composed Flow. Use `CoupledTerminationFlow` if you want to couple termination of both of the ends, for example most useful in handling websocket connections. +--------------------------------------------------------------- + ### CoupledTerminationFlow.fromSinkAndSource Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them. @@ -876,29 +1074,28 @@ Similar to `Flow.fromSinkAndSource` however couples the termination of these two E.g. if the emitted `Flow` gets a cancellation, the `Source` of course is cancelled, however the Sink will also be completed. The table below illustrates the effects in detail: -+=================================================+=============================+=================================+ | Returned Flow | Sink (in) | Source (out) | -+=================================================+=============================+=================================+ +|-------------------------------------------------|-----------------------------|---------------------------------| | cause: upstream (sink-side) receives completion | effect: receives completion | effect: receives cancel | -+-------------------------------------------------+-----------------------------+---------------------------------+ | cause: upstream (sink-side) receives error | effect: receives error | effect: receives cancel | -+-------------------------------------------------+-----------------------------+---------------------------------+ | cause: downstream (source-side) receives cancel | effect: completes | effect: receives cancel | -+-------------------------------------------------+-----------------------------+---------------------------------+ | effect: cancels upstream, completes downstream | effect: completes | cause: signals complete | -+-------------------------------------------------+-----------------------------+---------------------------------+ | effect: cancels upstream, errors downstream | effect: receives error | cause: signals error or throws | -+-------------------------------------------------+-----------------------------+---------------------------------+ | effect: cancels upstream, completes downstream | cause: cancels | effect: receives cancel | -+=================================================+=============================+=================================+ The order in which the *in* and *out* sides receive their respective completion signals is not defined, do not rely on its ordering. +--------------------------------------------------------------- + +
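+A sketch of `Flow.fromSinkAndSource` (the protocol is invented): whatever flows in is
+consumed by the sink while the out side is fed by the independent source:
+
+```java
+import akka.NotUsed;
+import akka.stream.javadsl.Flow;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+
+// note that the two ends are not coupled for termination;
+// use CoupledTerminationFlow.fromSinkAndSource for that
+Flow<String, Integer, NotUsed> ignoreInEmitRange =
+  Flow.fromSinkAndSource(Sink.ignore(), Source.range(1, 5));
+```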
+ ## Asynchronous processing stages These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous operation at the same time (usually handling the completion of a CompletionStage). +--------------------------------------------------------------- + ### mapAsync Pass incoming elements to a function that return a `CompletionStage` result. When the CompletionStage arrives the result is passed @@ -913,6 +1110,8 @@ If a `CompletionStage` fails, the stream also fails (unless a different supervis **completes** when upstream completes and all `CompletionStage` s has been completed and all elements has been emitted +--------------------------------------------------------------- + ### mapAsyncUnordered Like `mapAsync` but `CompletionStage` results are passed downstream as they arrive regardless of the order of the elements @@ -926,10 +1125,16 @@ If a CompletionStage fails, the stream also fails (unless a different supervisio **completes** upstream completes and all CompletionStages has been completed and all elements has been emitted +--------------------------------------------------------------- + +
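+A sketch of `mapAsync` (the computation is invented; `mat` as in the setup sketch near
+the top of this page):
+
+```java
+import java.util.concurrent.CompletableFuture;
+
+// up to 4 squares are computed concurrently, yet results are emitted in upstream order
+Source.range(1, 100)
+  .mapAsync(4, n -> CompletableFuture.supplyAsync(() -> n * n))
+  .runForeach(System.out::println, mat);
+```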
+ ## Timer driven stages These stages process elements using timers, delaying, dropping or grouping elements for certain time durations. +--------------------------------------------------------------- + ### takeWithin Pass elements downstream within a timeout and then complete. @@ -940,6 +1145,8 @@ Pass elements downstream within a timeout and then complete. **completes** upstream completes or timer fires +--------------------------------------------------------------- + ### dropWithin Drop elements until a timeout has fired @@ -950,6 +1157,8 @@ Drop elements until a timeout has fired **completes** upstream completes +--------------------------------------------------------------- + ### groupedWithin Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, @@ -963,8 +1172,10 @@ but not if no elements has been grouped (i.e: no empty groups), or when limit ha **completes** when upstream completes -groupedWeightedWithin -^^^^^^^^^^^^^ +--------------------------------------------------------------- + +### groupedWeightedWithin + Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group. @@ -976,6 +1187,8 @@ but not if no elements has been grouped (i.e: no empty groups), or when weight l **completes** when upstream completes +--------------------------------------------------------------- + ### initialDelay Delay the initial element by a user specified duration from stream materialization. @@ -986,6 +1199,8 @@ Delay the initial element by a user specified duration from stream materializati **completes** when upstream completes +--------------------------------------------------------------- + ### delay Delay every element passed through with a specific duration. @@ -996,11 +1211,17 @@ Delay every element passed through with a specific duration. **completes** when upstream completes and buffered elements has been drained - + +--------------------------------------------------------------- + +
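+A sketch of `groupedWithin` (the numbers are invented; `mat` as in the setup sketch
+near the top of this page):
+
+```java
+import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
+
+// emit a batch once 100 elements have arrived or one second has passed, whichever is first
+Source.range(1, 10000)
+  .groupedWithin(100, FiniteDuration.create(1, TimeUnit.SECONDS))
+  .runForeach(batch -> System.out.println(batch.size()), mat);
+```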
+ ## Backpressure aware stages These stages are aware of the backpressure provided by their downstreams and able to adapt their behavior to that signal. +--------------------------------------------------------------- + ### conflate Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as @@ -1013,6 +1234,8 @@ average of incoming numbers, if aggregation should lead to a different type `con **completes** when upstream completes +--------------------------------------------------------------- + ### conflateWithSeed Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there @@ -1025,6 +1248,8 @@ transform it to the summary type. **completes** when upstream completes +--------------------------------------------------------------- + ### batch Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there @@ -1043,6 +1268,8 @@ aggregated to the batched value. **completes** when upstream completes and a "possibly pending" element was drained +--------------------------------------------------------------- + ### batchWeighted Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there @@ -1059,6 +1286,8 @@ aggregated to the batched value. **completes** when upstream completes and a "possibly pending" element was drained +--------------------------------------------------------------- + ### expand Allow for a faster downstream by expanding the last incoming element to an `Iterator`. For example @@ -1070,6 +1299,8 @@ Allow for a faster downstream by expanding the last incoming element to an `Iter **completes** when upstream completes +--------------------------------------------------------------- + ### buffer (Backpressure) Allow for temporarily faster upstream events by buffering `size` elements. When the buffer is full backpressure @@ -1081,6 +1312,8 @@ is applied. **completes** when upstream completes and buffered elements have been drained +--------------------------------------------------------------- + ### buffer (Drop) Allow for temporarily faster upstream events by buffering `size` elements. When the buffer is full elements are @@ -1097,6 +1330,8 @@ dropped according to the specified `OverflowStrategy`: **completes** when upstream completes and buffered elements have been drained +--------------------------------------------------------------- + ### buffer (Fail) Allow for temporarily faster upstream events by buffering `size` elements. When the buffer is full the stage fails @@ -1108,11 +1343,17 @@ the flow with a `BufferOverflowException`. **completes** when upstream completes and buffered elements have been drained +--------------------------------------------------------------- + +
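A brief Scala sketch contrasting two of the strategies described above for mismatched rates; the summing aggregate and the drop-oldest buffer are illustrative assumptions:

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

// conflate: when downstream is slower, collapse waiting elements into
// a running sum instead of backpressuring upstream.
val summed = Source(1 to 1000).conflate(_ + _)

// buffer: absorb short bursts of up to 64 elements; on overflow the
// oldest buffered element is dropped rather than failing or blocking.
val buffered = Source(1 to 1000).buffer(64, OverflowStrategy.dropHead)
```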
+ ## Nesting and flattening stages These stages either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains nested streams and turn them into a stream of elements instead (flattening). +--------------------------------------------------------------- + ### prefixAndTail Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) @@ -1124,6 +1365,8 @@ and returns a pair containing a strict sequence of the taken element and a strea **completes** when prefix elements have been consumed and the substream has been consumed +--------------------------------------------------------------- + ### groupBy Demultiplex the incoming stream into separate output streams. @@ -1133,6 +1376,8 @@ there is an element pending for a group whose substream backpressures **completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +--------------------------------------------------------------- + ### splitWhen Split off elements into a new substream whenever a predicate function returns `true`. @@ -1143,6 +1388,8 @@ Split off elements into a new substream whenever a predicate function return `tr **completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +--------------------------------------------------------------- + ### splitAfter End the current substream whenever a predicate returns `true`, starting a new substream for the next element. @@ -1153,6 +1400,8 @@ End the current substream whenever a predicate returns `true`, starting a new su **completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +--------------------------------------------------------------- + ### flatMapConcat Transform each input element into a `Source` whose elements are then flattened into the output stream through @@ -1164,6 +1413,8 @@ concatenation. This means each source is fully consumed before consumption of th **completes** when upstream completes and all consumed substreams complete +--------------------------------------------------------------- + ### flatMapMerge Transform each input element into a `Source` whose elements are then flattened into the output stream through @@ -1175,10 +1426,16 @@ merging. The maximum number of merged sources has to be specified. **completes** when upstream completes and all consumed substreams complete +--------------------------------------------------------------- + +
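To illustrate nesting and flattening side by side, a small Scala sketch; the parity key and the duplicated sub-sources are illustrative assumptions:

```scala
import akka.stream.scaladsl.Source

// groupBy: demultiplex into at most two substreams keyed by parity,
// transform each substream, then merge them back into one stream.
Source(1 to 10)
  .groupBy(maxSubstreams = 2, _ % 2)
  .map(_ * 10)
  .mergeSubstreams
  .runForeach(println)

// flatMapConcat: every element becomes a Source that is fully drained
// before the next one starts, so ordering is preserved.
Source(1 to 3)
  .flatMapConcat(i => Source(List(i, i)))
  .runForeach(println) // 1 1 2 2 3 3
```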
+ ## Time aware stages These stages operate taking time into consideration. +--------------------------------------------------------------- + ### initialTimeout If the first element has not passed through this stage before the provided timeout, the stream is failed @@ -1192,6 +1449,8 @@ with a `TimeoutException`. **cancels** when downstream cancels +--------------------------------------------------------------- + ### completionTimeout If the completion of the stream does not happen within the provided timeout, the stream is failed @@ -1205,6 +1464,8 @@ with a `TimeoutException`. **cancels** when downstream cancels +--------------------------------------------------------------- + ### idleTimeout If the time between two processed elements exceeds the provided timeout, the stream is failed @@ -1219,6 +1480,8 @@ check is one period (equals to timeout value). **cancels** when downstream cancels +--------------------------------------------------------------- + ### backpressureTimeout If the time between the emission of an element and the following downstream demand exceeds the provided timeout, @@ -1233,6 +1496,8 @@ check is one period (equals to timeout value). **cancels** when downstream cancels +--------------------------------------------------------------- + ### keepAlive Injects additional (configured) elements if upstream does not emit for a configured amount of time. @@ -1245,6 +1510,8 @@ Injects additional (configured) elements if upstream does not emit for a configu **cancels** when downstream cancels +--------------------------------------------------------------- + ### initialDelay Delays the initial element by the specified duration. @@ -1257,11 +1524,17 @@ Delays the initial element by the specified duration. **cancels** when downstream cancels +--------------------------------------------------------------- + +
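A minimal Scala sketch of two of the stages above; the slow tick source is an assumption used only to make the timing visible:

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

val events = Source.tick(0.seconds, 3.seconds, "event")

// keepAlive: inject a heartbeat whenever upstream is silent for 1 second.
events.keepAlive(1.second, () => "heartbeat").runForeach(println)

// idleTimeout: fail with a TimeoutException if more than 5 seconds
// pass between two consecutive elements.
events.idleTimeout(5.seconds).runForeach(println)
```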
+ ## Fan-in stages These stages take multiple streams as their input and provide a single output combining the elements from all of the inputs in different ways. +--------------------------------------------------------------- + ### merge Merge multiple sources. Picks elements randomly if all sources have elements ready. @@ -1272,6 +1545,8 @@ Merge multiple sources. Picks elements randomly if all sources has elements read **completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) +--------------------------------------------------------------- + ### mergeSorted Merge multiple sources. Waits for one element to be ready from each input stream and emits the @@ -1283,6 +1558,8 @@ smallest element. **completes** when all upstreams complete +--------------------------------------------------------------- + ### mergePreferred Merge multiple sources. Prefer one source if all sources have elements ready. @@ -1293,6 +1570,8 @@ Merge multiple sources. Prefer one source if all sources has elements ready. **completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) +--------------------------------------------------------------- + ### zip Combines elements from each of multiple sources into *Pair*s and passes the pairs downstream. @@ -1303,6 +1582,8 @@ Combines elements from each of multiple sources into *Pair* s and passes the pai **completes** when any upstream completes +--------------------------------------------------------------- + ### zipWith Combines elements from multiple sources through a `combine` function and passes the @@ -1314,6 +1595,8 @@ returned value downstream. **completes** when any upstream completes +--------------------------------------------------------------- + ### zipWithIndex Zips elements of the current flow with their indices. @@ -1324,6 +1607,8 @@ Zips elements of current flow with its indices. **completes** when upstream completes +--------------------------------------------------------------- + ### concat After completion of the original upstream the elements of the given source will be emitted. @@ -1334,6 +1619,8 @@ After completion of the original upstream the elements of the given source will **completes** when all upstreams complete +--------------------------------------------------------------- + ### prepend Prepends the given source to the flow, consuming it until completion before the original source is consumed. @@ -1346,6 +1633,8 @@ If materialized values needs to be collected `prependMat` is available. **completes** when all upstreams complete +--------------------------------------------------------------- + ### orElse If the primary source completes without emitting any elements, the elements from the secondary source @@ -1364,6 +1653,8 @@ is available from the second stream **completes** when the primary stream completes after emitting at least one element, when the primary stream completes without emitting and the secondary stream has already completed, or when the secondary stream completes +--------------------------------------------------------------- + ### interleave Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one @@ -1375,11 +1666,17 @@ source completes the rest of the other stream will be emitted. **completes** when both upstreams have completed +--------------------------------------------------------------- + +
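A minimal Scala sketch of two of the fan-in behaviors described above (the letter and number sources are illustrative assumptions):

```scala
import akka.stream.scaladsl.Source

val letters = Source(List("a", "b", "c"))
val numbers = Source(1 to 3)

// zip: pair elements up; completes as soon as either input completes.
letters.zip(numbers).runForeach(println) // (a,1) (b,2) (c,3)

// concat: drain the first source completely before emitting the second.
letters.concat(Source(List("d", "e"))).runForeach(println) // a b c d e
```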
+ ## Fan-out stages These have one input and multiple outputs. They might route the elements between different outputs, or emit elements on multiple outputs at the same time. +--------------------------------------------------------------- + ### unzip Takes a stream of two-element tuples and unzips the two elements into two different downstreams. @@ -1390,6 +1687,8 @@ Takes a stream of two element tuples and unzips the two elements ino two differe **completes** when upstream completes +--------------------------------------------------------------- + ### unzipWith Splits each element of input into multiple downstreams using a function @@ -1400,6 +1699,8 @@ Splits each element of input into multiple downstreams using a function **completes** when upstream completes +--------------------------------------------------------------- + ### broadcast Emit each incoming element to each of `n` outputs. @@ -1410,6 +1711,8 @@ Emit each incoming element each of `n` outputs. **completes** when upstream completes +--------------------------------------------------------------- + ### balance Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer. @@ -1420,6 +1723,8 @@ Fan-out the stream to several streams. Each upstream element is emitted to the f **completes** when upstream completes +--------------------------------------------------------------- + ### partition Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the @@ -1431,8 +1736,14 @@ partitioner function applied to the element. **completes** when upstream completes and no output is pending +--------------------------------------------------------------- + +
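Fan-out stages are typically wired up with the graph DSL; a minimal Scala sketch broadcasting each element to two sinks (the sinks and the three-element source are assumptions):

```scala
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ Broadcast, GraphDSL, RunnableGraph, Sink, Source }

// Each incoming element is emitted to both outputs of the Broadcast.
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val bcast = b.add(Broadcast[Int](2))
  Source(1 to 3) ~> bcast.in
  bcast.out(0) ~> Sink.foreach[Int](i => println(s"left:  $i"))
  bcast.out(1) ~> Sink.foreach[Int](i => println(s"right: $i"))
  ClosedShape
}).run()
```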
+ ## Watching status stages +--------------------------------------------------------------- + ### watchTermination Materializes to a `CompletionStage` that will be completed with Done or failed depending on whether the upstream of the stage has been completed or failed. @@ -1444,6 +1755,8 @@ The stage otherwise passes through elements unchanged. **completes** when upstream completes +--------------------------------------------------------------- + ### monitor Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage. The stage otherwise @@ -1454,4 +1767,6 @@ event, and may therefore affect performance. **backpressures** when downstream **backpressures** -**completes** when upstream completes \ No newline at end of file +**completes** when upstream completes + +--------------------------------------------------------------- diff --git a/akka-docs/src/main/paradox/java/stream/stream-composition.md b/akka-docs/src/main/paradox/java/stream/stream-composition.md index 8c785f72a4..275df25a50 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-composition.md +++ b/akka-docs/src/main/paradox/java/stream/stream-composition.md @@ -11,12 +11,8 @@ be processed arrive and leave the stage. In this view, a `Source` is nothing els output port, or, a `BidiFlow` is a "box" with exactly two input and two output ports. In the figure below we illustrate the most commonly used stages viewed as "boxes". -| - ![compose_shapes.png](../../images/compose_shapes.png) -| - The *linear* stages are `Source`, `Sink` and `Flow`, as these can be used to compose strict chains of processing stages. Fan-in and Fan-out stages usually have multiple input or multiple output ports, therefore they allow building @@ -35,12 +31,8 @@ to interact with. One good example is the `Http` server component, which is enco The following figure demonstrates various composite stages that contain various other types of stages internally, hiding them behind a *shape* that looks like a `Source`, `Flow`, etc. -| - ![compose_composites.png](../../images/compose_composites.png) -| - One interesting example above is a `Flow` which is composed of a disconnected `Sink` and `Source`. This can be achieved by using the `fromSinkAndSource()` constructor method on `Flow` which takes the two parts as parameters. @@ -62,12 +54,8 @@ These mechanics allow arbitrary nesting of modules. For example the following fi that is built from a composite `Source` and a composite `Sink` (which in turn contains a composite `Flow`). -| - ![compose_nested_flow.png](../../images/compose_nested_flow.png) -| - The above diagram contains one more shape that we have not seen yet, which is called `RunnableGraph`. It turns out that if we wire all exposed ports together, so that no more open ports remain, we get a module that is *closed*. This is what the `RunnableGraph` class represents. This is the shape that a `Materializer` can take @@ -93,12 +81,8 @@ Once we have hidden the internals of our components, they act like any other bui we hide some of the internals of our composites, the result looks just as if any other predefined component had been used: -| - ![compose_nested_flow_opaque.png](../../images/compose_nested_flow_opaque.png) -| - If we look at usage of built-in components, and our custom components, there is no difference in usage as the code snippet below demonstrates. @@ -115,12 +99,8 @@ operate on are uniform across all DSLs and fit together nicely.
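Since the referenced snippet is not inlined here, the following is a small sketch, in the Scala DSL, of a composite that hides its internals behind a plain `Source` shape; the names and the doubling flow are illustrative assumptions:

```scala
import akka.stream.scaladsl.{ Flow, Sink, Source }

// A composite Source: an inner Source fused with a Flow, exposed as an
// ordinary Source. Consumers cannot tell it apart from a built-in one.
val nestedSource =
  Source(1 to 100)
    .via(Flow[Int].map(_ * 2))
    .named("nestedSource") // naming the module helps when debugging

// Used exactly like any predefined component:
nestedSource.runWith(Sink.foreach(println))
```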
As a first example, let's look at a more complex layout: -| - ![compose_graph.png](../../images/compose_graph.png) -| - The diagram shows a `RunnableGraph` (remember, if there are no unwired ports, the graph is closed, and therefore can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in, fan-out stages, directed and non-directed cycles. The `runnable()` method of the `GraphDSL` factory object allows the creation of a @@ -133,19 +113,13 @@ It is possible to refer to the ports, so another version might look like this: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph-alt } -| - Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL. The way to do it is to use the `create()` method on the `GraphDSL` factory. If we remove the sources and sinks from the previous example, what remains is a partial graph: -| - ![compose_graph_partial.png](../../images/compose_graph_partial.png) -| - We can recreate a similar graph in code, using the DSL in a similar way as before: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-graph } @@ -159,12 +133,8 @@ matching built-in ones. The resulting graph is already a properly wrapped module, so there is no need to call *named()* to encapsulate the graph, but it is a good practice to give names to modules to help debugging. -| - ![compose_graph_shape.png](../../images/compose_graph_shape.png) -| - Since our partial graph has the right shape, it can already be used in the simpler, linear DSL: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-use } @@ -175,12 +145,8 @@ has a `fromGraph()` method that just adds the DSL to a `FlowShape`. There are si For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods. To demonstrate this, we will create the following graph: -| - ![compose_graph_flow.png](../../images/compose_graph_flow.png) -| - The code version of the above closed graph might look like this: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-flow-dsl } @@ -236,12 +202,8 @@ graphically demonstrates what is happening. The propagation of the individual materialized values from the enclosed modules towards the top will look like this: -| - ![compose_mat.png](../../images/compose_mat.png) -| - To implement the above, first, we create a composite `Source`, where the enclosed `Source` has a materialized type of `CompletableFuture<Optional<Integer>>`. By using the combiner function `Keep.left()`, the resulting materialized type is that of the nested module (indicated by the color *red* on the diagram): @@ -297,11 +259,7 @@ the same attribute explicitly set. `nestedSource` gets the default attributes fr on the other hand has this attribute set, so it will be used by all nested modules. `nestedFlow` will inherit from `nestedSink` except the `map` stage which has again an explicitly provided attribute overriding the inherited one. -| - ![compose_attributes.png](../../images/compose_attributes.png) -| - This diagram illustrates the inheritance process for the example code (representing the materializer default attributes as the color *red*, the attributes set on `nestedSink` as *blue* and the attributes set on `nestedFlow` as *green*).
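As a sketch of the attribute inheritance just described (Scala DSL; the buffer sizes and names are assumptions):

```scala
import akka.stream.Attributes
import akka.stream.scaladsl.{ Flow, Keep, Sink }

// Attributes set on an enclosing module are inherited by its nested
// modules unless a nested module sets the same attribute itself.
val nestedSink =
  Flow[Int]
    .map(_ * 2)
    .toMat(Sink.head)(Keep.right)
    .withAttributes(
      Attributes.name("nestedSink") and
        Attributes.inputBuffer(initial = 4, max = 4))
```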
diff --git a/akka-docs/src/main/paradox/java/stream/stream-cookbook.md b/akka-docs/src/main/paradox/java/stream/stream-cookbook.md index b10c9ece41..ae3808c1a5 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-cookbook.md +++ b/akka-docs/src/main/paradox/java/stream/stream-cookbook.md @@ -156,7 +156,7 @@ Sometimes we want to map elements into multiple groups simultaneously. To achieve the desired result, we attack the problem in two steps: * first, using a function `topicMapper` that gives a list of topics (groups) a message belongs to, we transform our -stream of `Message` to a stream of :class:`Pair`` where for each topic the message belongs to a separate pair +stream of `Message` to a stream of `Pair` where, for each topic the message belongs to, a separate pair will be emitted. This is achieved by using `mapConcat` * Then we take this new stream of message topic pairs (containing a separate pair for each topic a given message belongs to) and feed it into groupBy, using the topic as the group key. @@ -374,4 +374,4 @@ but only if this does not interfere with normal traffic. There is a built-in operation that allows doing this directly: -@@snip [RecipeKeepAlive.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java) { #inject-keepalive } \ No newline at end of file +@@snip [RecipeKeepAlive.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java) { #inject-keepalive } diff --git a/akka-docs/src/main/paradox/java/stream/stream-customize.md b/akka-docs/src/main/paradox/java/stream/stream-customize.md index 271ca35c9a..78df1752b1 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-customize.md +++ b/akka-docs/src/main/paradox/java/stream/stream-customize.md @@ -92,12 +88,8 @@ the initial state while orange indicates the end state. If an operation is not l to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen in that state. -| - ![outport_transitions.png](../../images/outport_transitions.png) -| - The following operations are available for *input* ports: * `pull(in)` requests a new element from an input port. This is only possible after the port has been pushed by upstream. @@ -127,12 +123,8 @@ the initial state while orange indicates the end state. If an operation is not l to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen in that state. -| - ![inport_transitions.png](../../images/inport_transitions.png) -| - Finally, there are two methods available for convenience to complete the stage and all of its ports: * `completeStage()` is equivalent to closing all output ports and cancelling all input ports. @@ -144,7 +136,7 @@ of actions which will greatly simplify some use cases at the cost of some extra between the two APIs could be described as follows: the first one is signal driven from the outside, while this API is more active and drives its surroundings.
-The operations of this part of the :class:`GraphStage` API are: +The operations of this part of the `GraphStage` API are: * `emit(out, elem)` and `emitMultiple(out, Iterable(elem1, elem2))` replaces the `OutHandler` with a handler that emits one or more elements when there is demand, and then reinstalls the current handlers @@ -158,7 +150,7 @@ The following methods are safe to call after invoking `emit` and `read` (and wil operation when those are done): `complete(out)`, `completeStage()`, `emit`, `emitMultiple`, `abortEmitting()` and `abortReading()` -An example of how this API simplifies a stage can be found below in the second version of the :class:`Duplicator`. +An example of how this API simplifies a stage can be found below in the second version of the `Duplicator`. ### Custom linear processing stages using GraphStage @@ -169,20 +161,12 @@ Such a stage can be illustrated as a box with two flows as it is seen in the illustration below. Demand flowing upstream leading to elements flowing downstream. -| - ![graph_stage_conceptual.png](../../images/graph_stage_conceptual.png) -| - To illustrate these concepts we create a small `GraphStage` that implements the `map` transformation. -| - ![graph_stage_map.png](../../images/graph_stage_map.png) -| - Map calls `push(out)` from the `onPush()` handler and it also calls `pull()` from the `onPull` handler resulting in the conceptual wiring above, and fully expressed in code below: @@ -194,12 +178,8 @@ demand is passed along upstream elements passed on downstream. To demonstrate a many-to-one stage we will implement filter. The conceptual wiring of `Filter` looks like this: -| - ![graph_stage_filter.png](../../images/graph_stage_filter.png) -| - As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise we return the “ball” to our upstream so that we get the new element. This is achieved by modifying the map example by adding a conditional in the `onPush` handler and deciding between a `pull(in)` or `push(out)` call @@ -210,12 +190,8 @@ example by adding a conditional in the `onPush` handler and decide between a `pu To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this: -| - ![graph_stage_duplicate.png](../../images/graph_stage_duplicate.png) -| - This is a stage that has state: an option with the last element it has seen indicating whether it has duplicated this last element already or not. We must also make sure to emit the extra element if the upstream completes. @@ -238,12 +214,8 @@ reinstate the original handlers: Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually would correspond to the following structure: -| - ![graph_stage_chain.png](../../images/graph_stage_chain.png) -| - In code this is only a few lines, using `via` to use our custom stages in a stream: @@snip [GraphStageDocTest.java]($code$/java/jdocs/stream/GraphStageDocTest.java) { #graph-stage-chain } @@ -251,12 +223,8 @@ In code this is only a few lines, using the `via` use our custom stages in a str If we attempt to draw the sequence of events, it shows that there is one "event token" in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts.
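Because the snippets referenced above are not inlined in this diff, here is a minimal Scala sketch of the one-to-one (map-like) stage described earlier; the class and port names are assumptions:

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// One "event token" in circulation: each onPull is forwarded upstream as
// pull(in); each onPush grabs the element, applies f and pushes downstream.
class MapStage[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("MapStage.in")
  val out = Outlet[B]("MapStage.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, f(grab(in)))
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
```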
-| - ![graph_stage_tracks_1.png](../../images/graph_stage_tracks_1.png) -| - ### Completion Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit @@ -400,21 +368,13 @@ The next diagram illustrates the event sequence for a buffer with capacity of tw the downstream demand is slow to start and the buffer will fill up with upstream elements before any demand is seen from downstream. -| - ![graph_stage_detached_tracks_1.png](../../images/graph_stage_detached_tracks_1.png) -| - Another scenario would be where the demand from downstream starts coming in before any element is pushed into the buffer stage. -| - ![graph_stage_detached_tracks_2.png](../../images/graph_stage_detached_tracks_2.png) -| - The first difference we can notice is that our `Buffer` stage is automatically pulling its upstream on initialization. The buffer has demand for up to two elements without any downstream demand. @@ -452,4 +412,4 @@ Cleaning up resources should be done in `GraphStageLogic.postStop` and not in th callbacks. The reason for this is that when the stage itself completes or is failed there is no signal from the upstreams or the downstreams. Even for stages that do not complete or fail in this manner, this can happen when the `Materializer` is shutdown or the `ActorSystem` is terminated while a stream is still running, what is called an -"abrupt termination". \ No newline at end of file +"abrupt termination". diff --git a/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md b/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md index e06ef9b18e..658e94d395 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md +++ b/akka-docs/src/main/paradox/java/stream/stream-flows-and-basics.md @@ -246,12 +246,8 @@ work on the tasks in parallel. It is important to note that asynchronous boundar flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work by adding information to the flow graph that has been constructed up to this point: -| - ![asyncBoundary.png](../../images/asyncBoundary.png) -| - This means that everything that is inside the red bubble will be executed by one actor and everything outside of it by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all processing stages that have been added since them. @@ -303,4 +299,4 @@ been signalled already – thus the ordering in the case of zipping is defined b If you find yourself in need of fine grained control over order of emitted elements in fan-in scenarios consider using `MergePreferred` or `GraphStage` – which gives you full control over how the -merge is performed. \ No newline at end of file +merge is performed. diff --git a/akka-docs/src/main/paradox/java/stream/stream-parallelism.md b/akka-docs/src/main/paradox/java/stream/stream-parallelism.md index fd8d913f70..c3c058754b 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-parallelism.md +++ b/akka-docs/src/main/paradox/java/stream/stream-parallelism.md @@ -41,10 +41,12 @@ be able to operate at full throughput because they will wait on a previous or su pancake example frying the second half of the pancake is usually faster than frying the first half, `fryingPan2` will not be able to operate at full capacity [1]. -note:: -: Asynchronous stream processing stages have internal buffers to make communication between them more efficient. 
+@@@ note + +Asynchronous stream processing stages have internal buffers to make communication between them more efficient. For more details about the behavior of these and how to add additional buffers refer to @ref:[Buffers and working with rate](stream-rate.md). +@@@ ## Parallel processing @@ -102,4 +104,4 @@ at the entry point of the pipeline. This only matters however if the processing deviation. > [1] Roland's reason for this seemingly suboptimal procedure is that he prefers the temperature of the second pan -to be slightly lower than the first in order to achieve a more homogeneous result. \ No newline at end of file +to be slightly lower than the first in order to achieve a more homogeneous result. diff --git a/akka-docs/src/main/paradox/scala/actors.md b/akka-docs/src/main/paradox/scala/actors.md index b65a3cbb5c..3e095b23f2 100644 --- a/akka-docs/src/main/paradox/scala/actors.md +++ b/akka-docs/src/main/paradox/scala/actors.md @@ -437,7 +437,6 @@ result: It is always preferable to communicate with other Actors using their ActorRef instead of relying upon ActorSelection. Exceptions are -> * sending messages using the @ref:[At-Least-Once Delivery](persistence.md#at-least-once-delivery) facility * initiating first contact with a remote system @@ -491,12 +490,15 @@ An example demonstrating actor look-up is given in @ref:[Remoting Sample](remoti ## Messages and immutability -**IMPORTANT**: Messages can be any kind of object but have to be -immutable. Scala can’t enforce immutability (yet) so this has to be by -convention. Primitives like String, Int, Boolean are always immutable. Apart -from these the recommended approach is to use Scala case classes which are -immutable (if you don’t explicitly expose the state) and works great with -pattern matching at the receiver side. +@@@ warning { title=IMPORTANT } + +Messages can be any kind of object but have to be immutable. Scala can’t enforce +immutability (yet) so this has to be by convention. Primitives like String, Int, +Boolean are always immutable. Apart from these the recommended approach is to +use Scala case classes which are immutable (if you don’t explicitly expose the +state) and work great with pattern matching at the receiver side. + +@@@ Here is an example: @@ -581,11 +583,11 @@ taken from one of the following locations in order of precedence: 1. explicitly given timeout as in: -@@snip [ActorDocSpec.scala]($code$/scala/docs/actor/ActorDocSpec.scala) { #using-explicit-timeout } + @@snip [ActorDocSpec.scala]($code$/scala/docs/actor/ActorDocSpec.scala) { #using-explicit-timeout } 2. implicit argument of type `akka.util.Timeout`, e.g. -@@snip [ActorDocSpec.scala]($code$/scala/docs/actor/ActorDocSpec.scala) { #using-implicit-timeout } + @@snip [ActorDocSpec.scala]($code$/scala/docs/actor/ActorDocSpec.scala) { #using-implicit-timeout } See @ref:[Futures](futures.md) for more information on how to await or query a future. @@ -1053,4 +1055,4 @@ the potential issues is that messages might be lost when sent to remote actors. an uninitialized state might lead to the condition that it receives a user message before the initialization has been done.
-@@@ \ No newline at end of file +@@@ diff --git a/akka-docs/src/main/paradox/scala/additional/faq.md b/akka-docs/src/main/paradox/scala/additional/faq.md index f0301b279f..fa0410a1fe 100644 --- a/akka-docs/src/main/paradox/scala/additional/faq.md +++ b/akka-docs/src/main/paradox/scala/additional/faq.md @@ -115,11 +115,8 @@ akka.protocol://system@host:1234/user/my/actor/hierarchy/path Observe all the parts you need here: - * - `protocol` - is the protocol to be used to communicate with the remote system. - : Most of the cases this is *tcp*. - + * `protocol` is the protocol to be used to communicate with the remote system. + In most cases this is *tcp*. * `system` is the remote system’s name (must match exactly, case-sensitive!) * `host` is the remote system’s IP address or DNS name, and it must match that system’s configuration (i.e. *akka.remote.netty.tcp.hostname*) diff --git a/akka-docs/src/main/paradox/scala/agents.md b/akka-docs/src/main/paradox/scala/agents.md index 9c5b7b46bc..b00b4a6b87 100644 --- a/akka-docs/src/main/paradox/scala/agents.md +++ b/akka-docs/src/main/paradox/scala/agents.md @@ -2,9 +2,9 @@ Agents in Akka are inspired by [agents in Clojure](http://clojure.org/agents). -@@@ warning +@@@ warning { title="Deprecation warning" } -**Deprecation warning** - Agents have been deprecated and are scheduled for removal +Agents have been deprecated and are scheduled for removal in the next major version. We have found that their leaky abstraction (they do not work over the network) makes them inferior to pure Actors, and in the face of the upcoming inclusion of Akka Typed we see little value in maintaining the current Agents. @@ -120,4 +120,4 @@ that transaction. If you send to an Agent within a transaction then the dispatch to the Agent will be held until that transaction commits, and discarded if the transaction is aborted. Here's an example: -@@snip [AgentDocSpec.scala]($code$/scala/docs/agent/AgentDocSpec.scala) { #transfer-example } \ No newline at end of file +@@snip [AgentDocSpec.scala]($code$/scala/docs/agent/AgentDocSpec.scala) { #transfer-example } diff --git a/akka-docs/src/main/paradox/scala/cluster-client.md b/akka-docs/src/main/paradox/scala/cluster-client.md index f6acafefad..57016b0a1c 100644 --- a/akka-docs/src/main/paradox/scala/cluster-client.md +++ b/akka-docs/src/main/paradox/scala/cluster-client.md @@ -42,23 +42,23 @@ The `ClusterClientReceptionist` sends out notifications in relation to having re from a `ClusterClient`. This notification enables the server containing the receptionist to become aware of what clients are connected. -**1. ClusterClient.Send** +1. **ClusterClient.Send** -The message will be delivered to one recipient with a matching path, if any such -exists. If several entries match the path the message will be delivered -to one random destination. The sender() of the message can specify that local -affinity is preferred, i.e. the message is sent to an actor in the same local actor -system as the used receptionist actor, if any such exists, otherwise random to any other -matching entry. + The message will be delivered to one recipient with a matching path, if any such + exists. If several entries match the path the message will be delivered + to one random destination. The sender() of the message can specify that local + affinity is preferred, i.e. the message is sent to an actor in the same local actor + system as the used receptionist actor, if any such exists, otherwise random to any other + matching entry. -**2. ClusterClient.SendToAll** +2.
**ClusterClient.SendToAll** -The message will be delivered to all recipients with a matching path. + The message will be delivered to all recipients with a matching path. -**3. ClusterClient.Publish** +3. **ClusterClient.Publish** -The message will be delivered to all recipients Actors that have been registered as subscribers -to the named topic. + The message will be delivered to all recipient Actors that have been registered as subscribers + to the named topic. Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client, i.e. @@ -193,4 +193,4 @@ within a configurable interval. This is configured with the `reconnect-timeout`, This can be useful when initial contacts are provided from some kind of service registry, cluster node addresses are entirely dynamic and the entire cluster might shut down or crash, be restarted on new addresses. Since the client will be stopped in that case a monitoring actor can watch it and upon `Terminated` a new set of initial -contacts can be fetched and a new cluster client started. \ No newline at end of file +contacts can be fetched and a new cluster client started. diff --git a/akka-docs/src/main/paradox/scala/cluster-sharding.md b/akka-docs/src/main/paradox/scala/cluster-sharding.md index 99cfbf72ae..728b0dd4fb 100644 --- a/akka-docs/src/main/paradox/scala/cluster-sharding.md +++ b/akka-docs/src/main/paradox/scala/cluster-sharding.md @@ -57,7 +57,6 @@ identifier and the shard identifier from incoming messages. This example illustrates two different ways to define the entity identifier in the messages: -> * The `Get` message includes the identifier itself. * The `EntityEnvelope` holds the identifier, and the actual message that is sent to the entity actor is wrapped in the envelope. @@ -420,4 +419,4 @@ a `ShardRegion.ClusterShardingStats` containing the identifiers of the shards ru of entities that are alive in each shard. The purpose of these messages is testing and monitoring; they are not provided to give access to -directly sending messages to the individual entities. \ No newline at end of file +directly sending messages to the individual entities. diff --git a/akka-docs/src/main/paradox/scala/dispatchers.md b/akka-docs/src/main/paradox/scala/dispatchers.md index d22452f27e..cc1ba2a6b7 100644 --- a/akka-docs/src/main/paradox/scala/dispatchers.md +++ b/akka-docs/src/main/paradox/scala/dispatchers.md @@ -38,7 +38,6 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht Another example that uses the "thread-pool-executor": -> @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #fixed-pool-size-dispatcher-config } @@@ note @@ -75,33 +74,37 @@ where you'd use periods to denote sub-sections, like this: `"foo.bar.my-dispatch There are 3 different types of message dispatchers: - * Dispatcher - * This is an event-based dispatcher that binds a set of Actors to a thread pool. It is the default dispatcher -used if one is not specified. +* **Dispatcher** + + This is an event-based dispatcher that binds a set of Actors to a thread + pool. It is the default dispatcher used if one is not specified.
+ * Sharability: Unlimited * Mailboxes: Any, creates one per Actor * Use cases: Default dispatcher, Bulkheading - * - Driven by: - `java.util.concurrent.ExecutorService` - : specify using "executor" using "fork-join-executor", -"thread-pool-executor" or the FQCN of -an `akka.dispatcher.ExecutorServiceConfigurator` - - * PinnedDispatcher - * This dispatcher dedicates a unique thread for each actor using it; i.e. each actor will have its own thread pool with only one thread in the pool. + * Driven by: `java.util.concurrent.ExecutorService`. + Specify using "executor" one of "fork-join-executor", "thread-pool-executor" or the FQCN of + an `akka.dispatch.ExecutorServiceConfigurator`. + +* **PinnedDispatcher** + + This dispatcher dedicates a unique thread for each actor using it; i.e. + each actor will have its own thread pool with only one thread in the pool. + * Sharability: None * Mailboxes: Any, creates one per Actor * Use cases: Bulkheading - * - Driven by: Any - `akka.dispatch.ThreadPoolExecutorConfigurator` - : by default a "thread-pool-executor" + * Driven by: Any `akka.dispatch.ThreadPoolExecutorConfigurator`. + By default a "thread-pool-executor". - * CallingThreadDispatcher - * This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, -but it can be used from different threads concurrently for the same actor. See @ref:[Scala-CallingThreadDispatcher](testing.md#scala-callingthreaddispatcher) -for details and restrictions. +* **CallingThreadDispatcher** + + This dispatcher runs invocations on the current thread only. This + dispatcher does not create any new threads, but it can be used from + different threads concurrently for the same actor. + See @ref:[Scala-CallingThreadDispatcher](testing.md#scala-callingthreaddispatcher) + for details and restrictions. + * Sharability: Unlimited * Mailboxes: Any, creates one per Actor per Thread (on demand) * Use cases: Testing @@ -136,4 +139,4 @@ and that pool will have only one thread. Note that it's not guaranteed that the *same* thread is used over time, since the core pool timeout is used for `PinnedDispatcher` to keep resource usage down in case of idle actors. To use the same thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the -configuration of the `PinnedDispatcher`. \ No newline at end of file +configuration of the `PinnedDispatcher`. diff --git a/akka-docs/src/main/paradox/scala/fault-tolerance.md b/akka-docs/src/main/paradox/scala/fault-tolerance.md index a85478e3ca..ae0d49e9e6 100644 --- a/akka-docs/src/main/paradox/scala/fault-tolerance.md +++ b/akka-docs/src/main/paradox/scala/fault-tolerance.md @@ -16,7 +16,7 @@ Read the following source code. The inlined comments explain the different piece the fault handling and why they are added. It is also highly recommended to run this sample as it is easy to follow the log output to understand what is happening at runtime. -@@toc +@@toc { depth=1 } @@@ index @@ -164,4 +164,4 @@ different supervisor which overrides this behavior.
With this parent, the child survives the escalated restart, as demonstrated in the last test: -@@snip [FaultHandlingDocSpec.scala]($code$/scala/docs/actor/FaultHandlingDocSpec.scala) { #escalate-restart } \ No newline at end of file +@@snip [FaultHandlingDocSpec.scala]($code$/scala/docs/actor/FaultHandlingDocSpec.scala) { #escalate-restart } diff --git a/akka-docs/src/main/paradox/scala/fsm.md b/akka-docs/src/main/paradox/scala/fsm.md index 1f675f4b12..569de4e44a 100644 --- a/akka-docs/src/main/paradox/scala/fsm.md +++ b/akka-docs/src/main/paradox/scala/fsm.md @@ -7,13 +7,11 @@ is best described in the [Erlang design principles](http://www.erlang.org/docume An FSM can be described as a set of relations of the form: -> -**State(S) x Event(E) -> Actions (A), State(S')** +> **State(S) x Event(E) -> Actions (A), State(S')** These relations are interpreted as meaning: -> -*If we are in state S and the event E occurs, we should perform the actions A +> *If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S'.* ## A Simple Example @@ -50,7 +48,6 @@ The basic strategy is to declare the actor, mixing in the `FSM` trait and specifying the possible states and data values as type parameters. Within the body of the actor a DSL is used for declaring the state machine: -> * `startWith` defines the initial state and initial data * then there is one `when() { ... }` declaration per state to be handled (could potentially be multiple ones, the passed @@ -131,7 +128,6 @@ the FSM logic. The `FSM` trait takes two type parameters: -> 1. the supertype of all state names, usually a sealed trait with case objects extending it, 2. the type of the state data which are tracked by the `FSM` module @@ -150,8 +146,9 @@ internal state explicit in a few well-known places. A state is defined by one or more invocations of the method -> -`when([, stateTimeout = ])(stateFunction)`. +``` +when(<name>[, stateTimeout = <timeout>])(stateFunction) +``` The given name must be an object which is type-compatible with the first type parameter given to the `FSM` trait. This object is used as a hash key, @@ -194,8 +191,9 @@ it still needs to be declared like this: Each FSM needs a starting point, which is declared using -> -`startWith(state, data[, timeout])` +``` +startWith(state, data[, timeout]) +``` The optionally given timeout argument overrides any specification given for the desired initial state. If you want to cancel a default timeout, use @@ -275,8 +273,9 @@ Up to this point, the FSM DSL has been centered on states and events. The dual view is to describe it as a series of transitions. This is enabled by the method -> -`onTransition(handler)` +``` +onTransition(handler) +``` which associates actions with a transition instead of with a state and event. The handler is a partial function which takes a pair of states as input; no @@ -354,8 +353,9 @@ be used several times, e.g. when applying the same transformation to several Besides state timeouts, FSM manages timers identified by `String` names. You may set a timer using -> -`setTimer(name, msg, interval, repeat)` +``` +setTimer(name, msg, interval, repeat) +``` where `msg` is the message object which will be sent after the duration `interval` has elapsed. If `repeat` is `true`, then the timer is @@ -365,15 +365,17 @@ adding the new timer.
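As a sketch of the timer API just described (together with `cancelTimer`, covered next), here is a hypothetical FSM actor; its states, data and messages are invented for illustration:

```scala
import akka.actor.{ Actor, FSM }
import scala.concurrent.duration._

sealed trait State
case object Idle extends State
case object Active extends State
case object Tick

class Toggler extends Actor with FSM[State, Int] {
  startWith(Idle, 0)

  when(Idle) {
    case Event("start", counter) =>
      // repeat = true: Tick is re-sent every second until cancelled
      setTimer("tick", Tick, 1.second, repeat = true)
      goto(Active) using counter
  }

  when(Active) {
    case Event(Tick, counter) =>
      stay using (counter + 1)
    case Event("stop", counter) =>
      cancelTimer("tick")
      goto(Idle) using counter
  }

  initialize()
}
```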
Timers may be canceled using -> -`cancelTimer(name)` +``` +cancelTimer(name) +``` which is guaranteed to work immediately, meaning that the scheduled message will not be processed after this call even if the timer already fired and queued it. The status of any timer may be inquired with -> -`isTimerActive(name)` +``` +isTimerActive(name) +``` These named timers complement state timeouts because they are not affected by intervening reception of other messages. @@ -382,8 +384,9 @@ intervening reception of other messages. The FSM is stopped by specifying the result state as -> -`stop([reason[, data]])` +``` +stop([reason[, data]]) +``` The reason must be one of `Normal` (which is the default), `Shutdown` or `Failure(reason)`, and the second argument may be given to change the @@ -440,7 +443,6 @@ event trace by `LoggingFSM` instances: This FSM will log at DEBUG level: -> * all processed events, including `StateTimeout` and scheduled timer messages * every setting and cancellation of named timers @@ -478,4 +480,4 @@ zero. A bigger FSM example contrasted with Actor's `become`/`unbecome` can be downloaded as a ready to run @extref[Akka FSM sample](ecs:akka-samples-fsm-scala) together with a tutorial. The source code of this sample can be found in the -@extref[Akka Samples Repository](samples:akka-sample-fsm-scala). \ No newline at end of file +@extref[Akka Samples Repository](samples:akka-sample-fsm-scala). diff --git a/akka-docs/src/main/paradox/scala/general/actor-systems.md b/akka-docs/src/main/paradox/scala/general/actor-systems.md index b0da25f6b7..e43af69192 100644 --- a/akka-docs/src/main/paradox/scala/general/actor-systems.md +++ b/akka-docs/src/main/paradox/scala/general/actor-systems.md @@ -166,4 +166,4 @@ actor, which in turn will recursively stop all its child actors, the system guardian. If you want to execute some operations while terminating `ActorSystem`, -look at @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown). \ No newline at end of file +look at @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown). diff --git a/akka-docs/src/main/paradox/scala/general/configuration.md b/akka-docs/src/main/paradox/scala/general/configuration.md index ca7d9a3d7d..67949fe7b0 100644 --- a/akka-docs/src/main/paradox/scala/general/configuration.md +++ b/akka-docs/src/main/paradox/scala/general/configuration.md @@ -169,7 +169,7 @@ environment independent settings and then override some settings for specific en Specifying system property with `-Dconfig.resource=/dev.conf` will load the `dev.conf` file, which includes the `application.conf` -dev.conf: +### dev.conf ``` include "application" @@ -211,7 +211,8 @@ res1: java.lang.String = # String: 1 "b" : 12 } -}``` +} +``` The comments preceding every item give detailed information about the origin of the setting (file & line number) plus possible comments which were present, @@ -483,4 +484,4 @@ Each Akka module has a reference configuration file with the default values. 
### akka-distributed-data -@@snip [reference.conf]($akka$/akka-distributed-data/src/main/resources/reference.conf) \ No newline at end of file +@@snip [reference.conf]($akka$/akka-distributed-data/src/main/resources/reference.conf) diff --git a/akka-docs/src/main/paradox/scala/general/terminology.md b/akka-docs/src/main/paradox/scala/general/terminology.md index 9a16c816bc..46c8a3028c 100644 --- a/akka-docs/src/main/paradox/scala/general/terminology.md +++ b/akka-docs/src/main/paradox/scala/general/terminology.md @@ -108,4 +108,4 @@ is the only one trying, the operation will succeed. > * The Art of Multiprocessor Programming, M. Herlihy and N Shavit, 2008. ISBN 978-0123705914 - * Java Concurrency in Practice, B. Goetz, T. Peierls, J. Bloch, J. Bowbeer, D. Holmes and D. Lea, 2006. ISBN 978-0321349606 \ No newline at end of file + * Java Concurrency in Practice, B. Goetz, T. Peierls, J. Bloch, J. Bowbeer, D. Holmes and D. Lea, 2006. ISBN 978-0321349606 diff --git a/akka-docs/src/main/paradox/scala/guide/quickstart.md b/akka-docs/src/main/paradox/scala/guide/quickstart.md index c46482feb1..39088293ca 100644 --- a/akka-docs/src/main/paradox/scala/guide/quickstart.md +++ b/akka-docs/src/main/paradox/scala/guide/quickstart.md @@ -21,6 +21,6 @@ The easiest way is to use the @scala[`akka-scala-seed` in [Get started with Ligh 1. Unzip the zip file and rename the directory to your preference. -1. Read the @scala[[Guide](http://developer.lightbend.com/guides/hello-akka/)] @java[[Guide](http://developer.lightbend.com/guides/hello-akka/)] for this example project. It describes the example, the basic concepts of actors and how to run the "Hello World" application. ** **FIXME the Guide is in progress [here](https://github.com/akka/akka-scala-seed.g8/pull/4/files#diff-179702d743b88d85b3971cba561e6ace)**. +1. Read the @scala[[Guide](http://developer.lightbend.com/guides/hello-akka/)] @java[[Guide](http://developer.lightbend.com/guides/hello-akka/)] for this example project. It describes the example, the basic concepts of actors and how to run the "Hello World" application. **FIXME the Guide is in progress [here](https://github.com/akka/akka-scala-seed.g8/pull/4/files#diff-179702d743b88d85b3971cba561e6ace)**. After that you can go back here and you are ready to dive deeper. diff --git a/akka-docs/src/main/paradox/scala/guide/tutorial_3.md b/akka-docs/src/main/paradox/scala/guide/tutorial_3.md index 71071f50e0..00971f2996 100644 --- a/akka-docs/src/main/paradox/scala/guide/tutorial_3.md +++ b/akka-docs/src/main/paradox/scala/guide/tutorial_3.md @@ -100,9 +100,13 @@ message is preserved in the upper layers.* We will show you in the next section We also add a safeguard against requests that come with a mismatched group or device ID. This is how the resulting code looks: ->@scala[NOTE: We used a feature of scala pattern matching where we can match if a certain field equals to an expected +@@@ note { .group-scala } + +We used a feature of Scala pattern matching where we can match if a certain field equals an expected value. This is achieved by variables included in backticks, like `` `variable` ``, and it means that the pattern -only match if it contains the value of `variable` in that position.] +only matches if it contains the value of `variable` in that position.
+ +@@@ Scala : @@snip [Device.scala]($code$/scala/tutorial_3/Device.scala) { #device-with-register } @@ -113,11 +117,15 @@ Java We should not leave features untested, so we immediately write two new test cases, one exercising successful registration, the other testing the case when IDs don't match: -> NOTE: We used the `expectNoMsg()` helper method from @scala[`TestProbe`] @java[`TestKit`]. This assertion waits until the defined time-limit +@@@ note + +We used the `expectNoMsg()` helper method from @scala[`TestProbe`] @java[`TestKit`]. This assertion waits until the defined time-limit and fails if it receives any messages during this period. If no messages are received during the waiting period the assertion passes. It is usually a good idea to keep these timeouts low (but not too low) because they add significant test execution time otherwise. +@@@ + Scala : @@snip [DeviceSpec.scala]($code$/scala/tutorial_3/DeviceSpec.scala) { #device-registration-tests } diff --git a/akka-docs/src/main/paradox/scala/io-udp.md b/akka-docs/src/main/paradox/scala/io-udp.md index 7e75c7f5d8..b5697f874a 100644 --- a/akka-docs/src/main/paradox/scala/io-udp.md +++ b/akka-docs/src/main/paradox/scala/io-udp.md @@ -3,7 +3,6 @@ UDP is a connectionless datagram protocol which offers two different ways of communication on the JDK level: -> * sockets which are free to send datagrams to any destination and receive datagrams from any origin * sockets which are restricted to communication with one specific remote @@ -98,4 +97,4 @@ Another socket option will be needed to join a multicast group. Socket options must be provided to the `UdpMessage.Bind` message. -@@snip [ScalaUdpMulticast.scala]($code$/scala/docs/io/ScalaUdpMulticast.scala) { #bind } \ No newline at end of file +@@snip [ScalaUdpMulticast.scala]($code$/scala/docs/io/ScalaUdpMulticast.scala) { #bind } diff --git a/akka-docs/src/main/paradox/scala/logging.md b/akka-docs/src/main/paradox/scala/logging.md index 7aa75f1c4b..3aa21eb3b0 100644 --- a/akka-docs/src/main/paradox/scala/logging.md +++ b/akka-docs/src/main/paradox/scala/logging.md @@ -307,7 +307,6 @@ stdout logger is `WARNING` and it can be silenced completely by setting Akka provides a logger for [SLF4J](http://www.slf4j.org/). This module is available in the 'akka-slf4j.jar'. It has a single dependency: the slf4j-api jar. In your runtime, you also need a SLF4J backend. We recommend [Logback](http://logback.qos.ch/): -> ```scala libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" ``` @@ -543,4 +542,4 @@ shown below: ```scala val log = Logging(system.eventStream, "my.nice.string") -``` \ No newline at end of file +``` diff --git a/akka-docs/src/main/paradox/scala/persistence-schema-evolution.md b/akka-docs/src/main/paradox/scala/persistence-schema-evolution.md index 716d0d17ee..97086f253c 100644 --- a/akka-docs/src/main/paradox/scala/persistence-schema-evolution.md +++ b/akka-docs/src/main/paradox/scala/persistence-schema-evolution.md @@ -37,7 +37,7 @@ type to `PersistentActor` s and @ref:[persistence queries](persistence-query.md) instead of having to explicitly deal with different schemas.
In summary, schema evolution in event sourced systems exposes the following characteristics: -: + * Allow the system to continue operating without large scale migrations to be applied, * Allow the system to read "old" events from the underlying storage, however present them in a "new" view to the application logic, * Transparently promote events to the latest versions during recovery (or queries) such that the business logic need not consider multiple versions of events @@ -111,7 +111,7 @@ The below figure explains how the default serialization scheme works, and how it user provided message itself, which we will from here on refer to as the `payload` (highlighted in yellow): ![persistent-message-envelope.png](../images/persistent-message-envelope.png) -> + Akka Persistence provided serializers wrap the user payload in an envelope containing all persistence-relevant information. **If the Journal uses provided Protobuf serializers for the wrapper types (e.g. PersistentRepr), then the payload will be serialized using the user configured serializer, and if none is provided explicitly, Java serialization will be used for it.** @@ -234,7 +234,7 @@ add the overhead of having to maintain the schema. When using serializers like t (except renaming the field and method used during serialization) is needed to perform such evolution: ![persistence-serializer-rename.png](../images/persistence-serializer-rename.png) -> + This is how such a rename would look in protobuf: @@snip [PersistenceSchemaEvolutionDocSpec.scala]($code$/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala) { #protobuf-rename-proto } @@ -262,7 +262,7 @@ automatically by the serializer. You can do these kinds of "promotions" either m or using a library like [Stamina](https://github.com/scalapenos/stamina) which helps to create those `V1->V2->V3->...->Vn` promotion chains without much boilerplate. ![persistence-manual-rename.png](../images/persistence-manual-rename.png) -> + The following snippet showcases how one could apply renames if working with plain JSON (using `spray.json.JsObject`): @@snip [PersistenceSchemaEvolutionDocSpec.scala]($code$/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala) { #rename-plain-json } @@ -294,7 +294,7 @@ for the recovery mechanisms that this entails. For example, a naive way of filte being delivered to a recovering `PersistentActor` is pretty simple, as one can simply filter them out in an @ref:[EventAdapter](persistence.md#event-adapters): ![persistence-drop-event.png](../images/persistence-drop-event.png) -> + The `EventAdapter` can drop old events (**O**) by emitting an empty `EventSeq`. Other events can simply be passed through (**E**). @@ -309,7 +308,6 @@ In the just described technique we have saved the PersistentActor from receiving out in the `EventAdapter`, however the event itself was still deserialized and loaded into memory. This has two notable *downsides*: -> * first, that the deserialization was actually performed, so we spent some of our time budget on the deserialization, even though the event does not contribute anything to the persistent actor's state.
 * second, that we are *unable to remove the event class* from the system – since the serializer still needs to create
@@ -324,7 +323,7 @@ This can for example be implemented by using an `SerializerWithStringManifest`
that the type is no longer needed, and skip the deserialization altogether:

![persistence-drop-event-serializer.png](../images/persistence-drop-event-serializer.png)
->
+
The serializer is aware of the old event types that need to be skipped (**O**), and can skip deserializing them
altogether by simply returning a "tombstone" (**T**), which the EventAdapter converts into an empty EventSeq.
Other events (**E**) can simply be passed through.

@@ -358,7 +357,7 @@ classes which very often may be less user-friendly yet highly optimised for thro
these types in a 1:1 style as illustrated below:

![persistence-detach-models.png](../images/persistence-detach-models.png)
->
+
Domain events (**A**) are adapted to the data model events (**D**) by the `EventAdapter`.
The data model can be a format natively understood by the journal, such that it can store it more efficiently or
include additional data for the event (e.g. tags), for ease of later querying.

@@ -440,7 +439,7 @@ on what the user actually intended to change (instead of the composite `UserDeta
of our model).

![persistence-event-adapter-1-n.png](../images/persistence-event-adapter-1-n.png)
->
+
The `EventAdapter` splits the incoming event into smaller, more fine-grained events during recovery.

During recovery, however, we now need to convert the old `V1` model into the `V2` representation of the change.
@@ -450,4 +449,4 @@ and the address change is handled similarily:

@@snip [PersistenceSchemaEvolutionDocSpec.scala]($code$/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala) { #split-events-during-recovery }

By returning an `EventSeq` from the event adapter, the recovered event can be converted to multiple events before
-being delivered to the persistent actor. \ No newline at end of file
+being delivered to the persistent actor.
diff --git a/akka-docs/src/main/paradox/scala/remoting-artery.md b/akka-docs/src/main/paradox/scala/remoting-artery.md
index 9fbbc08d77..d515ba891f 100644
--- a/akka-docs/src/main/paradox/scala/remoting-artery.md
+++ b/akka-docs/src/main/paradox/scala/remoting-artery.md
@@ -701,10 +701,11 @@ Artery has been designed for low latency and as a result it can be CPU hungry wh
This is not always desirable.
It is possible to tune the tradeoff between CPU usage and latency with the following configuration:

->
+```
# Values can be from 1 to 10, where 10 strongly prefers low latency
# and 1 strongly prefers less CPU usage
akka.remote.artery.advanced.idle-cpu-level = 1
+```

Setting this value to a lower number tells Akka to use longer "sleeping" periods on the thread dedicated
to [spin-waiting](https://en.wikipedia.org/wiki/Busy_waiting), hence reducing CPU load when there is no
@@ -787,4 +788,4 @@ akka {
    }
  }
}
-``` \ No newline at end of file
+```
diff --git a/akka-docs/src/main/paradox/scala/remoting.md b/akka-docs/src/main/paradox/scala/remoting.md
index 4d44d121f9..3820786a50 100644
--- a/akka-docs/src/main/paradox/scala/remoting.md
+++ b/akka-docs/src/main/paradox/scala/remoting.md
@@ -658,4 +658,4 @@ Keep in mind that local.address will most likely be in one of private network ra

 * *172.16.0.0 - 172.31.255.255* (network class B)
 * *192.168.0.0 - 192.168.255.255* (network class C)

-For further details see [RFC 1597]([https://tools.ietf.org/html/rfc1597](https://tools.ietf.org/html/rfc1597)) and [RFC 1918]([https://tools.ietf.org/html/rfc1918](https://tools.ietf.org/html/rfc1918)). \ No newline at end of file
+For further details see [RFC 1597](https://tools.ietf.org/html/rfc1597) and [RFC 1918](https://tools.ietf.org/html/rfc1918).
diff --git a/akka-docs/src/main/paradox/scala/security/2017-02-10-java-serialization.md b/akka-docs/src/main/paradox/scala/security/2017-02-10-java-serialization.md
index 697c9a1f83..d91f2b134b 100644
--- a/akka-docs/src/main/paradox/scala/security/2017-02-10-java-serialization.md
+++ b/akka-docs/src/main/paradox/scala/security/2017-02-10-java-serialization.md
@@ -1,6 +1,6 @@
# Java Serialization, Fixed in Akka 2.4.17

-## Date
+### Date

10 February 2017

@@ -52,4 +52,4 @@ It will also be fixed in 2.5-M2 or 2.5.0-RC1.

### Acknowledgements

-We would like to thank Alvaro Munoz at Hewlett Packard Enterprise Security & Adrian Bravo at Workday for their thorough investigation and bringing this issue to our attention. \ No newline at end of file
+We would like to thank Alvaro Munoz at Hewlett Packard Enterprise Security & Adrian Bravo at Workday for their thorough investigation and for bringing this issue to our attention.
diff --git a/akka-docs/src/main/paradox/scala/security/index.md b/akka-docs/src/main/paradox/scala/security/index.md
index 3d3d2e6976..32fb931997 100644
--- a/akka-docs/src/main/paradox/scala/security/index.md
+++ b/akka-docs/src/main/paradox/scala/security/index.md
@@ -24,10 +24,10 @@ to ensure that a fix can be provided without delay.

## Fixed Security Vulnerabilities

-@@toc { depth=1 }
+@@toc { .list depth=1 }

@@@ index

* [2017-02-10-java-serialization](2017-02-10-java-serialization.md)

-@@@ \ No newline at end of file
+@@@
diff --git a/akka-docs/src/main/paradox/scala/stream/stages-overview.md b/akka-docs/src/main/paradox/scala/stream/stages-overview.md
index 764ee7a1fa..74ead74af2 100644
--- a/akka-docs/src/main/paradox/scala/stream/stages-overview.md
+++ b/akka-docs/src/main/paradox/scala/stream/stages-overview.md
@@ -1,9 +1,13 @@
# Overview of built-in stages and their semantics

+
+ ## Source stages These built-in sources are available from `akka.stream.scaladsl.Source`: +--------------------------------------------------------------- + ### fromIterator Stream the values from an `Iterator`, requesting the next value when there is demand. The iterator will be created anew @@ -15,6 +19,8 @@ If the iterator perform blocking operations, make sure to run it on a separate d **completes** when the iterator reaches its end +--------------------------------------------------------------- + ### apply Stream the values of an `immutable.Seq`. @@ -23,6 +29,8 @@ Stream the values of an `immutable.Seq`. **completes** when the last element of the seq has been emitted +--------------------------------------------------------------- + ### single Stream a single object @@ -31,6 +39,8 @@ Stream a single object **completes** when the single value has been emitted +--------------------------------------------------------------- + ### repeat Stream a single object repeatedly @@ -39,6 +49,8 @@ Stream a single object repeatedly **completes** never +--------------------------------------------------------------- + ### cycle Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning @@ -50,6 +62,8 @@ exception. **completes** never +--------------------------------------------------------------- + ### tick A periodical repetition of an arbitrary object. Delay of first tick is specified @@ -59,6 +73,8 @@ separately from interval of the following ticks. **completes** never +--------------------------------------------------------------- + ### fromFuture Send the single value of the `Future` when it completes and there is demand. @@ -68,6 +84,8 @@ If the future fails the stream is failed with that exception. **completes** after the future has completed +--------------------------------------------------------------- + ### fromCompletionStage Send the single value of the Java `CompletionStage` when it completes and there is demand. @@ -77,6 +95,8 @@ If the future fails the stream is failed with that exception. **completes** after the future has completed +--------------------------------------------------------------- + ### fromFutureSource Streams the elements of the given future source once it successfully completes. @@ -86,6 +106,8 @@ If the future fails the stream is failed. **completes** after the *future* source completes +--------------------------------------------------------------- + ### fromSourceCompletionStage Streams the elements of an asynchronous source once its given *completion* stage completes. @@ -95,6 +117,8 @@ If the *completion* fails the stream is failed with that exception. 
**completes** after the asynchronous source completes +--------------------------------------------------------------- + ### unfold Stream the result of a function as long as it returns a `Some`, the value inside the option @@ -107,6 +131,8 @@ Can be used to implement many stateful sources without having to touch the more **completes** when the unfold function returns an empty value +--------------------------------------------------------------- + ### unfoldAsync Just like `unfold` but the fold function returns a `Future` which will cause the source to @@ -118,6 +144,8 @@ Can be used to implement many stateful sources without having to touch the more **completes** when the future returned by the unfold function completes with an empty value +--------------------------------------------------------------- + ### empty Complete right away without ever emitting any elements. Useful when you have to provide a source to @@ -127,6 +155,8 @@ an API but there are no elements to emit. **completes** directly +--------------------------------------------------------------- + ### maybe Materialize a `Promise[Option[T]]` that if completed with a `Some[T]` will emit that *T* and then complete @@ -136,6 +166,8 @@ the stream, or if completed with `None` complete the stream right away. **completes** after emitting some value, or directly if the promise is completed with no value +--------------------------------------------------------------- + ### failed Fail directly with a user specified exception. @@ -144,7 +176,9 @@ Fail directly with a user specified exception. **completes** fails the stream directly with the given exception -#### lazily +--------------------------------------------------------------- + +### lazily Defers creation and materialization of a `Source` until there is demand. @@ -152,6 +186,8 @@ Defers creation and materialization of a `Source` until there is demand. **completes** depends on the wrapped `Source` +--------------------------------------------------------------- + ### actorPublisher Wrap an actor extending `ActorPublisher` as a source. @@ -160,6 +196,8 @@ Wrap an actor extending `ActorPublisher` as a source. **completes** when the actor stops +--------------------------------------------------------------- + ### actorRef Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain @@ -170,6 +208,8 @@ elements or failing the stream, the strategy is chosen by the user. **completes** when the actorref is sent `akka.actor.Status.Success` or `PoisonPill` +--------------------------------------------------------------- + ### combine Combine several sources, using a given strategy such as merge or concat, into one source. @@ -178,6 +218,8 @@ Combine several sources, using a given strategy such as merge or concat, into on **completes** when all sources has completed +--------------------------------------------------------------- + ### unfoldResource Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. @@ -186,6 +228,8 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa **completes** when read function returns `None` +--------------------------------------------------------------- + ### unfoldResourceAsync Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. 
@@ -195,6 +239,8 @@ Functions return `Future` to achieve asynchronous processing **completes** when `Future` from read function returns `None` +--------------------------------------------------------------- + ### queue Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. The queue contains @@ -206,14 +252,20 @@ a strategy specified by the user. Functionality for tracking when an element has **completes** when downstream completes +--------------------------------------------------------------- + ### asSubscriber Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`. +--------------------------------------------------------------- + ### fromPublisher Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`. +--------------------------------------------------------------- + ### zipN Combine the elements of multiple streams into a stream of sequences. @@ -222,6 +274,8 @@ Combine the elements of multiple streams into a stream of sequences. **completes** when any upstream completes +--------------------------------------------------------------- + ### zipWithN Combine the elements of multiple streams into a stream of sequences using a combiner function. @@ -230,10 +284,16 @@ Combine the elements of multiple streams into a stream of sequences using a comb **completes** when any upstream completes +--------------------------------------------------------------- + +
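+As a rough, illustrative sketch (not part of the snippets referenced above; the
+`SourceExamples` object and actor system name are made up for this example), two of
+the sources described in this section in action:
+
+```scala
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Source
+
+object SourceExamples extends App {
+  implicit val system = ActorSystem("sources")
+  implicit val materializer = ActorMaterializer()
+
+  // fromIterator: the iterator factory is invoked anew on every materialization
+  Source.fromIterator(() => Iterator.from(1)).take(3).runForeach(println)
+
+  // unfold: a stateful countdown source; completes when the function returns None
+  Source.unfold(5)(n => if (n == 0) None else Some((n - 1, n))).runForeach(println)
+  // (system.terminate() omitted for brevity)
+}
+```
+
+---------------------------------------------------------------
+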
+ ## Sink stages These built-in sinks are available from `akka.stream.scaladsl.Sink`: +--------------------------------------------------------------- + ### head Materializes into a `Future` which completes with the first value arriving, @@ -243,6 +303,8 @@ after this the stream is canceled. If no element is emitted, the future is be fa **backpressures** never +--------------------------------------------------------------- + ### headOption Materializes into a `Future[Option[T]]` which completes with the first value arriving wrapped in a `Some`, @@ -252,6 +314,8 @@ or a `None` if the stream completes without any elements emitted. **backpressures** never +--------------------------------------------------------------- + ### last Materializes into a `Future` which will complete with the last value emitted when the stream @@ -261,6 +325,8 @@ completes. If the stream completes with no elements the future is failed. **backpressures** never +--------------------------------------------------------------- + ### lastOption Materialize a `Future[Option[T]]` which completes with the last value @@ -271,6 +337,8 @@ completed with `None`. **backpressures** never +--------------------------------------------------------------- + ### ignore Consume all elements but discards them. Useful when a stream has to be consumed but there is no use to actually @@ -280,12 +348,16 @@ do anything with the elements. **backpressures** never +--------------------------------------------------------------- + ### cancelled Immediately cancel the stream **cancels** immediately +--------------------------------------------------------------- + ### seq Collect values emitted from the stream into a collection, the collection is available through a `Future` or @@ -294,6 +366,8 @@ if more element are emitted the sink will cancel the stream **cancels** If too many values are collected +--------------------------------------------------------------- + ### foreach Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure. @@ -307,6 +381,8 @@ Note that it is not safe to mutate state from the procedure. **backpressures** when the previous procedure invocation has not yet completed +--------------------------------------------------------------- + ### foreachParallel Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel. @@ -315,6 +391,8 @@ Like `foreach` but allows up to `parallellism` procedure calls to happen in para **backpressures** when the previous parallel procedure invocations has not yet completed +--------------------------------------------------------------- + ### onComplete Invoke a callback when the stream has completed or failed. @@ -323,6 +401,8 @@ Invoke a callback when the stream has completed or failed. **backpressures** never +--------------------------------------------------------------- + ### lazyInit Invoke sinkFactory function to create a real sink upon receiving the first element. Internal `Sink` will not be created if there are no elements, @@ -332,6 +412,8 @@ because of completion or error. *fallback* will be invoked if there was no eleme **backpressures** when initialized and when created sink backpressures +--------------------------------------------------------------- + ### queue Materialize a `SinkQueue` that can be pulled to trigger demand through the sink. The queue contains @@ -341,6 +423,8 @@ a buffer in case stream emitting elements faster than queue pulling them. 
**backpressures** when buffer has some space +--------------------------------------------------------------- + ### fold Fold over emitted element with a function, where each invocation will get the new element and the result from the @@ -355,6 +439,8 @@ between invocations. **backpressures** when the previous fold function invocation has not yet completed +--------------------------------------------------------------- + ### reduce Apply a reduction function on the incoming elements and pass the result to the next invocation. The first invocation @@ -366,6 +452,8 @@ Materializes into a future that will be completed by the last result of the redu **backpressures** when the previous reduction function invocation has not yet completed +--------------------------------------------------------------- + ### combine Combine several sinks into one using a user specified strategy @@ -374,6 +462,8 @@ Combine several sinks into one using a user specified strategy **backpressures** depends on the strategy +--------------------------------------------------------------- + ### actorRef Send the elements from the stream to an `ActorRef`. No backpressure so care must be taken to not overflow the inbox. @@ -382,6 +472,8 @@ Send the elements from the stream to an `ActorRef`. No backpressure so care must **backpressures** never +--------------------------------------------------------------- + ### actorRefWithAck Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, @@ -391,6 +483,8 @@ to provide back pressure onto the sink. **backpressures** when the actor acknowledgement has not arrived +--------------------------------------------------------------- + ### actorSubscriber Create an actor from a `Props` upon materialization, where the actor implements `ActorSubscriber`, which will @@ -402,20 +496,30 @@ Materializes into an `ActorRef` to the created actor. **backpressures** depends on the actor implementation +--------------------------------------------------------------- + ### asPublisher Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`. +--------------------------------------------------------------- + ### fromSubscriber Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink +--------------------------------------------------------------- + +
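+A small sketch of two of the most commonly used sinks above (assuming an implicit
+materializer is in scope; the value names are our own):
+
+```scala
+import scala.concurrent.Future
+import akka.stream.scaladsl.{ Sink, Source }
+
+// fold: materializes into a Future holding the final accumulated value
+val sum: Future[Int] = Source(1 to 100).runWith(Sink.fold[Int, Int](0)(_ + _))
+
+// head: completes with the first element and then cancels the rest of the stream
+val first: Future[Int] = Source(1 to 100).runWith(Sink.head)
+```
+
+---------------------------------------------------------------
+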
+ ## Additional Sink and Source converters Sources and sinks for integrating with `java.io.InputStream` and `java.io.OutputStream` can be found on `StreamConverters`. As they are blocking APIs the implementations of these stages are run on a separate dispatcher configured through the `akka.stream.blocking-io-dispatcher`. +--------------------------------------------------------------- + ### fromOutputStream Create a sink that wraps an `OutputStream`. Takes a function that produces an `OutputStream`, when the sink is @@ -430,6 +534,8 @@ to handle multiple invocations. The `OutputStream` will be closed when the stream that flows into the `Sink` is completed, and the `Sink` will cancel its inflow when the `OutputStream` is no longer writable. +--------------------------------------------------------------- + ### asInputStream Create a sink which materializes into an `InputStream` that can be read to trigger demand through the sink. @@ -438,6 +544,8 @@ Bytes emitted through the stream will be available for reading through the `Inpu The `InputStream` will be ended when the stream flowing into this `Sink` completes, and the closing the `InputStream` will cancel the inflow of this `Sink`. +--------------------------------------------------------------- + ### fromInputStream Create a source that wraps an `InputStream`. Takes a function that produces an `InputStream`, when the source is @@ -452,6 +560,8 @@ to handle multiple invocations. The `InputStream` will be closed when the `Source` is canceled from its downstream, and reaching the end of the `InputStream` will complete the `Source`. +--------------------------------------------------------------- + ### asOutputStream Create a source that materializes into an `OutputStream`. When bytes are written to the `OutputStream` they @@ -460,6 +570,8 @@ are emitted from the source. The `OutputStream` will no longer be writable when the `Source` has been canceled from its downstream, and closing the `OutputStream` will complete the `Source`. +--------------------------------------------------------------- + ### asJavaStream Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink. @@ -470,11 +582,15 @@ The Java 8 `Stream` will be ended when the stream flowing into this `Sink` compl Be aware that Java `Stream` blocks current thread while waiting on next element from downstream. +--------------------------------------------------------------- + ### fromJavaStream Create a source that wraps a Java 8 `Stream`. `Source` uses a stream iterator to get all its elements and send them downstream on demand. +--------------------------------------------------------------- + ### javaCollector Create a sink which materializes into a `Future` which will be completed with a result of the Java 8 `Collector` @@ -486,6 +602,8 @@ The `Collector` can also do reduction at the end. Reduction processing is perfor Note that a flow can be materialized multiple times, so the function producing the `Collector` must be able to handle multiple invocations. +--------------------------------------------------------------- + ### javaCollectorParallelUnordered Create a sink which materializes into a `Future` which will be completed with a result of the Java 8 `Collector` @@ -497,19 +615,31 @@ The `Collector` can also do reduction at the end. Reduction processing is perfor Note that a flow can be materialized multiple times, so the function producing the `Collector` must be able to handle multiple invocations. 
+--------------------------------------------------------------- + +
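+As a hedged sketch of `fromInputStream` (the input data is made up; assumes an
+implicit materializer in scope):
+
+```scala
+import java.io.ByteArrayInputStream
+import akka.stream.scaladsl.StreamConverters
+import akka.util.ByteString
+
+// The factory is invoked once per materialization and runs on the blocking-io dispatcher
+val source = StreamConverters.fromInputStream(() =>
+  new ByteArrayInputStream("some bytes".getBytes("UTF-8")))
+
+val all = source.runFold(ByteString.empty)(_ ++ _) // Future[ByteString]
+```
+
+---------------------------------------------------------------
+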
+ ## File IO Sinks and Sources Sources and sinks for reading and writing files can be found on `FileIO`. +--------------------------------------------------------------- + ### fromPath Emit the contents of a file, as `ByteString` s, materializes into a `Future` which will be completed with a `IOResult` upon reaching the end of the file or if there is a failure. +--------------------------------------------------------------- + ### toPath Create a sink which will write incoming `ByteString` s to a given file path. +--------------------------------------------------------------- + +
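+A minimal sketch combining the two (the file paths are placeholders; assumes an
+implicit materializer in scope):
+
+```scala
+import java.nio.file.Paths
+import scala.concurrent.Future
+import akka.stream.IOResult
+import akka.stream.scaladsl.FileIO
+
+// Stream the bytes of one file into another; materializes the write-side IOResult
+val copied: Future[IOResult] =
+  FileIO.fromPath(Paths.get("in.dat")).runWith(FileIO.toPath(Paths.get("out.dat")))
+```
+
+---------------------------------------------------------------
+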
+ ## Flow stages All flows by default backpressure if the computation they encapsulate is not fast enough to keep up with the rate of @@ -523,14 +653,18 @@ For in-band error handling of normal errors (dropping elements if a map fails fo supervision support, or explicitly wrap your element types in a proper container that can express error or success states (for example `Try` in Scala). +
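+For the latter approach, a brief sketch of carrying failures in-band with `Try`
+(the flow name is our own):
+
+```scala
+import scala.util.Try
+import akka.NotUsed
+import akka.stream.scaladsl.Flow
+
+// Parsing failures travel downstream as Failure elements instead of failing the stream
+val safeParse: Flow[String, Try[Int], NotUsed] =
+  Flow[String].map(s => Try(s.toInt))
+```
+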
+ ## Simple processing stages These stages can transform the rate of incoming elements since there are stages that emit multiple elements for a single input (e.g. `mapConcat') or consume multiple elements before emitting one output (e.g. `filter`). However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the -rate is affected. This is in contrast with [detached stages](#detached-stages-overview) which can change their processing behavior +rate is affected. This is in contrast with [detached stages](#backpressure-aware-stages) which can change their processing behavior depending on being backpressured by downstream or not. +--------------------------------------------------------------- + ### map Transform each element in the stream by calling a mapping function with it and passing the returned value downstream. @@ -541,6 +675,8 @@ Transform each element in the stream by calling a mapping function with it and p **completes** when upstream completes +--------------------------------------------------------------- + ### mapConcat Transform each element into zero or more elements that are individually passed downstream. @@ -551,6 +687,8 @@ Transform each element into zero or more elements that are individually passed d **completes** when upstream completes and all remaining elements has been emitted +--------------------------------------------------------------- + ### statefulMapConcat Transform each element into zero or more elements that are individually passed downstream. The difference to `mapConcat` is that @@ -562,6 +700,8 @@ the transformation function is created from a factory for every materialization **completes** when upstream completes and all remaining elements has been emitted +--------------------------------------------------------------- + ### filter Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if @@ -573,6 +713,8 @@ it returns false the element is discarded. **completes** when upstream completes +--------------------------------------------------------------- + ### filterNot Filter the incoming elements using a predicate. If the predicate returns false the element is passed downstream, if @@ -584,6 +726,8 @@ it returns true the element is discarded. **completes** when upstream completes +--------------------------------------------------------------- + ### collect Apply a partial function to each incoming element, if the partial function is defined for a value the returned @@ -595,6 +739,8 @@ value is passed downstream. Can often replace `filter` followed by `map` to achi **completes** when upstream completes +--------------------------------------------------------------- + ### grouped Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of @@ -606,6 +752,8 @@ elements downstream. **completes** when upstream completes +--------------------------------------------------------------- + ### sliding Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream. @@ -618,6 +766,8 @@ Note: the last window might be smaller than the requested size due to end of str **completes** when upstream completes +--------------------------------------------------------------- + ### scan Emit its current value which starts at `zero` and then applies the current and next value to the given function @@ -632,6 +782,8 @@ the second element is required from downstream. 
**completes** when upstream completes +--------------------------------------------------------------- + ### scanAsync Just like `scan` but receiving a function that results in a `Future` to the next value. @@ -642,6 +794,8 @@ Just like `scan` but receiving a function that results in a `Future` to the next **completes** when upstream completes and the last `Future` is resolved +--------------------------------------------------------------- + ### fold Start with current value `zero` and then apply the current and next value to the given function, when upstream @@ -653,6 +807,8 @@ complete the current value is emitted downstream. **completes** when upstream completes +--------------------------------------------------------------- + ### foldAsync Just like `fold` but receiving a function that results in a `Future` to the next value. @@ -663,6 +819,8 @@ Just like `fold` but receiving a function that results in a `Future` to the next **completes** when upstream completes and the last `Future` is resolved +--------------------------------------------------------------- + ### reduce Start with first element and then apply the current and next value to the given function, when upstream @@ -674,6 +832,8 @@ complete the current value is emitted downstream. Similar to `fold`. **completes** when upstream completes +--------------------------------------------------------------- + ### drop Drop `n` elements and then pass any subsequent element downstream. @@ -684,6 +844,8 @@ Drop `n` elements and then pass any subsequent element downstream. **completes** when upstream completes +--------------------------------------------------------------- + ### take Pass `n` incoming elements downstream and then complete @@ -694,6 +856,8 @@ Pass `n` incoming elements downstream and then complete **completes** when the defined number of elements has been taken or upstream completes +--------------------------------------------------------------- + ### takeWhile Pass elements downstream as long as a predicate function return true for the element include the element @@ -705,6 +869,8 @@ when the predicate first return false and then complete. **completes** when predicate returned false or upstream completes +--------------------------------------------------------------- + ### dropWhile Drop elements as long as a predicate function return true for the element @@ -715,6 +881,8 @@ Drop elements as long as a predicate function return true for the element **completes** when upstream completes +--------------------------------------------------------------- + ### recover Allow sending of one last element downstream when a failure has happened upstream. @@ -727,6 +895,8 @@ Throwing an exception inside `recover` _will_ be logged on ERROR level automatic **completes** when upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### recoverWith Allow switching to alternative Source when a failure has happened upstream. @@ -739,6 +909,8 @@ Throwing an exception inside `recoverWith` _will_ be logged on ERROR level autom **completes** upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### recoverWithRetries RecoverWithRetries allows to switch to alternative Source on flow failure. 
It will stay in effect after @@ -755,6 +927,8 @@ This stage can recover the failure signal, but not the skipped elements, which w **completes** when upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### mapError While similar to `recover` this stage can be used to transform an error signal to a different one *without* logging @@ -770,6 +944,8 @@ Similarily to `recover` throwing an exception inside `mapError` _will_ be logged **backpressures** when downstream backpressures **completes** when upstream completes or upstream failed with exception pf can handle +--------------------------------------------------------------- + ### detach Detach upstream demand from downstream demand without detaching the stream rates. @@ -780,6 +956,8 @@ Detach upstream demand from downstream demand without detaching the stream rates **completes** when upstream completes +--------------------------------------------------------------- + ### throttle Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where @@ -791,6 +969,8 @@ a function has to be provided to calculate the individual cost of each element. **completes** when upstream completes +--------------------------------------------------------------- + ### intersperse Intersperse stream with provided element similar to `List.mkString`. It can inject start and end marker elements to stream. @@ -801,6 +981,8 @@ Intersperse stream with provided element similar to `List.mkString`. It can inje **completes** when upstream completes +--------------------------------------------------------------- + ### limit Limit number of element from upstream to given `max` number. @@ -811,6 +993,8 @@ Limit number of element from upstream to given `max` number. **completes** when upstream completes and the number of emitted elements has not reached max +--------------------------------------------------------------- + ### limitWeighted Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. @@ -822,6 +1006,8 @@ Evaluated cost of each element defines how many elements will be allowed to trav **completes** when upstream completes and the number of emitted elements has not reached max +--------------------------------------------------------------- + ### log Log elements flowing through the stream as well as completion and erroring. By default element and @@ -834,6 +1020,8 @@ This can be changed by calling `Attributes.logLevels(...)` on the given Flow. **completes** when upstream completes +--------------------------------------------------------------- + ### recoverWithRetries Switch to alternative Source on flow failure. It stays in effect after a failure has been recovered up to `attempts` @@ -845,8 +1033,14 @@ number of times. Each time a failure is fed into the partial function and a new **completes** when upstream completes or upstream failed with exception provided partial function can handle +--------------------------------------------------------------- + +
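+To make one of the stages in this section concrete, a sketch of `throttle`
+(the value name is our own):
+
+```scala
+import scala.concurrent.duration._
+import akka.stream.ThrottleMode
+import akka.stream.scaladsl.Source
+
+// At most 10 elements per second, allowing short bursts of up to 15
+val throttled = Source(1 to 100)
+  .throttle(10, 1.second, 15, ThrottleMode.Shaping)
+```
+
+---------------------------------------------------------------
+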
+
+
## Flow stages composed of Sinks and Sources

+---------------------------------------------------------------
+
### Flow.fromSinkAndSource

Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sent to the `Sink`
@@ -856,6 +1050,8 @@ Note that termination events, like completion and cancelation is not automatical
of the such-composed Flow. Use `CoupledTerminationFlow` if you want to couple termination of both ends,
which is most useful, for example, when handling websocket connections.

+---------------------------------------------------------------
+
### CoupledTerminationFlow.fromSinkAndSource

Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
@@ -866,29 +1062,28 @@ Similar to `Flow.fromSinkAndSource` however couples the termination of these two

E.g. if the emitted `Flow` gets a cancellation, the `Source` is of course cancelled, but the `Sink` will also be completed.
The table below illustrates the effects in detail:

-+=================================================+=============================+=================================+
| Returned Flow                                   | Sink (in)                   | Source (out)                    |
-+=================================================+=============================+=================================+
+|-------------------------------------------------|-----------------------------|---------------------------------|
| cause: upstream (sink-side) receives completion | effect: receives completion | effect: receives cancel         |
-+-------------------------------------------------+-----------------------------+---------------------------------+
| cause: upstream (sink-side) receives error      | effect: receives error      | effect: receives cancel         |
-+-------------------------------------------------+-----------------------------+---------------------------------+
| cause: downstream (source-side) receives cancel | effect: completes           | effect: receives cancel         |
-+-------------------------------------------------+-----------------------------+---------------------------------+
| effect: cancels upstream, completes downstream  | effect: completes           | cause: signals complete         |
-+-------------------------------------------------+-----------------------------+---------------------------------+
| effect: cancels upstream, errors downstream     | effect: receives error      | cause: signals error or throws  |
-+-------------------------------------------------+-----------------------------+---------------------------------+
| effect: cancels upstream, completes downstream  | cause: cancels              | effect: receives cancel         |
-+=================================================+=============================+=================================+

The order in which the *in* and *out* sides receive their respective completion signals is not defined; do not rely on this ordering.

+---------------------------------------------------------------
+
+
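+A brief sketch of the uncoupled variant (the value name is our own):
+
+```scala
+import akka.stream.scaladsl.{ Flow, Sink, Source }
+
+// Input elements are swallowed by the sink; output comes from the source.
+// The two sides terminate independently - use CoupledTerminationFlow to couple them.
+val canned = Flow.fromSinkAndSource(Sink.ignore, Source.single("hello"))
+```
+
+---------------------------------------------------------------
+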
+ ## Asynchronous processing stages These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous operation at the same time (usually handling the completion of a Future). +--------------------------------------------------------------- + ### mapAsync Pass incoming elements to a function that return a `Future` result. When the future arrives the result is passed @@ -903,6 +1098,8 @@ If a Future fails, the stream also fails (unless a different supervision strateg **completes** when upstream completes and all futures has been completed and all elements has been emitted +--------------------------------------------------------------- + ### mapAsyncUnordered Like `mapAsync` but `Future` results are passed downstream as they arrive regardless of the order of the elements @@ -916,10 +1113,16 @@ If a Future fails, the stream also fails (unless a different supervision strateg **completes** upstream completes and all futures has been completed and all elements has been emitted +--------------------------------------------------------------- + +
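+A short sketch of `mapAsync` (assumes an `ActorSystem` named `system` is in scope,
+whose dispatcher we borrow as the execution context):
+
+```scala
+import scala.concurrent.Future
+import akka.stream.scaladsl.Source
+
+implicit val ec = system.dispatcher // any ExecutionContext will do
+
+// Up to 4 futures in flight; emission order still matches upstream order
+val doubled = Source(1 to 10).mapAsync(4)(i => Future(i * 2))
+```
+
+---------------------------------------------------------------
+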
+ ## Timer driven stages These stages process elements using timers, delaying, dropping or grouping elements for certain time durations. +--------------------------------------------------------------- + ### takeWithin Pass elements downstream within a timeout and then complete. @@ -930,6 +1133,8 @@ Pass elements downstream within a timeout and then complete. **completes** upstream completes or timer fires +--------------------------------------------------------------- + ### dropWithin Drop elements until a timeout has fired @@ -940,6 +1145,8 @@ Drop elements until a timeout has fired **completes** upstream completes +--------------------------------------------------------------- + ### groupedWithin Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, @@ -953,8 +1160,10 @@ but not if no elements has been grouped (i.e: no empty groups), or when limit ha **completes** when upstream completes -groupedWeightedWithin -^^^^^^^^^^^^^ +--------------------------------------------------------------- + +### groupedWeightedWithin + Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group. @@ -966,6 +1175,8 @@ but not if no elements has been grouped (i.e: no empty groups), or when weight l **completes** when upstream completes +--------------------------------------------------------------- + ### initialDelay Delay the initial element by a user specified duration from stream materialization. @@ -976,6 +1187,8 @@ Delay the initial element by a user specified duration from stream materializati **completes** when upstream completes +--------------------------------------------------------------- + ### delay Delay every element passed through with a specific duration. @@ -986,11 +1199,17 @@ Delay every element passed through with a specific duration. **completes** when upstream completes and buffered elements has been drained - + +--------------------------------------------------------------- + +
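+For example, a micro-batching sketch with `groupedWithin` (the value name is our own):
+
+```scala
+import scala.concurrent.duration._
+import akka.stream.scaladsl.Source
+
+// Batches of at most 100 elements, flushed at least once per second
+val batched = Source(1 to 1000).groupedWithin(100, 1.second)
+```
+
+---------------------------------------------------------------
+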
+ ## Backpressure aware stages These stages are aware of the backpressure provided by their downstreams and able to adapt their behavior to that signal. +--------------------------------------------------------------- + ### conflate Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as @@ -1003,6 +1222,8 @@ average of incoming numbers, if aggregation should lead to a different type `con **completes** when upstream completes +--------------------------------------------------------------- + ### conflateWithSeed Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there @@ -1015,6 +1236,8 @@ transform it to the summary type. **completes** when upstream completes +--------------------------------------------------------------- + ### batch Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there @@ -1033,6 +1256,8 @@ aggregated to the batched value. **completes** when upstream completes and a "possibly pending" element was drained +--------------------------------------------------------------- + ### batchWeighted Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there @@ -1049,6 +1274,8 @@ aggregated to the batched value. **completes** upstream completes and a "possibly pending" element was drained +--------------------------------------------------------------- + ### expand Allow for a faster downstream by expanding the last incoming element to an `Iterator`. For example @@ -1060,6 +1287,8 @@ Allow for a faster downstream by expanding the last incoming element to an `Iter **completes** when upstream completes +--------------------------------------------------------------- + ### buffer (Backpressure) Allow for a temporarily faster upstream events by buffering `size` elements. When the buffer is full backpressure @@ -1071,6 +1300,8 @@ is applied. **completes** when upstream completes and buffered elements has been drained +--------------------------------------------------------------- + ### buffer (Drop) Allow for a temporarily faster upstream events by buffering `size` elements. When the buffer is full elements are @@ -1087,6 +1318,8 @@ dropped according to the specified `OverflowStrategy`: **completes** upstream completes and buffered elements has been drained +--------------------------------------------------------------- + ### buffer (Fail) Allow for a temporarily faster upstream events by buffering `size` elements. When the buffer is full the stage fails @@ -1098,11 +1331,17 @@ the flow with a `BufferOverflowException`. **completes** when upstream completes and buffered elements has been drained +--------------------------------------------------------------- + +
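+Two one-line sketches from this section (value names are our own):
+
+```scala
+import akka.stream.OverflowStrategy
+import akka.stream.scaladsl.Source
+
+// conflate: while downstream is slower, collapse waiting elements into a running sum
+val summed = Source(1 to 1000).conflate(_ + _)
+
+// buffer (Drop): keep at most 64 elements, dropping the oldest when full
+val buffered = Source(1 to 1000).buffer(64, OverflowStrategy.dropHead)
+```
+
+---------------------------------------------------------------
+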
+ ## Nesting and flattening stages These stages either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains nested streams and turn them into a stream of elements instead (flattening). +--------------------------------------------------------------- + ### prefixAndTail Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) @@ -1114,6 +1353,8 @@ and returns a pair containing a strict sequence of the taken element and a strea **completes** when prefix elements has been consumed and substream has been consumed +--------------------------------------------------------------- + ### groupBy Demultiplex the incoming stream into separate output streams. @@ -1123,6 +1364,8 @@ there is an element pending for a group whose substream backpressures **completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +--------------------------------------------------------------- + ### splitWhen Split off elements into a new substream whenever a predicate function return `true`. @@ -1133,6 +1376,8 @@ Split off elements into a new substream whenever a predicate function return `tr **completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +--------------------------------------------------------------- + ### splitAfter End the current substream whenever a predicate returns `true`, starting a new substream for the next element. @@ -1143,6 +1388,8 @@ End the current substream whenever a predicate returns `true`, starting a new su **completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) +--------------------------------------------------------------- + ### flatMapConcat Transform each input element into a `Source` whose elements are then flattened into the output stream through @@ -1154,6 +1401,8 @@ concatenation. This means each source is fully consumed before consumption of th **completes** when upstream completes and all consumed substreams complete +--------------------------------------------------------------- + ### flatMapMerge Transform each input element into a `Source` whose elements are then flattened into the output stream through @@ -1165,10 +1414,16 @@ merging. The maximum number of merged sources has to be specified. **completes** when upstream completes and all consumed substreams complete +--------------------------------------------------------------- + +
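+A sketch of flattening with `flatMapConcat` (the value names are our own):
+
+```scala
+import akka.stream.scaladsl.Source
+
+// Each sentence becomes its own sub-source of words, consumed one after another
+val words = Source(List("a b", "c d"))
+  .flatMapConcat(sentence => Source(sentence.split(" ").toList))
+```
+
+---------------------------------------------------------------
+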
+ ## Time aware stages Those stages operate taking time into consideration. +--------------------------------------------------------------- + ### initialTimeout If the first element has not passed through this stage before the provided timeout, the stream is failed @@ -1182,6 +1437,8 @@ with a `TimeoutException`. **cancels** when downstream cancels +--------------------------------------------------------------- + ### completionTimeout If the completion of the stream does not happen until the provided timeout, the stream is failed @@ -1195,6 +1452,8 @@ with a `TimeoutException`. **cancels** when downstream cancels +--------------------------------------------------------------- + ### idleTimeout If the time between two processed elements exceeds the provided timeout, the stream is failed @@ -1209,6 +1468,8 @@ check is one period (equals to timeout value). **cancels** when downstream cancels +--------------------------------------------------------------- + ### backpressureTimeout If the time between the emission of an element and the following downstream demand exceeds the provided timeout, @@ -1223,6 +1484,8 @@ check is one period (equals to timeout value). **cancels** when downstream cancels +--------------------------------------------------------------- + ### keepAlive Injects additional (configured) elements if upstream does not emit for a configured amount of time. @@ -1235,6 +1498,8 @@ Injects additional (configured) elements if upstream does not emit for a configu **cancels** when downstream cancels +--------------------------------------------------------------- + ### initialDelay Delays the initial element by the specified duration. @@ -1247,11 +1512,17 @@ Delays the initial element by the specified duration. **cancels** when downstream cancels +--------------------------------------------------------------- + +
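+A sketch of `idleTimeout` guarding a tick source (the value name is our own):
+
+```scala
+import scala.concurrent.duration._
+import akka.stream.scaladsl.Source
+
+// Fail the stream with a TimeoutException if no element arrives for 5 seconds
+val guarded = Source.tick(1.second, 1.second, "tick").idleTimeout(5.seconds)
+```
+
+---------------------------------------------------------------
+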
+ ## Fan-in stages These stages take multiple streams as their input and provide a single output combining the elements from all of the inputs in different ways. +--------------------------------------------------------------- + ### merge Merge multiple sources. Picks elements randomly if all sources has elements ready. @@ -1262,6 +1533,8 @@ Merge multiple sources. Picks elements randomly if all sources has elements read **completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) +--------------------------------------------------------------- + ### mergeSorted Merge multiple sources. Waits for one element to be ready from each input stream and emits the @@ -1273,6 +1546,8 @@ smallest element. **completes** when all upstreams complete +--------------------------------------------------------------- + ### mergePreferred Merge multiple sources. Prefer one source if all sources has elements ready. @@ -1283,6 +1558,8 @@ Merge multiple sources. Prefer one source if all sources has elements ready. **completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) +--------------------------------------------------------------- + ### zip Combines elements from each of multiple sources into tuples and passes the tuples downstream. @@ -1293,6 +1570,8 @@ Combines elements from each of multiple sources into tuples and passes the tuple **completes** when any upstream completes +--------------------------------------------------------------- + ### zipWith Combines elements from multiple sources through a `combine` function and passes the @@ -1304,6 +1583,8 @@ returned value downstream. **completes** when any upstream completes +--------------------------------------------------------------- + ### zipWithIndex Zips elements of current flow with its indices. @@ -1314,6 +1595,8 @@ Zips elements of current flow with its indices. **completes** when upstream completes +--------------------------------------------------------------- + ### concat After completion of the original upstream the elements of the given source will be emitted. @@ -1324,6 +1607,8 @@ After completion of the original upstream the elements of the given source will **completes** when all upstreams complete +--------------------------------------------------------------- + ### ++ Just a shorthand for concat @@ -1334,6 +1619,8 @@ Just a shorthand for concat **completes** when all upstreams complete +--------------------------------------------------------------- + ### prepend Prepends the given source to the flow, consuming it until completion before the original source is consumed. @@ -1346,6 +1633,8 @@ If materialized values needs to be collected `prependMat` is available. **completes** when all upstreams complete +--------------------------------------------------------------- + ### orElse If the primary source completes without emitting any elements, the elements from the secondary source @@ -1364,6 +1653,8 @@ is available from the second stream **completes** the primary stream completes after emitting at least one element, when the primary stream completes without emitting and the secondary stream already has completed or when the secondary stream completes +--------------------------------------------------------------- + ### interleave Emits a specifiable number of elements from the original source, then from the provided source and repeats. 
If one @@ -1375,11 +1666,17 @@ source completes the rest of the other stream will be emitted. **completes** when both upstreams have completed +--------------------------------------------------------------- + +
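+One-line sketches of the most common fan-in stages (value names are our own):
+
+```scala
+import akka.stream.scaladsl.Source
+
+val a = Source(1 to 3)
+val b = Source(4 to 6)
+
+val merged = a.merge(b)  // elements picked as they become ready
+val zipped = a.zip(b)    // (1,4), (2,5), (3,6)
+val joined = a.concat(b) // 1, 2, 3, 4, 5, 6
+```
+
+---------------------------------------------------------------
+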
+ ## Fan-out stages These have one input and multiple outputs. They might route the elements between different outputs, or emit elements on multiple outputs at the same time. +--------------------------------------------------------------- + ### unzip Takes a stream of two element tuples and unzips the two elements ino two different downstreams. @@ -1390,6 +1687,8 @@ Takes a stream of two element tuples and unzips the two elements ino two differe **completes** when upstream completes +--------------------------------------------------------------- + ### unzipWith Splits each element of input into multiple downstreams using a function @@ -1400,6 +1699,8 @@ Splits each element of input into multiple downstreams using a function **completes** when upstream completes +--------------------------------------------------------------- + ### broadcast Emit each incoming element each of `n` outputs. @@ -1410,6 +1711,8 @@ Emit each incoming element each of `n` outputs. **completes** when upstream completes +--------------------------------------------------------------- + ### balance Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer. @@ -1420,6 +1723,8 @@ Fan-out the stream to several streams. Each upstream element is emitted to the f **completes** when upstream completes +--------------------------------------------------------------- + ### partition Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the @@ -1431,8 +1736,14 @@ partitioner function applied to the element. **completes** when upstream completes and no output is pending +--------------------------------------------------------------- + +
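+Since the fan-out junctions are typically wired up in the graph DSL, a sketch of
+`broadcast` feeding two sinks (value names are our own; running the graph requires
+an implicit materializer):
+
+```scala
+import akka.stream.ClosedShape
+import akka.stream.scaladsl.{ Broadcast, GraphDSL, RunnableGraph, Sink, Source }
+
+// Every element is emitted to both sinks
+val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+  import GraphDSL.Implicits._
+  val bcast = b.add(Broadcast[Int](2))
+  Source(1 to 3) ~> bcast.in
+  bcast.out(0) ~> Sink.foreach[Int](i => println(s"left: $i"))
+  bcast.out(1) ~> Sink.foreach[Int](i => println(s"right: $i"))
+  ClosedShape
+})
+```
+
+---------------------------------------------------------------
+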
+ ## Watching status stages +--------------------------------------------------------------- + ### watchTermination Materializes to a `Future` that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. @@ -1444,6 +1755,8 @@ The stage otherwise passes through elements unchanged. **completes** when upstream completes +--------------------------------------------------------------- + ### monitor Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage. The stage otherwise @@ -1454,4 +1767,7 @@ event, and may therefore affect performance. **backpressures** when downstream **backpressures** -**completes** when upstream completes \ No newline at end of file +**completes** when upstream completes + +--------------------------------------------------------------- + diff --git a/akka-docs/src/main/paradox/scala/stream/stream-composition.md b/akka-docs/src/main/paradox/scala/stream/stream-composition.md index 0f2d24e262..875b59336c 100644 --- a/akka-docs/src/main/paradox/scala/stream/stream-composition.md +++ b/akka-docs/src/main/paradox/scala/stream/stream-composition.md @@ -11,12 +11,8 @@ be processed arrive and leave the stage. In this view, a `Source` is nothing els output port, or, a `BidiFlow` is a "box" with exactly two input and two output ports. In the figure below we illustrate the most common used stages viewed as "boxes". -| - ![compose_shapes.png](../../images/compose_shapes.png) -| - The *linear* stages are `Source`, `Sink` and `Flow`, as these can be used to compose strict chains of processing stages. Fan-in and Fan-out stages have usually multiple input or multiple output ports, therefore they allow to build @@ -35,12 +31,8 @@ to interact with. One good example is the `Http` server component, which is enco The following figure demonstrates various composite stages, that contain various other type of stages internally, but hiding them behind a *shape* that looks like a `Source`, `Flow`, etc. -| - ![compose_composites.png](../../images/compose_composites.png) -| - One interesting example above is a `Flow` which is composed of a disconnected `Sink` and `Source`. This can be achieved by using the `fromSinkAndSource()` constructor method on `Flow` which takes the two parts as parameters. @@ -62,12 +54,8 @@ These mechanics allow arbitrary nesting of modules. For example the following fi that is built from a composite `Source` and a composite `Sink` (which in turn contains a composite `Flow`). -| - ![compose_nested_flow.png](../../images/compose_nested_flow.png) -| - The above diagram contains one more shape that we have not seen yet, which is called `RunnableGraph`. It turns out, that if we wire all exposed ports together, so that no more open ports remain, we get a module that is *closed*. This is what the `RunnableGraph` class represents. This is the shape that a `Materializer` can take @@ -93,12 +81,8 @@ Once we have hidden the internals of our components, they act like any other bui we hide some of the internals of our composites, the result looks just like if any other predefine component has been used: -| - ![compose_nested_flow_opaque.png](../../images/compose_nested_flow_opaque.png) -| - If we look at usage of built-in components, and our custom components, there is no difference in usage as the code snippet below demonstrates. @@ -115,12 +99,8 @@ operate on are uniform across all DSLs and fit together nicely. 
As a first example, let's look at a more complex layout: -| - ![compose_graph.png](../../images/compose_graph.png) -| - The diagram shows a `RunnableGraph` (remember, if there are no unwired ports, the graph is closed, and therefore can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in, fan-out stages, directed and non-directed cycles. The `runnable()` method of the `GraphDSL` object allows the creation of a @@ -134,19 +114,13 @@ explicitly, and it is not necessary to import our linear stages via `add()`, so @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #complex-graph-alt } -| - Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL. The way to do it is to use the `create()` factory method on `GraphDSL`. If we remove the sources and sinks from the previous example, what remains is a partial graph: -| - ![compose_graph_partial.png](../../images/compose_graph_partial.png) -| - We can recreate a similar graph in code, using the DSL in a similar way than before: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-graph } @@ -160,12 +134,8 @@ matching built-in ones. The resulting graph is already a properly wrapped module, so there is no need to call *named()* to encapsulate the graph, but it is a good practice to give names to modules to help debugging. -| - ![compose_graph_shape.png](../../images/compose_graph_shape.png) -| - Since our partial graph has the right shape, it can be already used in the simpler, linear DSL: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-use } @@ -176,12 +146,8 @@ has a `fromGraph()` method that just adds the DSL to a `FlowShape`. There are si For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods. To demonstrate this, we will create the following graph: -| - ![compose_graph_flow.png](../../images/compose_graph_flow.png) -| - The code version of the above closed graph might look like this: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-flow-dsl } @@ -237,12 +203,8 @@ graphically demonstrates what is happening. The propagation of the individual materialized values from the enclosed modules towards the top will look like this: -| - ![compose_mat.png](../../images/compose_mat.png) -| - To implement the above, first, we create a composite `Source`, where the enclosed `Source` have a materialized type of `Promise[[Option[Int]]`. By using the combiner function `Keep.left`, the resulting materialized type is of the nested module (indicated by the color *red* on the diagram): @@ -296,11 +258,7 @@ the same attribute explicitly set. `nestedSource` gets the default attributes fr on the other hand has this attribute set, so it will be used by all nested modules. `nestedFlow` will inherit from `nestedSink` except the `map` stage which has again an explicitly provided attribute overriding the inherited one. -| - ![compose_attributes.png](../../images/compose_attributes.png) -| - This diagram illustrates the inheritance process for the example code (representing the materializer default attributes -as the color *red*, the attributes set on `nestedSink` as *blue* and the attributes set on `nestedFlow` as *green*). 
diff --git a/akka-docs/src/main/paradox/scala/stream/stream-customize.md b/akka-docs/src/main/paradox/scala/stream/stream-customize.md
index fdc29c1841..32e4b360b8 100644
--- a/akka-docs/src/main/paradox/scala/stream/stream-customize.md
+++ b/akka-docs/src/main/paradox/scala/stream/stream-customize.md
@@ -95,12 +95,8 @@ the initial state while orange indicates the end state. If an operation is not l
to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen
in that state.

-|
-
![outport_transitions.png](../../images/outport_transitions.png)

-|
-
The following operations are available for *input* ports:

* `pull(in)` requests a new element from an input port. This is only possible after the port has been pushed by upstream.

@@ -130,12 +126,8 @@ the initial state while orange indicates the end state. If an operation is not l
to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen
in that state.

-|
-
![inport_transitions.png](../../images/inport_transitions.png)

-|
-
Finally, there are two methods available for convenience to complete the stage and all of its ports:

* `completeStage()` is equivalent to closing all output ports and cancelling all input ports.

@@ -147,7 +139,7 @@ of actions which will greatly simplify some use cases at the cost of some extra
between the two APIs could be described as: the first one is signal driven from the outside, while this API
is more active and drives its surroundings.

-The operations of this part of the :class:`GraphStage` API are:
+The operations of this part of the `GraphStage` API are:

* `emit(out, elem)` and `emitMultiple(out, Iterable(elem1, elem2))` replace the `OutHandler` with a handler that emits
one or more elements when there is demand, and then reinstalls the current handlers

@@ -161,7 +153,7 @@ The following methods are safe to call after invoking `emit` and `read` (and wil
operation when those are done): `complete(out)`, `completeStage()`, `emit`, `emitMultiple`, `abortEmitting()` and
`abortReading()`

-An example of how this API simplifies a stage can be found below in the second version of the :class:`Duplicator`.
+An example of how this API simplifies a stage can be found below in the second version of the `Duplicator`.

### Custom linear processing stages using GraphStage

@@ -172,20 +164,12 @@ Such a stage can be illustrated as a box with two flows as it is seen in the
illustration below: demand flowing upstream leading to elements flowing downstream.

-|
-
![graph_stage_conceptual.png](../../images/graph_stage_conceptual.png)

-|
-
To illustrate these concepts we create a small `GraphStage` that implements the `map` transformation.

-|
-
![graph_stage_map.png](../../images/graph_stage_map.png)

-|
-
Map calls `push(out)` from the `onPush()` handler and it also calls `pull()` from the `onPull` handler, resulting in the
conceptual wiring shown above, which is fully expressed in the code below:
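The referenced snippet itself is not part of this diff; as a stand-in, this is a minimal sketch of such a map stage against the `GraphStage` API described above (the class and port names are our own):

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// A one-to-one stage: every downstream pull is turned into an upstream
// pull, and every upstream push into a downstream push of f(elem).
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {

  val in = Inlet[A]("Map.in")
  val out = Outlet[B]("Map.out")

  override val shape = FlowShape(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, f(grab(in)))
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
```

There is exactly one `push` for every `pull`, which is what makes this a one-to-one stage: the single "event token" just bounces between the two handlers.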
@@ -197,12 +181,8 @@ demand is passed along upstream elements passed on downstream.

To demonstrate a many-to-one stage we will implement filter. The conceptual wiring of `Filter` looks like this:

-|
-
![graph_stage_filter.png](../../images/graph_stage_filter.png)

-|
-
As we see above, if the given predicate matches the current element we propagate it downwards, otherwise
we return the "ball" to our upstream so that we get a new element. This is achieved by modifying the map
example by adding a conditional in the `onPush` handler and deciding between a `pull(in)` or `push(out)` call

@@ -213,12 +193,8 @@ example by adding a conditional in the `onPush` handler and decide between a `pu

To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example
stage that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:

-|
-
![graph_stage_duplicate.png](../../images/graph_stage_duplicate.png)

-|
-
This is a stage that has state: an option with the last element it has seen, indicating whether it has already
duplicated this last element or not. We must also make sure to emit the extra element if the upstream completes.

@@ -241,12 +217,8 @@ reinstate the original handlers:

Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually
would correspond to the following structure:

-|
-
![graph_stage_chain.png](../../images/graph_stage_chain.png)

-|
-
In code this is only a few lines, using `via` to use our custom stages in a stream:

@@snip [GraphStageDocSpec.scala]($code$/scala/docs/stream/GraphStageDocSpec.scala) { #graph-stage-chain }

@@ -254,12 +226,8 @@ In code this is only a few lines, using the `via` use our custom stages in a str

If we attempt to draw the sequence of events, it shows that there is one "event token" in circulation in a
potential chain of stages, just like our conceptual "railroad tracks" representation predicts.

-|
-
![graph_stage_tracks_1.png](../../images/graph_stage_tracks_1.png)

-|
-
### Completion

Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit

@@ -403,21 +371,13 @@ The next diagram illustrates the event sequence for a buffer with capacity of tw
the downstream demand is slow to start and the buffer will fill up with
upstream elements before any demand is seen from downstream.

-|
-
![graph_stage_detached_tracks_1.png](../../images/graph_stage_detached_tracks_1.png)

-|
-
Another scenario would be where the demand from downstream starts coming in before any element is pushed
into the buffer stage.

-|
-
![graph_stage_detached_tracks_2.png](../../images/graph_stage_detached_tracks_2.png)

-|
-
The first difference we can notice is that our `Buffer` stage is automatically pulling its upstream on
initialization. The buffer has demand for up to two elements without any downstream demand.

@@ -487,4 +447,4 @@ shown in the linked sketch the author encountered such a density of compiler Sta
that he gave up). It is interesting to note that a simplified form of this problem has found its way into the
[dotty test suite](https://github.com/lampepfl/dotty/pull/1186/files).

-Dotty is the development version of Scala on its way to Scala 3.
\ No newline at end of file
+Dotty is the development version of Scala on its way to Scala 3.
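As a companion to the stages discussed above, here is a hedged sketch of the second, simplified `Duplicator` mentioned earlier: using `emitMultiple` from the event-driven API removes the explicit "pending element" state and the completion handling. This is our own rendering against the `GraphStage` API of this section, not the docs' exact snippet:

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// A one-to-many stage: each upstream element is emitted twice downstream.
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Duplicator.in")
  val out = Outlet[A]("Duplicator.out")

  override val shape = FlowShape(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          // emitMultiple temporarily replaces the OutHandler until both
          // copies have been pushed, then reinstates the original handler,
          // so no mutable state or onUpstreamFinish override is needed.
          emitMultiple(out, List(elem, elem))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
```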
diff --git a/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md b/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md
index 15b1f0f46e..1a9bae68db 100644
--- a/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md
+++ b/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md
@@ -250,12 +250,8 @@ work on the tasks in parallel. It is important to note that asynchronous boundar
flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always
work by adding information to the flow graph that has been constructed up to this point:

-|
-
![asyncBoundary.png](../../images/asyncBoundary.png)

-|
-
This means that everything that is inside the red bubble will be executed by one actor and everything outside
of it by another. This scheme can be applied successively, always having one such boundary enclose the previous
ones plus all processing stages that have been added since then.

@@ -308,4 +304,4 @@ been signalled already – thus the ordering in the case of zipping is defined b

If you find yourself in need of fine-grained control over the order of emitted elements in fan-in
scenarios consider using `MergePreferred` or `GraphStage` – which gives you full control over how the
-merge is performed.
\ No newline at end of file
+merge is performed.
diff --git a/akka-docs/src/main/paradox/scala/typed.md b/akka-docs/src/main/paradox/scala/typed.md
index ad0b82e0a0..6df60c0694 100644
--- a/akka-docs/src/main/paradox/scala/typed.md
+++ b/akka-docs/src/main/paradox/scala/typed.md
@@ -88,11 +88,15 @@ example we make a small detour to highlight some of the theory behind this.

## A Little Bit of Theory

-The [Actor Model](http://en.wikipedia.org/wiki/Actor_model1. send a finite number of messages to Actors it knows2. create a finite number of new Actors3. designate the behavior to be applied to the next message) as defined by Hewitt, Bishop and Steiger in 1973 is a
-computational model that expresses exactly what it means for computation to be
-distributed. The processing units—Actors—can only communicate by exchanging
-messages and upon reception of a message an Actor can do the following three
-fundamental actions:
+The [Actor Model](http://en.wikipedia.org/wiki/Actor_model) as defined by
+Hewitt, Bishop and Steiger in 1973 is a computational model that expresses
+exactly what it means for computation to be distributed. The processing
+units—Actors—can only communicate by exchanging messages and upon reception of a
+message an Actor can do the following three fundamental actions:
+
+1. send a finite number of messages to Actors it knows
+2. create a finite number of new Actors
+3. designate the behavior to be applied to the next message

The Akka Typed project expresses these actions using behaviors and addresses.
Messages can be sent to an address and behind this façade there is a behavior

@@ -284,4 +288,4 @@ having to worry about timeouts and spurious failures.

Another side-effect is that behaviors can nicely be composed and decorated,
see the `And`, `Or`, `Widened`, `ContextAware` combinators; nothing about
these is special or internal, new combinators can be written as external
-libraries or tailor-made for each project.
\ No newline at end of file
+libraries or tailor-made for each project.
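To close with an illustration of the third fundamental action, "designate the behavior to be applied to the next message", here is a minimal sketch assuming the experimental `akka.typed.ScalaDSL` of this era (the `Total` combinator and the `counting` name are our assumptions; the API was still in flux and details may differ):

```scala
import akka.typed._
import akka.typed.ScalaDSL._

object CountingSketch {
  // A behavior that handles one message and then designates the behavior
  // (with an updated counter) to be applied to the next message. There is
  // no mutable state: "changing state" means returning a new behavior.
  def counting(n: Int): Behavior[String] =
    Total { msg =>
      println(s"message #$n: $msg")
      counting(n + 1)
    }
}
```

Such a behavior could then serve as the guardian of a typed `ActorSystem`, and because it is an ordinary value it can be wrapped by combinators like `And`, `Or` or `Widened` without any special support from the library.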