From 23ea1a4fed8ef9ebfc143267518818df06495c40 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Mon, 27 Apr 2020 15:46:16 +0100 Subject: [PATCH] Docs: flatMapConcat (#28584) --- .../operators/Source-or-Flow/flatMapConcat.md | 22 +++++++++--- .../operators/sourceorflow/FlatMapConcat.java | 35 +++++++++++++++++++ .../sourceorflow/FlatMapConcat.scala | 31 ++++++++++++++++ 3 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapConcat.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapConcat.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapConcat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapConcat.md index 7b71e962d5..27f7bcd030 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapConcat.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapConcat.md @@ -4,18 +4,32 @@ Transform each input element into a `Source` whose elements are then flattened i @ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators) -@@@div { .group-scala } ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapConcat } +@apidoc[Flow.flatMapConcat](Flow) { scala="#flatMapConcat[T,M](f:Out=%3akka.stream.Graph[akka.stream.SourceShape[T],M]):FlowOps.this.Repr[T]" java="#flatMapConcat(akka.japi.function.Function)" } -@@@ +See also: @ref:[flatMapMerge](flatMapMerge.md) ## Description Transform each input element into a `Source` whose elements are then flattened into the output stream through -concatenation. This means each source is fully consumed before consumption of the next source starts. +concatenation. This means each source is fully consumed before consumption of the next source starts. + +## Example + +In the following example `flatMapConcat` is used to create a `Source` for each incoming customerId. This could be, for example, + a calculation or a query to a database. Each customer is then passed to `lookupCustomerEvents` which returns +a `Source`. All the events for a customer are delivered before moving to the next customer. + +Scala +: @@snip [FlatMapConcat.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapConcat.scala) { #flatmap-concat} + +Java +: @@snip [FlatMapConcat.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapConcat.java) { #flatmap-concat } + + + ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapConcat.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapConcat.java new file mode 100644 index 0000000000..b0ebadeaf6 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapConcat.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Arrays; + +public class FlatMapConcat { + private static ActorSystem system = null; + + // #flatmap-concat + // e.g. could be a query to a database + private Source lookupCustomerEvents(String customerId) { + return Source.from(Arrays.asList(customerId + "-event-1", customerId + "-event-2")); + } + // #flatmap-concat + + void example() { + // #flatmap-concat + Source.from(Arrays.asList("customer-1", "customer-2")) + .flatMapConcat(this::lookupCustomerEvents) + .runForeach(System.out::println, system); + // prints - events from each customer consecutively + // customer-1-event-1 + // customer-1-event-2 + // customer-2-event-1 + // customer-2-event-2 + // #flatmap-concat + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapConcat.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapConcat.scala new file mode 100644 index 0000000000..0438ed4713 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapConcat.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +object FlatMapConcat { + + implicit val system: ActorSystem = ActorSystem() + + // #flatmap-concat + val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2")) + + // e.g. could b a query to a database + def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = { + Source(List(s"$customerId-event-1", s"$customerId-event-2")) + } + + source.flatMapConcat(customerId => lookupCustomerEvents(customerId)).runForeach(println) + + // prints - events from each customer consecutively + // customer-1-event-1 + // customer-1-event-2 + // customer-2-event-1 + // customer-2-event-2 + // #flatmap-concat + +}