Operator examples for statefulMapConcat #25468

This commit is contained in:
Johan Andrén 2020-03-10 10:36:07 +01:00 committed by GitHub
parent 52c83dca34
commit d8aed9e9d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 263 additions and 7 deletions

View file

@ -19,6 +19,8 @@ This can be used to flatten collections into individual stream elements.
Returning an empty iterable results in zero elements being passed downstream Returning an empty iterable results in zero elements being passed downstream
rather than the stream being cancelled. rather than the stream being cancelled.
See also @ref:[statefulMapConcat](statefulMapConcat.md)
## Example ## Example
The following takes a stream of integers and emits each element twice downstream. The following takes a stream of integers and emits each element twice downstream.

View file

@ -4,18 +4,55 @@ Transform each element into zero or more elements that are individually passed d
@ref[Simple operators](../index.md#simple-operators) @ref[Simple operators](../index.md#simple-operators)
@@@div { .group-scala }
## Signature ## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #statefulMapConcat } @apidoc[Flow.statefulMapConcat](Flow) { scala="#statefulMapConcat[T](f:()=>Out=>scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]" java="#statefulMapConcat(akka.japi.function.Creator)" }
@@@
## Description ## Description
Transform each element into zero or more elements that are individually passed downstream. The difference to `mapConcat` is that Transform each element into zero or more elements that are individually passed downstream. The difference to `mapConcat` is that
the transformation function is created from a factory for every materialization of the flow. the transformation function is created from a factory for every materialization of the flow. This makes it possible to create and
use mutable state for the operation, each new materialization of the stream will have its own state.
For cases where no state is needed but only a way to emit zero or more elements for every incoming element you can use @ref:[mapConcat](mapConcat.md)
## Examples
In this first sample we keep a counter, and combine each element with an id that is unique for the stream materialization
(replicating the @ref:[zipWithIndex](zipWithIndex.md) operator):
Scala
: @@snip [StatefulMapConcat.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala) { #zip-with-index }
Java
: @@snip [StatefulMapConcat.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java) { #zip-with-index }
In this sample we let the value of the elements have an effect on the following elements, if an element starts
with `blacklist:word` we add it to a black list and filter out any subsequent entries of `word`:
Scala
: @@snip [StatefulMapConcat.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala) { #blacklist }
Java
: @@snip [StatefulMapConcat.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java) { #blacklist }
For cases where there is a need to emit elements based on the state when the stream ends, it is possible to add an extra
element signalling the end of the stream before the `statefulMapConcat` operator.
In this sample we collect all elements starting with the letter `b` and emit those once we have reached the end of the stream using
a special end element. The end element is a special string to keep the sample concise, in a real application it may make sense to use types instead.
Scala
: @@snip [StatefulMapConcat.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala) { #bs-last }
Java
: @@snip [StatefulMapConcat.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMapConcat.java) { #bs-last }
When defining aggregates like this you should consider if it is safe to let the state grow without bounds or if you should
rather drop elements or throw an exception if the collected set of elements grows too big.
For even more fine grained capabilities than can be achieved with `statefulMapConcat` take a look at @ref[stream customization](../../stream-customize.md).
## Reactive Streams semantics ## Reactive Streams semantics
@ -28,4 +65,3 @@ the transformation function is created from a factory for every materialization
**completes** when upstream completes and all remaining elements has been emitted **completes** when upstream completes and all remaining elements has been emitted
@@@ @@@

View file

@ -0,0 +1,119 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.flow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import java.util.*;
public class StatefulMapConcat {
static final ActorSystem<?> system = null;
static void zipWithIndex() {
// #zip-with-index
Source<Pair<String, Long>, NotUsed> letterAndIndex =
Source.from(Arrays.asList("a", "b", "c", "d"))
.statefulMapConcat(
() -> {
// variables we close over with lambdas must be final, so we use a container,
// a 1 element array, for the actual value.
long[] counter = {0};
// we return the function that will be invoked for each element
return (element) -> {
counter[0] += 1;
// we return an iterable with the single element
return Arrays.asList(new Pair(element, counter[0]));
};
});
letterAndIndex.runForeach(System.out::println, system);
// prints
// Pair(a,1)
// Pair(b,2)
// Pair(c,3)
// Pair(d,4)
// #zip-with-index
}
static void blacklist() {
// #blacklist
Source<String, NotUsed> fruitsAndBlacklistCommands =
Source.from(
Arrays.asList(
"banana", "pear", "orange", "blacklist:banana", "banana", "pear", "banana"));
Flow<String, String, NotUsed> blacklistingFlow =
Flow.of(String.class)
.statefulMapConcat(
() -> {
Set<String> blacklist = new HashSet<>();
return (element) -> {
if (element.startsWith("blacklist:")) {
blacklist.add(element.substring(10));
return Collections
.emptyList(); // no element downstream when adding a blacklisted keyword
} else if (blacklist.contains(element)) {
return Collections
.emptyList(); // no element downstream if element is blacklisted
} else {
return Collections.singletonList(element);
}
};
});
fruitsAndBlacklistCommands.via(blacklistingFlow).runForeach(System.out::println, system);
// prints
// banana
// pear
// orange
// pear
// #blacklist
}
static void reactOnEnd() {
// #bs-last
Source<String, NotUsed> words =
Source.from(Arrays.asList("baboon", "crocodile", "bat", "flamingo", "hedgehog", "beaver"));
Flow<String, String, NotUsed> bWordsLast =
Flow.of(String.class)
.concat(Source.single("-end-"))
.statefulMapConcat(
() -> {
List<String> stashedBWords = new ArrayList<>();
return (element) -> {
if (element.startsWith("b")) {
// add to stash and emit no element
stashedBWords.add(element);
return Collections.emptyList();
} else if (element.equals("-end-")) {
// return in the stashed words in the order they got stashed
return stashedBWords;
} else {
// emit the element as is
return Collections.singletonList(element);
}
};
});
words.via(bWordsLast).runForeach(System.out::println, system);
// prints
// crocodile
// flamingo
// hedgehog
// baboon
// bat
// beaver
// #bs-last
}
}

View file

@ -0,0 +1,99 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.flow
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Source
class StatefulMapConcat {
implicit val system: ActorSystem = ???
def zipWithIndex(): Unit = {
// #zip-with-index
val letterAndIndex = Source("a" :: "b" :: "c" :: "d" :: Nil).statefulMapConcat { () =>
var counter = 0L
// we return the function that will be invoked for each element
{ element =>
counter += 1
// we return an iterable with the single element
(element, counter) :: Nil
}
}
letterAndIndex.runForeach(println)
// prints
// (a,1)
// (b,2)
// (c,3)
// (d,4)
// #zip-with-index
}
def blacklist(): Unit = {
// #blacklist
val fruitsAndBlacklistCommands = Source(
"banana" :: "pear" :: "orange" :: "blacklist:banana" :: "banana" :: "pear" :: "banana" :: Nil)
val blacklistingFlow = Flow[String].statefulMapConcat { () =>
var blacklist = Set.empty[String]
{ element =>
if (element.startsWith("blacklist:")) {
blacklist += element.drop(10)
Nil // no element downstream when adding a blacklisted keyword
} else if (blacklist(element)) {
Nil // no element downstream if element is blacklisted
} else {
element :: Nil
}
}
}
fruitsAndBlacklistCommands.via(blacklistingFlow).runForeach(println)
// prints
// banana
// pear
// orange
// pear
// #blacklist
}
def reactOnEnd(): Unit = {
// #bs-last
val words = Source("baboon" :: "crocodile" :: "bat" :: "flamingo" :: "hedgehog" :: "beaver" :: Nil)
val bWordsLast = Flow[String].concat(Source.single("-end-")).statefulMapConcat { () =>
var stashedBWords: List[String] = Nil
{ element =>
if (element.startsWith("b")) {
// prepend to stash and emit no element
stashedBWords = element :: stashedBWords
Nil
} else if (element.equals("-end-")) {
// return in the stashed words in the order they got stashed
stashedBWords.reverse
} else {
// emit the element as is
element :: Nil
}
}
}
words.via(bWordsLast).runForeach(println)
// prints
// crocodile
// flamingo
// hedgehog
// baboon
// bat
// beaver
// #bs-last
}
}