diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md index f944daac7a..44cbaffd30 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md @@ -4,18 +4,33 @@ Transform each input element into a `Source` whose elements are then flattened i @ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapMerge } - -@@@ +@apidoc[Flow.flatMapMerge](Flow) { scala="#flatMapMerge[T,M](breadth:Int,f:Out=%3Eakka.stream.Graph[akka.stream.SourceShape[T],M]):FlowOps.this.Repr[T]" java="#flatMapMerge(int,akka.japi.function.Function)" } ## Description Transform each input element into a `Source` whose elements are then flattened into the output stream through -merging. The maximum number of merged sources has to be specified. +merging. The maximum number of merged sources has to be specified. When this is met `flatMapMerge` does not +request any more elements meaning that it back pressures until one of the existing `Source`s completes. +Order of the elements for each `Source` is preserved but there is no deterministic order between elements from +different active `Source`s. + +See also: @ref:[flatMapConcat](flatMapConcat.md) + +## Example + +In the following example `flatMapMerge` is used to create a `Source` for each incoming customerId. This could, for example, +be a calculation or a query to a database. There can be `breadth` active sources at any given time so +events for different customers could interleave in any order but events for the same customer will be in the +order emitted by the underlying `Source`; + +Scala +: @@snip [FlatMapMerge.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala) { #flatmap-merge } + +Java +: @@snip [FlatMapMerge.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java) { #flatmap-merge } + ## Reactive Streams semantics @@ -23,7 +38,7 @@ merging. The maximum number of merged sources has to be specified. **emits** when one of the currently consumed substreams has an element available -**backpressures** when downstream backpressures +**backpressures** when downstream backpressures or the max number of substreams is reached **completes** when upstream completes and all consumed substreams complete diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java new file mode 100644 index 0000000000..c0c8da7ecb --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Arrays; + +public class FlatMapMerge { + private static ActorSystem system = null; + + // #flatmap-merge + // e.g. could be a query to a database + private Source lookupCustomerEvents(String customerId) { + return Source.from(Arrays.asList(customerId + "-evt-1", customerId + "-evt-2")); + } + // #flatmap-merge + + void example() { + // #flatmap-merge + Source.from(Arrays.asList("customer-1", "customer-2")) + .flatMapMerge(10, this::lookupCustomerEvents) + .runForeach(System.out::println, system); + // prints - events from different customers could interleave + // customer-1-evt-1 + // customer-2-evt-1 + // customer-1-evt-2 + // customer-2-evt-2 + // #flatmap-merge + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala new file mode 100644 index 0000000000..33f6a3fe8f --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +object FlatMapMerge { + + implicit val system: ActorSystem = ActorSystem() + + // #flatmap-merge + val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2")) + + // e.g. could b a query to a database + def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = { + Source(List(s"$customerId-evt-1", s"$customerId-evt2")) + } + + source.flatMapMerge(10, customerId => lookupCustomerEvents(customerId)).runForeach(println) + + // prints - events from different customers could interleave + // customer-1-evt-1 + // customer-2-evt-1 + // customer-1-evt-2 + // customer-2-evt-2 + // #flatmap-merge + +}