Adds mapAsync(Unordered) operator samples (#29600)

This commit is contained in:
Ignasi Marimon-Clos 2020-09-16 12:24:49 +02:00 committed by GitHub
parent 8732b69446
commit ffc4235980
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 318 additions and 0 deletions

View file

@ -19,6 +19,56 @@ order will be kept when results complete. For use cases where order does not mat
If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream.
If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)
## Examples
Imagine you are consuming messages from a broker. These messages represent business events produced on a service upstream. In that case, you want to consume the messages in order and one at a time:
Scala
: @@snip [MapAsyncs.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala) { #mapasync-strict-order }
Java
: @@snip [MapAsyncs.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java) { #mapasync-strict-order }
When running the stream above the logging output would look like:
```
[...]
Processing event numner Event(33)...
Completed processing 33
`mapAsync` emitted event number: 33
Processing event numner Event(34)...
Completed processing 34
`mapAsync` emitted event number: 34
[...]
```
If, instead, you may process information concurrently, but still emit the messages downstream in order, you may increase the parallelism. In this case, the events could some IoT payload with weather metrics, for example, where processing the data in strict ordering is not critical:
Scala
: @@snip [MapAsyncs.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala) { #mapasync-concurrent }
Java
: @@snip [MapAsyncs.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java) { #mapasync-concurrent }
In this case, the logging soon shows how processing of the events happens concurrently which may break the ordering. Still, the stage emits the events back in the correct order:
```
[...]
Processing event numner Event(15)...
Processing event numner Event(16)...
Completed processing 16
Processing event numner Event(17)...
Completed processing 17
Completed processing 15
`mapAsync` emitted event number: 15
`mapAsync` emitted event number: 16
Processing event numner Event(18)...
`mapAsync` emitted event number: 17
[...]
```
See also @ref[mapAsyncUnordered](mapAsyncUnordered.md#examples).
## Reactive Streams semantics
@@@div { .callout }

View file

@ -18,6 +18,40 @@ that triggered them.
If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream.
If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)
## Examples
Imagine you are consuming messages from a source, and you prioritize throughput over order (this could be uncorrelated messages so order is irrelevant). You may use the `mapAsyncUnordered` (so messages are emitted as soon as they've been processed) with some parallelism (so processing happens concurrently) :
Scala
: @@snip [MapAsyncs.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala) { #mapasyncunordered }
Java
: @@snip [MapAsyncs.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java) { #mapasyncunordered }
When running the stream above the logging output would look like:
```
[...]
Processing event numner Event(27)...
Completed processing 27
`mapAsyncUnordered` emitted event number: 27
Processing event numner Event(28)...
Completed processing 22
`mapAsyncUnordered` emitted event number: 22
Processing event numner Event(29)...
Completed processing 26
`mapAsyncUnordered` emitted event number: 26
Processing event numner Event(30)...
Completed processing 30
`mapAsyncUnordered` emitted event number: 30
Processing event numner Event(31)...
Completed processing 31
`mapAsyncUnordered` emitted event number: 31
[...]
```
See @ref[mapAsync](mapAsync.md#examples) for a variant with ordering guarantees.
## Reactive Streams semantics
@@@div { .callout }

View file

@ -0,0 +1,118 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
public class MapAsyncs {
private final Random random = new Random();
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
private final Source<Event, NotUsed> events =
Source.fromIterator(() -> Stream.iterate(1, i -> i + 1).iterator())
.throttle(1, Duration.ofMillis(50))
.map(Event::new);
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
private final ActorSystem system = ActorSystem.create("mapAsync-operator-examples");
public MapAsyncs() {}
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
public CompletionStage<Integer> eventHandler(Event in) throws InterruptedException {
System.out.println("Processing event number " + in + "...");
// ...
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
CompletionStage<Integer> cs;
if (random.nextInt(5) == 0) {
cs =
Patterns.after(
Duration.ofMillis(500),
system,
() -> CompletableFuture.completedFuture(in.sequenceNumber));
} else {
cs = CompletableFuture.completedFuture(in.sequenceNumber);
}
return cs.thenApply(
i -> {
System.out.println("Completed processing " + i.intValue());
return i;
});
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
}
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
private void runStrictOrder() {
// #mapasync-strict-order
events
.mapAsync(1, this::eventHandler)
.map(in -> "`mapSync` emitted event number " + in.intValue())
.runWith(Sink.foreach(str -> System.out.println(str)), system);
// #mapasync-strict-order
}
private void run() {
// #mapasync-concurrent
events
.mapAsync(10, this::eventHandler)
.map(in -> "`mapSync` emitted event number " + in.intValue())
.runWith(Sink.foreach(str -> System.out.println(str)), system);
// #mapasync-concurrent
}
private void runUnordered() {
// #mapasyncunordered
events
.mapAsyncUnordered(10, this::eventHandler)
.map(in -> "`mapSync` emitted event number " + in.intValue())
.runWith(Sink.foreach(str -> System.out.println(str)), system);
// #mapasyncunordered
}
public static void main(String[] args) {
new MapAsyncs().run();
}
static class Event {
public final int sequenceNumber;
public Event(int sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
@Override
public String toString() {
return "Event(" + sequenceNumber + ')';
}
}
}

View file

@ -0,0 +1,116 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.Timeout
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
/**
*
*/
object CommonMapAsync {
case class Event(sequenceNumber: Int)
implicit val sys: ActorSystem = ActorSystem("mapAsync-stream")
implicit val exCtx: ExecutionContextExecutor = sys.dispatcher
implicit val timeout: Timeout = 3.seconds
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
val events: Source[Event, NotUsed] = //...
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
Source.fromIterator(() => Iterator.from(1)).throttle(1, 50.millis).map { in =>
Event(in)
}
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
def eventHandler(event: Event): Future[Int] = {
println(s"Processing event $event...")
//...
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
val result =
if (Random.nextInt(5) == 0) {
akka.pattern.after(500.millis)(Future.successful(event.sequenceNumber))
} else {
Future.successful(event.sequenceNumber)
}
result.map { x =>
println(s"Completed processing $x")
x
}
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
}
// #mapasync-strict-order
// #mapasync-concurrent
// #mapasyncunordered
}
object MapAsyncStrictOrder extends App {
import CommonMapAsync._
// #mapasync-strict-order
events
.mapAsync(1) { in =>
eventHandler(in)
}
.map { in =>
println(s"`mapAsync` emitted event number: $in")
}
// #mapasync-strict-order
.runWith(Sink.ignore)
}
object MapAsync extends App {
import CommonMapAsync._
// #mapasync-concurrent
events
.mapAsync(3) { in =>
eventHandler(in)
}
.map { in =>
println(s"`mapAsync` emitted event number: $in")
}
// #mapasync-concurrent
.runWith(Sink.ignore)
}
object MapAsyncUnordered extends App {
import CommonMapAsync._
// #mapasyncunordered
events
.mapAsyncUnordered(3) { in =>
eventHandler(in)
}
.map { in =>
println(s"`mapAsyncUnordered` emitted event number: $in")
}
// #mapasyncunordered
.runWith(Sink.ignore)
}