diff --git a/akka-docs/src/main/paradox/java/stream/stream-integrations.md b/akka-docs/src/main/paradox/java/stream/stream-integrations.md deleted file mode 100644 index 86bacafbf0..0000000000 --- a/akka-docs/src/main/paradox/java/stream/stream-integrations.md +++ /dev/null @@ -1,514 +0,0 @@ -# Integration - -## Integrating with Actors - -For piping the elements of a stream as messages to an ordinary actor you can use -`ask` in a `mapAsync` or use `Sink.actorRefWithAck`. - -Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is -materialized by `Source.actorRef`. - -### mapAsync + ask - -A nice way to delegate some processing of elements in a stream to an actor is to -use `ask` in `mapAsync`. The back-pressure of the stream is maintained by -the `CompletionStage` of the `ask` and the mailbox of the actor will not be filled with -more messages than the given `parallelism` of the `mapAsync` stage. - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #mapAsync-ask } - -Note that the messages received in the actor will be in the same order as -the stream elements, i.e. the `parallelism` does not change the ordering -of the messages. There is a performance advantage of using parallelism > 1 -even though the actor will only process one message at a time because then there -is already a message in the mailbox when the actor has completed previous -message. - -The actor must reply to the `getSender()` for each message from the stream. That -reply will complete the `CompletionStage` of the `ask` and it will be the element that -is emitted downstreams from `mapAsync`. - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #ask-actor } - -The stream can be completed with failure by sending `akka.actor.Status.Failure` -as reply from the actor. - -If the `ask` fails due to timeout the stream will be completed with -`TimeoutException` failure. 
If that is not the desired outcome you can use `recover`
-on the `ask` `CompletionStage`.
-
-If you don't care about the reply values and only use them as back-pressure signals you
-can use `Sink.ignore` after the `mapAsync` stage; the actor is then effectively a sink
-of the stream.
-
-The same pattern can be used with @ref:[Actor routers](../routing.md). Then you
-can use `mapAsyncUnordered` for better efficiency if you don't care about the
-order of the emitted downstream elements (the replies).
-
-### Sink.actorRefWithAck
-
-The sink sends the elements of the stream to the given `ActorRef`, which sends back a back-pressure signal.
-The first element is always *onInitMessage*; the stream then waits for the given acknowledgement message
-from the actor, which signals that it is ready to process elements. The sink also requires the acknowledgement
-message after each stream element to make back-pressure work.
-
-If the target actor terminates, the stream will be cancelled. When the stream completes successfully the
-given `onCompleteMessage` will be sent to the destination actor. When the stream completes with
-failure an `akka.actor.Status.Failure` message will be sent to the destination actor.
-
-@@@ note
-
-Using `Sink.actorRef` or an ordinary `tell` from a `map` or `foreach` stage means that there is
-no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages
-fast enough its mailbox will grow, unless you use a bounded mailbox with zero
-*mailbox-push-timeout-time* or place a rate limiting stage in front. It's often better to
-use `Sink.actorRefWithAck` or `ask` in `mapAsync`, though.
-
-@@@
-
-### Source.queue
-
-`Source.queue` can be used for emitting elements to a stream from an actor (or from anything running outside
-the stream). The elements will be buffered until the stream can process them. You can `offer` elements to
-the queue and they will be emitted to the stream if there is demand from downstream; otherwise they will
-be buffered until demand is signalled.
-
-Use the overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping elements if the
-buffer is full.
-
-`SourceQueue.offer` returns a `CompletionStage` which completes with
-`QueueOfferResult.enqueued` if the element was added to the buffer or sent downstream. It completes with
-`QueueOfferResult.dropped` if the element was dropped. It can also complete with `QueueOfferResult.Failure`
-when the stream has failed, or `QueueOfferResult.QueueClosed` when downstream has completed.
-
-When used from an actor you typically `pipe` the result of the `CompletionStage` back to the actor to
-continue processing.
-
-### Source.actorRef
-
-Messages sent to the actor that is materialized by `Source.actorRef` will be emitted to the
-stream if there is demand from downstream; otherwise they will be buffered until demand is signalled.
-
-Depending on the defined `OverflowStrategy` it might drop elements if there is no space
-available in the buffer. The strategy `OverflowStrategy.backpressure` is not supported
-for this Source type, i.e. elements will be dropped if the buffer is filled by sending
-at a rate that is faster than the stream can consume. You should consider using `Source.queue`
-if you want a backpressured actor interface.
-
-The stream can be completed successfully by sending `akka.actor.PoisonPill` or
-`akka.actor.Status.Success` to the actor reference.
-
-The stream can be completed with failure by sending `akka.actor.Status.Failure` to the
-actor reference.
-
-The actor will be stopped when the stream is completed, failed or cancelled from downstream,
-i.e. you can watch it to get notified when that happens.
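The buffering and overflow behaviour described above can be sketched in plain Java. This is an illustrative analogy only, using a hypothetical `BoundedOfferQueue` class, not the Akka `SourceQueue` API:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative analogy only -- NOT the Akka SourceQueue API. A bounded buffer
// reports for each offered element whether it was enqueued or dropped, which
// is what SourceQueue.offer's QueueOfferResult communicates asynchronously.
class BoundedOfferQueue<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final int capacity;

    BoundedOfferQueue(int capacity) { this.capacity = capacity; }

    /** "enqueued" if the element fits in the buffer, "dropped" otherwise. */
    synchronized String offer(T element) {
        if (buffer.size() < capacity) {
            buffer.add(element);
            return "enqueued";
        }
        return "dropped"; // what a dropping OverflowStrategy would do
    }

    /** Models downstream demand: consume one buffered element (null if empty). */
    synchronized T poll() {
        return buffer.poll();
    }
}
```

With `OverflowStrategy.backpressure` the real `SourceQueue` would instead delay completing the offered `CompletionStage` until space is available, rather than dropping.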
- -## Integrating with External Services - -Stream transformations and side effects involving external non-stream based services can be -performed with `mapAsync` or `mapAsyncUnordered`. - -For example, sending emails to the authors of selected tweets using an external -email service: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-server-send } - -We start with the tweet stream of authors: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #tweet-authors } - -Assume that we can lookup their email address using: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup } - -Transforming the stream of authors to a stream of email addresses by using the `lookupEmail` -service can be done with `mapAsync`: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync } - -Finally, sending the emails: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #send-emails } - -`mapAsync` is applying the given function that is calling out to the external service to -each of the elements as they pass through this processing step. The function returns a `CompletionStage` -and the value of that future will be emitted downstreams. The number of Futures -that shall run in parallel is given as the first argument to `mapAsync`. -These Futures may complete in any order, but the elements that are emitted -downstream are in the same order as received from upstream. - -That means that back-pressure works as expected. For example if the `emailServer.send` -is the bottleneck it will limit the rate at which incoming tweets are retrieved and -email addresses looked up. - -The final piece of this pipeline is to generate the demand that pulls the tweet -authors information through the emailing pipeline: we attach a `Sink.ignore` -which makes it all run. 
If our email process returned some interesting data
-for further transformation we would of course not ignore it but send that
-result stream onwards for further processing or storage.
-
-Note that `mapAsync` preserves the order of the stream elements. In this example the order
-is not important, so we can use the more efficient `mapAsyncUnordered`:
-
-@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #external-service-mapAsyncUnordered }
-
-In the above example the services conveniently returned a `CompletionStage` of the result.
-If that is not the case you need to wrap the call in a `CompletionStage`. If the service call
-involves blocking you must also make sure that you run it on a dedicated execution context, to
-avoid starvation and disturbance of other tasks in the system.
-
-@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #blocking-mapAsync }
-
-The configuration of the `"blocking-dispatcher"` may look something like:
-
-@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #blocking-dispatcher-config }
-
-An alternative for blocking calls is to perform them in a `map` operation, still using a
-dedicated dispatcher for that operation.
-
-@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #blocking-map }
-
-However, that is not exactly the same as `mapAsync`, since the `mapAsync` may run
-several calls concurrently, but `map` performs them one at a time.
-
-For a service that is exposed as an actor, or if an actor is used as a gateway in front of an
-external service, you can use `ask`:
-
-@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #save-tweets }
-
-Note that if the `ask` is not completed within the given timeout the stream is completed with failure.
-If that is not the desired outcome you can use `recover`
-on the `ask` `CompletionStage`.
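The pattern of running a blocking call on a dedicated executor and exposing it as a `CompletionStage`, described above, can be sketched with plain `CompletableFuture`. Here `blockingLookup` is a made-up stand-in for any blocking service call; the wrapping is the point, not the service:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the recommended pattern: run a blocking call on a dedicated
// executor and expose it as a CompletionStage. "blockingLookup" is a
// hypothetical stand-in for any blocking service call.
class BlockingWrap {
    static final ExecutorService blockingDispatcher = Executors.newFixedThreadPool(4);

    static String blockingLookup(String author) {
        return author + "@somewhere.com"; // stands in for blocking I/O
    }

    static CompletionStage<String> lookupEmail(String author) {
        // the blocking work runs on the dedicated pool, not the stream's dispatcher
        return CompletableFuture.supplyAsync(() -> blockingLookup(author), blockingDispatcher);
    }
}
```

The resulting `CompletionStage` can then be returned from the `mapAsync` function, keeping the stream's own dispatcher free of blocking work.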
- -### Illustrating ordering and parallelism - -Let us look at another example to get a better understanding of the ordering -and parallelism characteristics of `mapAsync` and `mapAsyncUnordered`. - -Several `mapAsync` and `mapAsyncUnordered` futures may run concurrently. -The number of concurrent futures are limited by the downstream demand. -For example, if 5 elements have been requested by downstream there will be at most 5 -futures in progress. - -`mapAsync` emits the future results in the same order as the input elements -were received. That means that completed results are only emitted downstream -when earlier results have been completed and emitted. One slow call will thereby -delay the results of all successive calls, even though they are completed before -the slow call. - -`mapAsyncUnordered` emits the future results as soon as they are completed, i.e. -it is possible that the elements are not emitted downstream in the same order as -received from upstream. One slow call will thereby not delay the results of faster -successive calls as long as there is downstream demand of several elements. - -Here is a fictive service that we can use to illustrate these aspects. - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #sometimes-slow-service } - -Elements starting with a lower case character are simulated to take longer time -to process. 
- -Here is how we can use it with `mapAsync`: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #sometimes-slow-mapAsync } - -The output may look like this: - -``` -before: a -before: B -before: C -before: D -running: a (1) -running: B (2) -before: e -running: C (3) -before: F -running: D (4) -before: g -before: H -completed: C (3) -completed: B (2) -completed: D (1) -completed: a (0) -after: A -after: B -running: e (1) -after: C -after: D -running: F (2) -before: i -before: J -running: g (3) -running: H (4) -completed: H (2) -completed: F (3) -completed: e (1) -completed: g (0) -after: E -after: F -running: i (1) -after: G -after: H -running: J (2) -completed: J (1) -completed: i (0) -after: I -after: J -``` - -Note that `after` lines are in the same order as the `before` lines even -though elements are `completed` in a different order. For example `H` -is `completed` before `g`, but still emitted afterwards. - -The numbers in parenthesis illustrates how many calls that are in progress at -the same time. Here the downstream demand and thereby the number of concurrent -calls are limited by the buffer size (4) of the `ActorMaterializerSettings`. 
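The ordering behaviour illustrated above can be sketched in a few lines of plain Java. The `mapAsyncOrdered` helper below is hypothetical, not the Akka implementation: all futures are started up front and may complete in any order, but results are joined — and hence emitted — strictly in input order, which is why one slow early call delays faster later ones:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper, NOT the Akka implementation: it only illustrates why
// mapAsync preserves order even when the futures complete out of order.
class OrderedMapAsync {
    static <A, B> List<B> mapAsyncOrdered(List<A> in, Function<A, CompletableFuture<B>> f) {
        List<CompletableFuture<B>> futures = new ArrayList<>();
        for (A a : in) {
            futures.add(f.apply(a)); // start the calls concurrently
        }
        List<B> out = new ArrayList<>();
        for (CompletableFuture<B> fut : futures) {
            out.add(fut.join()); // but collect strictly in input order
        }
        return out;
    }
}
```

A `mapAsyncUnordered` analogue would instead collect each result as soon as its future completes, letting fast elements overtake slow ones.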
- -Here is how we can use the same service with `mapAsyncUnordered`: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #sometimes-slow-mapAsyncUnordered } - -The output may look like this: - -``` -before: a -before: B -before: C -before: D -running: a (1) -running: B (2) -before: e -running: C (3) -before: F -running: D (4) -before: g -before: H -completed: B (3) -completed: C (1) -completed: D (2) -after: B -after: D -running: e (2) -after: C -running: F (3) -before: i -before: J -completed: F (2) -after: F -running: g (3) -running: H (4) -completed: H (3) -after: H -completed: a (2) -after: A -running: i (3) -running: J (4) -completed: J (3) -after: J -completed: e (2) -after: E -completed: g (1) -after: G -completed: i (0) -after: I -``` - -Note that `after` lines are not in the same order as the `before` lines. For example -`H` overtakes the slow `G`. - -The numbers in parenthesis illustrates how many calls that are in progress at -the same time. Here the downstream demand and thereby the number of concurrent -calls are limited by the buffer size (4) of the `ActorMaterializerSettings`. - - -## Integrating with Reactive Streams - -[Reactive Streams](http://reactive-streams.org/) defines a standard for asynchronous stream processing with non-blocking -back pressure. It makes it possible to plug together stream libraries that adhere to the standard. -Akka Streams is one such library. - -An incomplete list of other implementations: - - * [Reactor (1.1+)](https://github.com/reactor/reactor) - * [RxJava](https://github.com/ReactiveX/RxJavaReactiveStreams) - * [Ratpack](http://www.ratpack.io/manual/current/streams.html) - * [Slick](http://slick.lightbend.com) - -The two most important interfaces in Reactive Streams are the `Publisher` and `Subscriber`. 
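Since JDK 9 the same two interfaces also ship with the JDK as `java.util.concurrent.Flow`, mirroring Reactive Streams. A minimal subscriber sketch of the demand handshake (plain JDK, independent of Akka):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;

// The Publisher/Subscriber contract via the JDK's java.util.concurrent.Flow
// interfaces (JDK 9+), which mirror Reactive Streams. This subscriber requests
// one element at a time -- the demand signal that makes back-pressure work.
class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // signal demand for the first element
    }
    @Override public void onNext(String item) {
        received.add(item);
        subscription.request(1); // one-at-a-time back-pressure
    }
    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }
}
```

The JDK's `SubmissionPublisher` can play the `Publisher` role against such a subscriber; Akka's own `Publisher`/`Subscriber` integration points discussed below use the `org.reactivestreams` interfaces instead.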
- -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #imports } - -Let us assume that a library provides a publisher of tweets: - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #tweets-publisher } - -and another library knows how to store author handles in a database: - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #author-storage-subscriber } - -Using an Akka Streams `Flow` we can transform the stream and connect those: - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #authors #connect-all } - -The `Publisher` is used as an input `Source` to the flow and the -`Subscriber` is used as an output `Sink`. - -A `Flow` can also be also converted to a `RunnableGraph[Processor[In, Out]]` which -materializes to a `Processor` when `run()` is called. `run()` itself can be called multiple -times, resulting in a new `Processor` instance each time. - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #flow-publisher-subscriber } - -A publisher can be connected to a subscriber with the `subscribe` method. - -It is also possible to expose a `Source` as a `Publisher` -by using the Publisher-`Sink`: - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #source-publisher } - -A publisher that is created with `Sink.asPublisher(AsPublisher.WITHOUT_FANOUT)` supports only a single subscription. -Additional subscription attempts will be rejected with an `IllegalStateException`. 
- -A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows: - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #author-alert-subscriber #author-storage-subscriber } - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #source-fanoutPublisher } - -The input buffer size of the stage controls how far apart the slowest subscriber can be from the fastest subscriber -before slowing down the stream. - -To make the picture complete, it is also possible to expose a `Sink` as a `Subscriber` -by using the Subscriber-`Source`: - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #sink-subscriber } - -It is also possible to use re-wrap `Processor` instances as a `Flow` by -passing a factory function that will create the `Processor` instances: - -@@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #use-processor } - -Please note that a factory is necessary to achieve reusability of the resulting `Flow`. - -### Implementing Reactive Streams Publisher or Subscriber - -As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and -any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you -implement Reactive Streams integrations with built-in stages or @ref:[custom stages](stream-customize.md). - -For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are -provided to support implementing Reactive Streams `Publisher` and `Subscriber` with -an `Actor`. - -These can be consumed by other Reactive Stream libraries or used as an Akka Streams `Source` or `Sink`. - -@@@ warning - -`ActorPublisher` and `ActorSubscriber` will probably be deprecated in future versions of Akka. 
- -@@@ - -@@@ warning - -`ActorPublisher` and `ActorSubscriber` cannot be used with remote actors, -because if signals of the Reactive Streams protocol (e.g. `request`) are lost the -the stream may deadlock. - -@@@ - -#### ActorPublisher - -@@@ warning - -**Deprecation warning:** `ActorPublisher` is deprecated in favour of the vastly more -type-safe and safe to implement `akka.stream.stage.GraphStage`. It can also -expose a "stage actor ref" is needed to be addressed as-if an Actor. -Custom stages implemented using `GraphStage` are also automatically fusable. - -To learn more about implementing custom stages using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage). - -@@@ - -Extend `akka.stream.actor.AbstractActorPublisher` to implement a -stream publisher that keeps track of the subscription life cycle and requested elements. - -Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: - -@@snip [ActorPublisherDocTest.java]($code$/java/jdocs/stream/ActorPublisherDocTest.java) { #job-manager } - -You send elements to the stream by calling `onNext`. You are allowed to send as many -elements as have been requested by the stream subscriber. This amount can be inquired with -`totalDemand`. It is only allowed to use `onNext` when `isActive` and `totalDemand>0`, -otherwise `onNext` will throw `IllegalStateException`. - -When the stream subscriber requests more elements the `ActorPublisherMessage.Request` message -is delivered to this actor, and you can act on that event. The `totalDemand` -is updated automatically. - -When the stream subscriber cancels the subscription the `ActorPublisherMessage.Cancel` message -is delivered to this actor. After that subsequent calls to `onNext` will be ignored. - -You can complete the stream by calling `onComplete`. After that you are not allowed to -call `onNext`, `onError` and `onComplete`. - -You can terminate the stream with failure by calling `onError`. 
After that you are not allowed to
-call `onNext`, `onError` or `onComplete`.
-
-If you suspect that this `AbstractActorPublisher` may never get subscribed to, you can override the `subscriptionTimeout`
-method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
-the timeout triggers via an `ActorPublisherMessage.SubscriptionTimeoutExceeded` message and MUST then perform
-cleanup and stop itself.
-
-If the actor is stopped the stream will be completed, unless it was already terminated with
-failure, completed or canceled.
-
-More detailed information can be found in the API documentation.
-
-This is how it can be used as input `Source` to a `Flow`:
-
-@@snip [ActorPublisherDocTest.java]($code$/java/jdocs/stream/ActorPublisherDocTest.java) { #actor-publisher-usage }
-
-You can only attach one subscriber to this publisher. Use a `Broadcast` element or
-attach a `Sink.asPublisher(AsPublisher.WITH_FANOUT)` to enable multiple subscribers.
-
-#### ActorSubscriber
-
-@@@ warning
-
-**Deprecation warning:** `ActorSubscriber` is deprecated in favour of the vastly more
-type-safe and easier to implement correctly `akka.stream.stage.GraphStage`. It can also
-expose a "stage actor ref" if it needs to be addressed as if it were an Actor.
-Custom stages implemented using `GraphStage` are also automatically fusable.
-
-To learn more about implementing custom stages using it refer to @ref:[Custom processing with GraphStage](../stream/stream-customize.md#graphstage).
-
-@@@
-
-Extend `akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with
-full control of stream back pressure. It will receive
-`ActorSubscriberMessage.OnNext`, `ActorSubscriberMessage.OnComplete` and `ActorSubscriberMessage.OnError`
-messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
-
-Here is an example of such an actor.
It dispatches incoming jobs to child worker actors: - -@@snip [ActorSubscriberDocTest.java]($code$/java/jdocs/stream/ActorSubscriberDocTest.java) { #worker-pool } - -Subclass must define the `RequestStrategy` to control stream back pressure. -After each incoming message the `AbstractActorSubscriber` will automatically invoke -the `RequestStrategy.requestDemand` and propagate the returned demand to the stream. - - * The provided `WatermarkRequestStrategy` is a good strategy if the actor performs work itself. - * The provided `MaxInFlightRequestStrategy` is useful if messages are queued internally or -delegated to other actors. - * You can also implement a custom `RequestStrategy` or call `request` manually together with -`ZeroRequestStrategy` or some other strategy. In that case -you must also call `request` when the actor is started or when it is ready, otherwise -it will not receive any elements. - -More detailed information can be found in the API documentation. - -This is how it can be used as output `Sink` to a `Flow`: - -@@snip [ActorSubscriberDocTest.java]($code$/java/jdocs/stream/ActorSubscriberDocTest.java) { #actor-subscriber-usage } diff --git a/akka-docs/src/main/paradox/java/stream/stream-integrations.md b/akka-docs/src/main/paradox/java/stream/stream-integrations.md new file mode 120000 index 0000000000..41eaaa72fb --- /dev/null +++ b/akka-docs/src/main/paradox/java/stream/stream-integrations.md @@ -0,0 +1 @@ +../../scala/stream/stream-integrations.md \ No newline at end of file diff --git a/akka-docs/src/main/paradox/scala/stream/stream-integrations.md b/akka-docs/src/main/paradox/scala/stream/stream-integrations.md index b66be89b09..6fda3f66fc 100644 --- a/akka-docs/src/main/paradox/scala/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/scala/stream/stream-integrations.md @@ -12,10 +12,14 @@ materialized by `Source.actorRef`. A nice way to delegate some processing of elements in a stream to an actor is to use `ask` in `mapAsync`. 
The back-pressure of the stream is maintained by -the `Future` of the `ask` and the mailbox of the actor will not be filled with +the @scala[`Future`]@java[`CompletionStage`] of the `ask` and the mailbox of the actor will not be filled with more messages than the given `parallelism` of the `mapAsync` stage. -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #mapAsync-ask } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #mapAsync-ask } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #mapAsync-ask } Note that the messages received in the actor will be in the same order as the stream elements, i.e. the `parallelism` does not change the ordering @@ -24,24 +28,28 @@ even though the actor will only process one message at a time because then there is already a message in the mailbox when the actor has completed previous message. -The actor must reply to the `sender()` for each message from the stream. That -reply will complete the `Future` of the `ask` and it will be the element that +The actor must reply to the @scala[`sender()`]@java[`getSender()`] for each message from the stream. That +reply will complete the @scala[`Future`]@java[`CompletionStage`] of the `ask` and it will be the element that is emitted downstreams from `mapAsync`. -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #ask-actor } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #ask-actor } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #ask-actor } The stream can be completed with failure by sending `akka.actor.Status.Failure` as reply from the actor. If the `ask` fails due to timeout the stream will be completed with `TimeoutException` failure. If that is not desired outcome you can use `recover` -on the `ask` `Future`. 
+on the `ask` @scala[`Future`]@java[`CompletionStage`]. If you don't care about the reply values and only use them as back-pressure signals you can use `Sink.ignore` after the `mapAsync` stage and then actor is effectively a sink of the stream. -The same pattern can be used with @ref:[Actor routers](../routing.md). Then you +The same pattern can be used with @ref[Actor routers](../routing.md). Then you can use `mapAsyncUnordered` for better efficiency if you don't care about the order of the emitted downstream elements (the replies). @@ -76,12 +84,12 @@ be buffered until request for demand is received. Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the buffer is full. -`SourceQueue.offer` returns `Future[QueueOfferResult]` which completes with `QueueOfferResult.Enqueued` +`SourceQueue.offer` returns @scala[`Future[QueueOfferResult]`]@java[`CompletionStage`] which completes with `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. -When used from an actor you typically `pipe` the result of the `Future` back to the actor to +When used from an actor you typically `pipe` the result of the @scala[`Future`]@java[`CompletionStage`] back to the actor to continue processing. ### Source.actorRef @@ -113,27 +121,47 @@ performed with `mapAsync` or `mapAsyncUnordered`. 
For example, sending emails to the authors of selected tweets using an external email service: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-server-send } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-server-send } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-server-send } We start with the tweet stream of authors: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #tweet-authors } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #tweet-authors} + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #tweet-authors } Assume that we can lookup their email address using: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-address-lookup } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-address-lookup } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup } Transforming the stream of authors to a stream of email addresses by using the `lookupEmail` service can be done with `mapAsync`: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-addresses-mapAsync } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-addresses-mapAsync } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync } Finally, sending the emails: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #send-emails } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #send-emails } + +Java +: @@snip 
[IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #send-emails } `mapAsync` is applying the given function that is calling out to the external service to -each of the elements as they pass through this processing step. The function returns a `Future` +each of the elements as they pass through this processing step. The function returns a @scala[`Future`]@java[`CompletionStage`] and the value of that future will be emitted downstreams. The number of Futures that shall run in parallel is given as the first argument to `mapAsync`. These Futures may complete in any order, but the elements that are emitted @@ -152,14 +180,22 @@ result stream onwards for further processing or storage. Note that `mapAsync` preserves the order of the stream elements. In this example the order is not important and then we can use the more efficient `mapAsyncUnordered`: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #external-service-mapAsyncUnordered } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #external-service-mapAsyncUnordered } -In the above example the services conveniently returned a `Future` of the result. -If that is not the case you need to wrap the call in a `Future`. If the service call +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #external-service-mapAsyncUnordered } + +In the above example the services conveniently returned a @scala[`Future`]@java[`CompletionStage`] of the result. +If that is not the case you need to wrap the call in a @scala[`Future`]@java[`CompletionStage`]. If the service call involves blocking you must also make sure that you run it on a dedicated execution context, to avoid starvation and disturbance of other tasks in the system. 
-@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #blocking-mapAsync } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #blocking-mapAsync } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #blocking-mapAsync } The configuration of the `"blocking-dispatcher"` may look something like: @@ -168,7 +204,11 @@ The configuration of the `"blocking-dispatcher"` may look something like: An alternative for blocking calls is to perform them in a `map` operation, still using a dedicated dispatcher for that operation. -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #blocking-map } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #blocking-map } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #blocking-map } However, that is not exactly the same as `mapAsync`, since the `mapAsync` may run several calls concurrently, but `map` performs them one at a time. @@ -176,10 +216,14 @@ several calls concurrently, but `map` performs them one at a time. For a service that is exposed as an actor, or if an actor is used as a gateway in front of an external service, you can use `ask`: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #save-tweets } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #save-tweets } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #save-tweets } Note that if the `ask` is not completed within the given timeout the stream is completed with failure. -If that is not desired outcome you can use `recover` on the `ask` `Future`. +If that is not desired outcome you can use `recover` on the `ask` @scala[`Future`]@java[`CompletionStage`]. 
### Illustrating ordering and parallelism @@ -204,14 +248,22 @@ successive calls as long as there is downstream demand of several elements. Here is a fictive service that we can use to illustrate these aspects. -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #sometimes-slow-service } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #sometimes-slow-service } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #sometimes-slow-service } Elements starting with a lower case character are simulated to take longer time to process. Here is how we can use it with `mapAsync`: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #sometimes-slow-mapAsync } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #sometimes-slow-mapAsync } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #sometimes-slow-mapAsync } The output may look like this: @@ -268,7 +320,11 @@ calls are limited by the buffer size (4) of the `ActorMaterializerSettings`. Here is how we can use the same service with `mapAsyncUnordered`: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #sometimes-slow-mapAsyncUnordered } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #sometimes-slow-mapAsyncUnordered } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #sometimes-slow-mapAsyncUnordered } The output may look like this: @@ -338,19 +394,35 @@ An incomplete list of other implementations: The two most important interfaces in Reactive Streams are the `Publisher` and `Subscriber`. 
-@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #imports } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #imports } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #imports } Let us assume that a library provides a publisher of tweets: -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #tweets-publisher } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #tweets-publisher } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #tweets-publisher } and another library knows how to store author handles in a database: -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #author-storage-subscriber } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #author-storage-subscriber } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #author-storage-subscriber } Using an Akka Streams `Flow` we can transform the stream and connect those: -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #authors #connect-all } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #authors #connect-all } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #authors #connect-all } The `Publisher` is used as an input `Source` to the flow and the `Subscriber` is used as an output `Sink`. @@ -359,23 +431,40 @@ A `Flow` can also be also converted to a `RunnableGraph[Processor[In, Out]]` whi materializes to a `Processor` when `run()` is called. 
`run()` itself can be called multiple times, resulting in a new `Processor` instance each time. -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #flow-publisher-subscriber } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #flow-publisher-subscriber } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #flow-publisher-subscriber } A publisher can be connected to a subscriber with the `subscribe` method. It is also possible to expose a `Source` as a `Publisher` by using the Publisher-`Sink`: -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #source-publisher } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #source-publisher } -A publisher that is created with `Sink.asPublisher(fanout = false)` supports only a single subscription. +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #source-publisher } + +A publisher that is created with @scala[`Sink.asPublisher(fanout = false)`]@java[`Sink.asPublisher(AsPublisher.WITHOUT_FANOUT)`] supports only a single subscription. Additional subscription attempts will be rejected with an `IllegalStateException`. 
A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows: -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #author-alert-subscriber #author-storage-subscriber } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #author-alert-subscriber #author-storage-subscriber } -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #source-fanoutPublisher } +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #author-alert-subscriber #author-storage-subscriber } + + +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #source-fanoutPublisher } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #source-fanoutPublisher } The input buffer size of the stage controls how far apart the slowest subscriber can be from the fastest subscriber before slowing down the stream. @@ -383,12 +472,20 @@ before slowing down the stream. 
To make the picture complete, it is also possible to expose a `Sink` as a `Subscriber` by using the Subscriber-`Source`: -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #sink-subscriber } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #sink-subscriber } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #sink-subscriber } It is also possible to use re-wrap `Processor` instances as a `Flow` by passing a factory function that will create the `Processor` instances: -@@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #use-processor } +Scala +: @@snip [ReactiveStreamsDocSpec.scala]($code$/scala/docs/stream/ReactiveStreamsDocSpec.scala) { #use-processor } + +Java +: @@snip [ReactiveStreamsDocTest.java]($code$/java/jdocs/stream/ReactiveStreamsDocTest.java) { #use-processor } Please note that a factory is necessary to achieve reusability of the resulting `Flow`. @@ -396,7 +493,7 @@ Please note that a factory is necessary to achieve reusability of the resulting As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you -implement Reactive Streams integrations with built-in stages or @ref:[custom stages](stream-customize.md). +implement Reactive Streams integrations with built-in stages or @ref[custom stages](stream-customize.md). For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are provided to support implementing Reactive Streams `Publisher` and `Subscriber` with @@ -427,16 +524,20 @@ type-safe and safe to implement `akka.stream.stage.GraphStage`. It can also expose a "stage actor ref" is needed to be addressed as-if an Actor. Custom stages implemented using `GraphStage` are also automatically fusable. 
-To learn more about implementing custom stages using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage). +To learn more about implementing custom stages using it refer to @ref[Custom processing with GraphStage](stream-customize.md#graphstage). @@@ -Extend/mixin `akka.stream.actor.ActorPublisher` in your `Actor` to make it a +Extend @scala[`akka.stream.actor.ActorPublisher` in your `Actor` to make it]@java[`akka.stream.actor.AbstractActorPublisher` to implement] a stream publisher that keeps track of the subscription life cycle and requested elements. Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: -@@snip [ActorPublisherDocSpec.scala]($code$/scala/docs/stream/ActorPublisherDocSpec.scala) { #job-manager } +Scala +: @@snip [ActorPublisherDocSpec.scala]($code$/scala/docs/stream/ActorPublisherDocSpec.scala) { #job-manager } + +Java +: @@snip [ActorPublisherDocTest.java]($code$/java/jdocs/stream/ActorPublisherDocTest.java) { #job-manager } You send elements to the stream by calling `onNext`. You are allowed to send as many elements as have been requested by the stream subscriber. This amount can be inquired with @@ -456,7 +557,7 @@ call `onNext`, `onError` and `onComplete`. You can terminate the stream with failure by calling `onError`. After that you are not allowed to call `onNext`, `onError` and `onComplete`. -If you suspect that this `ActorPublisher` may never get subscribed to, you can override the `subscriptionTimeout` +If you suspect that this @scala[`ActorPublisher`]@java[`AbstractActorPublisher`] may never get subscribed to, you can override the `subscriptionTimeout` method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when the timeout triggers via an `ActorPublisherMessage.SubscriptionTimeoutExceeded` message and MUST then perform cleanup and stop itself. 
@@ -468,10 +569,17 @@ More detailed information can be found in the API documentation. This is how it can be used as input `Source` to a `Flow`: -@@snip [ActorPublisherDocSpec.scala]($code$/scala/docs/stream/ActorPublisherDocSpec.scala) { #actor-publisher-usage } +Scala +: @@snip [ActorPublisherDocSpec.scala]($code$/scala/docs/stream/ActorPublisherDocSpec.scala) { #actor-publisher-usage } -A publisher that is created with `Sink.asPublisher` supports a specified number of subscribers. Additional -subscription attempts will be rejected with an `IllegalStateException`. +Java +: @@snip [ActorPublisherDocTest.java]($code$/java/jdocs/stream/ActorPublisherDocTest.java) { #actor-publisher-usage } + +@scala[A publisher that is created with `Sink.asPublisher` supports a specified number of subscribers. Additional + subscription attempts will be rejected with an `IllegalStateException`. +]@java[You can only attach one subscriber to this publisher. Use a `Broadcast`-element or + attach a `Sink.asPublisher(AsPublisher.WITH_FANOUT)` to enable multiple subscribers. +] #### ActorSubscriber @@ -482,21 +590,25 @@ type-safe and safe to implement `akka.stream.stage.GraphStage`. It can also expose a "stage actor ref" is needed to be addressed as-if an Actor. Custom stages implemented using `GraphStage` are also automatically fusable. -To learn more about implementing custom stages using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage). +To learn more about implementing custom stages using it refer to @ref[Custom processing with GraphStage](stream-customize.md#graphstage). @@@ -Extend/mixin `akka.stream.actor.ActorSubscriber` in your `Actor` to make it a +Extend @scala[`akka.stream.actor.ActorSubscriber` in your `Actor` to make it]@java[`akka.stream.actor.AbstractActorSubscriber` to make your class] a stream subscriber with full control of stream back pressure. 
It will receive `ActorSubscriberMessage.OnNext`, `ActorSubscriberMessage.OnComplete` and `ActorSubscriberMessage.OnError` messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. Here is an example of such an actor. It dispatches incoming jobs to child worker actors: -@@snip [ActorSubscriberDocSpec.scala]($code$/scala/docs/stream/ActorSubscriberDocSpec.scala) { #worker-pool } +Scala +: @@snip [ActorSubscriberDocSpec.scala]($code$/scala/docs/stream/ActorSubscriberDocSpec.scala) { #worker-pool } + +Java +: @@snip [ActorSubscriberDocTest.java]($code$/java/jdocs/stream/ActorSubscriberDocTest.java) { #worker-pool } Subclass must define the `RequestStrategy` to control stream back pressure. -After each incoming message the `ActorSubscriber` will automatically invoke +After each incoming message the @scala[`ActorSubscriber`]@java[`AbstractActorSubscriber`] will automatically invoke the `RequestStrategy.requestDemand` and propagate the returned demand to the stream. * The provided `WatermarkRequestStrategy` is a good strategy if the actor performs work itself. @@ -511,4 +623,8 @@ More detailed information can be found in the API documentation. This is how it can be used as output `Sink` to a `Flow`: -@@snip [ActorSubscriberDocSpec.scala]($code$/scala/docs/stream/ActorSubscriberDocSpec.scala) { #actor-subscriber-usage } +Scala +: @@snip [ActorSubscriberDocSpec.scala]($code$/scala/docs/stream/ActorSubscriberDocSpec.scala) { #actor-subscriber-usage } + +Java +: @@snip [ActorSubscriberDocTest.java]($code$/java/jdocs/stream/ActorSubscriberDocTest.java) { #actor-subscriber-usage }