Operator doc for flatmap merge (#28488)
This commit is contained in:
parent
f891f17abd
commit
a8086e86e5
3 changed files with 89 additions and 7 deletions
|
|
@ -4,18 +4,33 @@ Transform each input element into a `Source` whose elements are then flattened i
|
||||||
|
|
||||||
@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)
|
@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)
|
||||||
|
|
||||||
@@@div { .group-scala }
|
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapMerge }
|
@apidoc[Flow.flatMapMerge](Flow) { scala="#flatMapMerge[T,M](breadth:Int,f:Out=%3Eakka.stream.Graph[akka.stream.SourceShape[T],M]):FlowOps.this.Repr[T]" java="#flatMapMerge(int,akka.japi.function.Function)" }
|
||||||
|
|
||||||
@@@
|
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
Transform each input element into a `Source` whose elements are then flattened into the output stream through
|
Transform each input element into a `Source` whose elements are then flattened into the output stream through
|
||||||
merging. The maximum number of merged sources has to be specified.
|
merging. The maximum number of merged sources has to be specified. When this is met `flatMapMerge` does not
|
||||||
|
request any more elements meaning that it back pressures until one of the existing `Source`s completes.
|
||||||
|
Order of the elements for each `Source` is preserved but there is no deterministic order between elements from
|
||||||
|
different active `Source`s.
|
||||||
|
|
||||||
|
See also: @ref:[flatMapConcat](flatMapConcat.md)
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
In the following example `flatMapMerge` is used to create a `Source` for each incoming customerId. This could, for example,
|
||||||
|
be a calculation or a query to a database. There can be `breadth` active sources at any given time so
|
||||||
|
events for different customers could interleave in any order but events for the same customer will be in the
|
||||||
|
order emitted by the underlying `Source`;
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [FlatMapMerge.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala) { #flatmap-merge }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [FlatMapMerge.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java) { #flatmap-merge }
|
||||||
|
|
||||||
|
|
||||||
## Reactive Streams semantics
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
|
@ -23,7 +38,7 @@ merging. The maximum number of merged sources has to be specified.
|
||||||
|
|
||||||
**emits** when one of the currently consumed substreams has an element available
|
**emits** when one of the currently consumed substreams has an element available
|
||||||
|
|
||||||
**backpressures** when downstream backpressures
|
**backpressures** when downstream backpressures or the max number of substreams is reached
|
||||||
|
|
||||||
**completes** when upstream completes and all consumed substreams complete
|
**completes** when upstream completes and all consumed substreams complete
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package jdocs.stream.operators.sourceorflow;
|
||||||
|
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
public class FlatMapMerge {
|
||||||
|
private static ActorSystem system = null;
|
||||||
|
|
||||||
|
// #flatmap-merge
|
||||||
|
// e.g. could be a query to a database
|
||||||
|
private Source<String, NotUsed> lookupCustomerEvents(String customerId) {
|
||||||
|
return Source.from(Arrays.asList(customerId + "-evt-1", customerId + "-evt-2"));
|
||||||
|
}
|
||||||
|
// #flatmap-merge
|
||||||
|
|
||||||
|
void example() {
|
||||||
|
// #flatmap-merge
|
||||||
|
Source.from(Arrays.asList("customer-1", "customer-2"))
|
||||||
|
.flatMapMerge(10, this::lookupCustomerEvents)
|
||||||
|
.runForeach(System.out::println, system);
|
||||||
|
// prints - events from different customers could interleave
|
||||||
|
// customer-1-evt-1
|
||||||
|
// customer-2-evt-1
|
||||||
|
// customer-1-evt-2
|
||||||
|
// customer-2-evt-2
|
||||||
|
// #flatmap-merge
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.stream.operators.sourceorflow
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.scaladsl.Source
|
||||||
|
|
||||||
|
object FlatMapMerge {
|
||||||
|
|
||||||
|
implicit val system: ActorSystem = ActorSystem()
|
||||||
|
|
||||||
|
// #flatmap-merge
|
||||||
|
val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2"))
|
||||||
|
|
||||||
|
// e.g. could b a query to a database
|
||||||
|
def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = {
|
||||||
|
Source(List(s"$customerId-evt-1", s"$customerId-evt2"))
|
||||||
|
}
|
||||||
|
|
||||||
|
source.flatMapMerge(10, customerId => lookupCustomerEvents(customerId)).runForeach(println)
|
||||||
|
|
||||||
|
// prints - events from different customers could interleave
|
||||||
|
// customer-1-evt-1
|
||||||
|
// customer-2-evt-1
|
||||||
|
// customer-1-evt-2
|
||||||
|
// customer-2-evt-2
|
||||||
|
// #flatmap-merge
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue