diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapConcat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapConcat.md index 5059b9635b..8b02dd8b46 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapConcat.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapConcat.md @@ -19,6 +19,8 @@ This can be used to flatten collections into individual stream elements. Returning an empty iterable results in zero elements being passed downstream rather than the stream being cancelled. +See also @ref:[statefulMapConcat](statefulMapConcat.md) + ## Example The following takes a stream of integers and emits each element twice downstream. diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md index 7c8b5362fd..66c02212fb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md @@ -4,18 +4,55 @@ Transform each element into zero or more elements that are individually passed d @ref[Simple operators](../index.md#simple-operators) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #statefulMapConcat } - -@@@ +@apidoc[Flow.statefulMapConcat](Flow) { scala="#statefulMapConcat[T](f:()=>Out=>scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]" java="#statefulMapConcat(akka.japi.function.Creator)" } ## Description Transform each element into zero or more elements that are individually passed downstream. The difference to `mapConcat` is that -the transformation function is created from a factory for every materialization of the flow. +the transformation function is created from a factory for every materialization of the flow. This makes it possible to create and +use mutable state for the operation, each new materialization of the stream will have its own state. + +For cases where no state is needed but only a way to emit zero or more elements for every incoming element you can use @ref:[mapConcat](mapConcat.md) + +## Examples + +In this first sample we keep a counter, and combine each element with an id that is unique for the stream materialization +(replicating the @ref:[zipWithIndex](zipWithIndex.md) operator): + +Scala +: @@snip [StatefulMapConcat.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala) { #zip-with-index } + +Java +: @@snip [StatefulMapConcat.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java) { #zip-with-index } + +In this sample we let the value of the elements have an effect on the following elements, if an element starts +with `blacklist:word` we add it to a black list and filter out any subsequent entries of `word`: + +Scala +: @@snip [StatefulMapConcat.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala) { #blacklist } + +Java +: @@snip [StatefulMapConcat.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java) { #blacklist } + +For cases where there is a need to emit elements based on the state when the stream ends, it is possible to add an extra +element signalling the end of the stream before the `statefulMapConcat` operator. + +In this sample we collect all elements starting with the letter `b` and emit those once we have reached the end of the stream using +a special end element. The end element is a special string to keep the sample concise, in a real application it may make sense to use types instead. + +Scala +: @@snip [StatefulMapConcat.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala) { #bs-last } + +Java +: @@snip [StatefulMapConcat.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java) { #bs-last } + +When defining aggregates like this you should consider if it is safe to let the state grow without bounds or if you should +rather drop elements or throw an exception if the collected set of elements grows too big. + +For even more fine grained capabilities than can be achieved with `statefulMapConcat` take a look at @ref[stream customization](../../stream-customize.md). + ## Reactive Streams semantics @@ -28,4 +65,3 @@ the transformation function is created from a factory for every materialization **completes** when upstream completes and all remaining elements has been emitted @@@ - diff --git a/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java b/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java new file mode 100644 index 0000000000..ba501d2c94 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.flow; + +import akka.NotUsed; +import akka.actor.typed.ActorSystem; +import akka.japi.Pair; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; + +import java.util.*; + +public class StatefulMapConcat { + + static final ActorSystem system = null; + + static void zipWithIndex() { + // #zip-with-index + Source, NotUsed> letterAndIndex = + Source.from(Arrays.asList("a", "b", "c", "d")) + .statefulMapConcat( + () -> { + // variables we close over with lambdas must be final, so we use a container, + // a 1 element array, for the actual value. + long[] counter = {0}; + + // we return the function that will be invoked for each element + return (element) -> { + counter[0] += 1; + // we return an iterable with the single element + return Arrays.asList(new Pair(element, counter[0])); + }; + }); + + letterAndIndex.runForeach(System.out::println, system); + // prints + // Pair(a,1) + // Pair(b,2) + // Pair(c,3) + // Pair(d,4) + // #zip-with-index + } + + static void blacklist() { + // #blacklist + Source fruitsAndBlacklistCommands = + Source.from( + Arrays.asList( + "banana", "pear", "orange", "blacklist:banana", "banana", "pear", "banana")); + + Flow blacklistingFlow = + Flow.of(String.class) + .statefulMapConcat( + () -> { + Set blacklist = new HashSet<>(); + + return (element) -> { + if (element.startsWith("blacklist:")) { + blacklist.add(element.substring(10)); + return Collections + .emptyList(); // no element downstream when adding a blacklisted keyword + } else if (blacklist.contains(element)) { + return Collections + .emptyList(); // no element downstream if element is blacklisted + } else { + return Collections.singletonList(element); + } + }; + }); + + fruitsAndBlacklistCommands.via(blacklistingFlow).runForeach(System.out::println, system); + // prints + // banana + // pear + // orange + // pear + // #blacklist + } + + static void reactOnEnd() { + // #bs-last + Source words = + Source.from(Arrays.asList("baboon", "crocodile", "bat", "flamingo", "hedgehog", "beaver")); + + Flow bWordsLast = + Flow.of(String.class) + .concat(Source.single("-end-")) + .statefulMapConcat( + () -> { + List stashedBWords = new ArrayList<>(); + + return (element) -> { + if (element.startsWith("b")) { + // add to stash and emit no element + stashedBWords.add(element); + return Collections.emptyList(); + } else if (element.equals("-end-")) { + // return in the stashed words in the order they got stashed + return stashedBWords; + } else { + // emit the element as is + return Collections.singletonList(element); + } + }; + }); + + words.via(bWordsLast).runForeach(System.out::println, system); + // prints + // crocodile + // flamingo + // hedgehog + // baboon + // bat + // beaver + // #bs-last + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala b/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala new file mode 100644 index 0000000000..b58f4e33e9 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.stream.operators.flow + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Source + +class StatefulMapConcat { + + implicit val system: ActorSystem = ??? + + def zipWithIndex(): Unit = { + // #zip-with-index + val letterAndIndex = Source("a" :: "b" :: "c" :: "d" :: Nil).statefulMapConcat { () => + var counter = 0L + + // we return the function that will be invoked for each element + { element => + counter += 1 + // we return an iterable with the single element + (element, counter) :: Nil + } + } + + letterAndIndex.runForeach(println) + // prints + // (a,1) + // (b,2) + // (c,3) + // (d,4) + // #zip-with-index + } + + def blacklist(): Unit = { + // #blacklist + val fruitsAndBlacklistCommands = Source( + "banana" :: "pear" :: "orange" :: "blacklist:banana" :: "banana" :: "pear" :: "banana" :: Nil) + + val blacklistingFlow = Flow[String].statefulMapConcat { () => + var blacklist = Set.empty[String] + + { element => + if (element.startsWith("blacklist:")) { + blacklist += element.drop(10) + Nil // no element downstream when adding a blacklisted keyword + } else if (blacklist(element)) { + Nil // no element downstream if element is blacklisted + } else { + element :: Nil + } + } + } + + fruitsAndBlacklistCommands.via(blacklistingFlow).runForeach(println) + // prints + // banana + // pear + // orange + // pear + // #blacklist + } + + def reactOnEnd(): Unit = { + // #bs-last + val words = Source("baboon" :: "crocodile" :: "bat" :: "flamingo" :: "hedgehog" :: "beaver" :: Nil) + + val bWordsLast = Flow[String].concat(Source.single("-end-")).statefulMapConcat { () => + var stashedBWords: List[String] = Nil + + { element => + if (element.startsWith("b")) { + // prepend to stash and emit no element + stashedBWords = element :: stashedBWords + Nil + } else if (element.equals("-end-")) { + // return in the stashed words in the order they got stashed + stashedBWords.reverse + } else { + // emit the element as is + element :: Nil + } + } + } + + words.via(bWordsLast).runForeach(println) + // prints + // crocodile + // flamingo + // hedgehog + // baboon + // bat + // beaver + // #bs-last + } + +}