diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md index 8b6e40119f..26a92fdc5d 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md @@ -19,6 +19,56 @@ order will be kept when results complete. For use cases where order does not mat If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream. If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied) +## Examples + +Imagine you are consuming messages from a broker. These messages represent business events produced on a service upstream. In that case, you want to consume the messages in order and one at a time: + +Scala +: @@snip [MapAsyncs.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala) { #mapasync-strict-order } + +Java +: @@snip [MapAsyncs.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java) { #mapasync-strict-order } + +When running the stream above the logging output would look like: + +``` +[...] +Processing event numner Event(33)... +Completed processing 33 +`mapAsync` emitted event number: 33 +Processing event numner Event(34)... +Completed processing 34 +`mapAsync` emitted event number: 34 +[...] +``` + +If, instead, you may process information concurrently, but still emit the messages downstream in order, you may increase the parallelism. In this case, the events could some IoT payload with weather metrics, for example, where processing the data in strict ordering is not critical: + +Scala +: @@snip [MapAsyncs.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala) { #mapasync-concurrent } + +Java +: @@snip [MapAsyncs.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java) { #mapasync-concurrent } + +In this case, the logging soon shows how processing of the events happens concurrently which may break the ordering. Still, the stage emits the events back in the correct order: + +``` +[...] +Processing event numner Event(15)... +Processing event numner Event(16)... +Completed processing 16 +Processing event numner Event(17)... +Completed processing 17 +Completed processing 15 +`mapAsync` emitted event number: 15 +`mapAsync` emitted event number: 16 +Processing event numner Event(18)... +`mapAsync` emitted event number: 17 +[...] +``` + +See also @ref[mapAsyncUnordered](mapAsyncUnordered.md#examples). + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md index 9eaad32aa4..6e72daf62a 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md @@ -18,6 +18,40 @@ that triggered them. If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream. If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied) +## Examples + +Imagine you are consuming messages from a source, and you prioritize throughput over order (this could be uncorrelated messages so order is irrelevant). You may use the `mapAsyncUnordered` (so messages are emitted as soon as they've been processed) with some parallelism (so processing happens concurrently) : + +Scala +: @@snip [MapAsyncs.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala) { #mapasyncunordered } + +Java +: @@snip [MapAsyncs.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java) { #mapasyncunordered } + +When running the stream above the logging output would look like: + +``` +[...] +Processing event numner Event(27)... +Completed processing 27 +`mapAsyncUnordered` emitted event number: 27 +Processing event numner Event(28)... +Completed processing 22 +`mapAsyncUnordered` emitted event number: 22 +Processing event numner Event(29)... +Completed processing 26 +`mapAsyncUnordered` emitted event number: 26 +Processing event numner Event(30)... +Completed processing 30 +`mapAsyncUnordered` emitted event number: 30 +Processing event numner Event(31)... +Completed processing 31 +`mapAsyncUnordered` emitted event number: 31 +[...] +``` + +See @ref[mapAsync](mapAsync.md#examples) for a variant with ordering guarantees. + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java new file mode 100644 index 0000000000..1024d6da3a --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MapAsyncs.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.stream.Stream; + +public class MapAsyncs { + + private final Random random = new Random(); + + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + + private final Source events = + Source.fromIterator(() -> Stream.iterate(1, i -> i + 1).iterator()) + .throttle(1, Duration.ofMillis(50)) + .map(Event::new); + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + + private final ActorSystem system = ActorSystem.create("mapAsync-operator-examples"); + + public MapAsyncs() {} + + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + + public CompletionStage eventHandler(Event in) throws InterruptedException { + System.out.println("Processing event number " + in + "..."); + // ... + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + CompletionStage cs; + if (random.nextInt(5) == 0) { + cs = + Patterns.after( + Duration.ofMillis(500), + system, + () -> CompletableFuture.completedFuture(in.sequenceNumber)); + } else { + cs = CompletableFuture.completedFuture(in.sequenceNumber); + } + return cs.thenApply( + i -> { + System.out.println("Completed processing " + i.intValue()); + return i; + }); + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + } + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + + private void runStrictOrder() { + // #mapasync-strict-order + + events + .mapAsync(1, this::eventHandler) + .map(in -> "`mapSync` emitted event number " + in.intValue()) + .runWith(Sink.foreach(str -> System.out.println(str)), system); + // #mapasync-strict-order + } + + private void run() { + // #mapasync-concurrent + + events + .mapAsync(10, this::eventHandler) + .map(in -> "`mapSync` emitted event number " + in.intValue()) + .runWith(Sink.foreach(str -> System.out.println(str)), system); + // #mapasync-concurrent + } + + private void runUnordered() { + // #mapasyncunordered + + events + .mapAsyncUnordered(10, this::eventHandler) + .map(in -> "`mapSync` emitted event number " + in.intValue()) + .runWith(Sink.foreach(str -> System.out.println(str)), system); + // #mapasyncunordered + } + + public static void main(String[] args) { + new MapAsyncs().run(); + } + + static class Event { + public final int sequenceNumber; + + public Event(int sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + @Override + public String toString() { + return "Event(" + sequenceNumber + ')'; + } + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala new file mode 100644 index 0000000000..e08919e15f --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MapAsyncs.scala @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.Timeout + +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Random + +/** + * + */ +object CommonMapAsync { + case class Event(sequenceNumber: Int) + + implicit val sys: ActorSystem = ActorSystem("mapAsync-stream") + implicit val exCtx: ExecutionContextExecutor = sys.dispatcher + implicit val timeout: Timeout = 3.seconds + + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + + val events: Source[Event, NotUsed] = //... + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + Source.fromIterator(() => Iterator.from(1)).throttle(1, 50.millis).map { in => + Event(in) + } + + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + + def eventHandler(event: Event): Future[Int] = { + println(s"Processing event $event...") + //... + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + val result = + if (Random.nextInt(5) == 0) { + akka.pattern.after(500.millis)(Future.successful(event.sequenceNumber)) + } else { + Future.successful(event.sequenceNumber) + } + result.map { x => + println(s"Completed processing $x") + x + } + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + } + // #mapasync-strict-order + // #mapasync-concurrent + // #mapasyncunordered + +} + +object MapAsyncStrictOrder extends App { + import CommonMapAsync._ + // #mapasync-strict-order + + events + .mapAsync(1) { in => + eventHandler(in) + } + .map { in => + println(s"`mapAsync` emitted event number: $in") + } + // #mapasync-strict-order + .runWith(Sink.ignore) + +} + +object MapAsync extends App { + import CommonMapAsync._ + // #mapasync-concurrent + + events + .mapAsync(3) { in => + eventHandler(in) + } + .map { in => + println(s"`mapAsync` emitted event number: $in") + } + // #mapasync-concurrent + .runWith(Sink.ignore) + +} + +object MapAsyncUnordered extends App { + import CommonMapAsync._ + // #mapasyncunordered + + events + .mapAsyncUnordered(3) { in => + eventHandler(in) + } + .map { in => + println(s"`mapAsyncUnordered` emitted event number: $in") + } + // #mapasyncunordered + .runWith(Sink.ignore) + +}