#1732: Moved splitter and aggregator from Alpakka Docs to Cookbook and … (#31203)

Anthony Cheng 2022-03-10 10:23:55 +00:00 committed by GitHub
parent 1b14e25f82
commit 981eb17f2b
3 changed files with 190 additions and 1 deletion


@@ -18,7 +18,9 @@ To use Akka Streams, add the module to your project:
This is a collection of patterns that demonstrates various uses of the Akka Streams API by solving small, targeted
problems in the format of "recipes". The purpose of this page is to provide inspiration and ideas for how to approach
various small tasks involving streams. The recipes in this page can be used directly as-is, but they are most powerful as
starting points: customization of the code snippets is warmly encouraged. The recipes can be extended or can provide a
basis for the implementation of other [patterns](https://doc.akka.io/docs/alpakka/current/patterns.html) involving
[Alpakka](https://doc.akka.io/docs/alpakka/current).
This part also serves as supplementary material for the main body of documentation. It is a good idea to have this page
open while reading the manual and look for examples demonstrating various streaming concepts
@@ -159,6 +161,43 @@ Scala
Java
: @@snip [RecipeDecompress.java](/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeDecompress.java) { #decompress-gzip }
### Implementing a Splitter
**Situation:** Given a stream of messages, where each message is a composition of different elements, we want to split
the message into a series of individual sub-messages, each of which may be processed in a different way.
The [Splitter](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Sequencer.html) is an integration
pattern as described in [Enterprise Integration Patterns](https://www.enterpriseintegrationpatterns.com). Let's say
that we have a stream containing strings. Each string contains a few numbers separated by "-". We want to turn
this into a stream that contains only the numbers, so that for example "1-2-3" yields the elements 1, 2 and 3.
Scala
: @@snip [RecipeSplitter.scala](/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSplitter.scala) { #Simple-Split }
Java
: @@snip [RecipeSplitter.java](/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java) { #Simple-Split }
### Implementing a Splitter and Aggregator
**Situation:** Given a message, we want to split the message and aggregate its sub-messages into a new message.
Splitting a message and then aggregating its sub-messages into a new message is a common need. It combines the
[Splitter](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Sequencer.html)
and [Aggregator](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html) integration patterns.
Let's say that now we want to create a new stream containing the sums of the numbers in each original string.
Scala
: @@snip [RecipeSplitter.scala](/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSplitter.scala) { #Aggregate-Split }
Java
: @@snip [RecipeSplitter.java](/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java) { #Aggregate-Split }
While in real life this solution is overkill for such a simple problem (you could just do everything in a single map),
more complex scenarios, in particular those involving I/O, benefit from the fact that you can parallelize the
sub-streams and get back-pressure for "free", as the sketch below illustrates.
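
For instance, here is a minimal sketch of how the aggregate-split recipe could be extended with an asynchronous step inside each sub-stream. The `lookupPrice` call is a hypothetical stand-in for an I/O operation and is not part of the recipe:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Future

object SplitAggregateWithIO extends App {
  implicit val system: ActorSystem = ActorSystem("Sketch")
  import system.dispatcher

  // hypothetical I/O-bound operation applied to each sub-message
  def lookupPrice(id: Int): Future[Int] = Future(id * 10)

  val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))

  source
    .map(_.split("-").toList)
    // one sub-stream per original message
    .splitWhen(_ => true)
    .mapConcat(identity)
    .map(_.toInt)
    // bounded parallelism inside each sub-stream; back-pressure
    // propagates to the upstream source automatically
    .mapAsync(parallelism = 4)(lookupPrice)
    // aggregate each sub-stream
    .reduce(_ + _)
    // and merge the per-message results back into one stream
    .mergeSubstreams
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}
```

Each sub-stream issues at most four `lookupPrice` calls at a time, and slow lookups slow down the source instead of buffering elements without bound.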
### Implementing reduce-by-key
**Situation:** Given a stream of elements, we want to calculate some aggregated value on different subgroups of the


@@ -0,0 +1,80 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.javadsl.cookbook;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
public class RecipeSplitter {
private static ActorSystem system;
@Test
public void simpleSplit() throws ExecutionException, InterruptedException {
// #Simple-Split
// Sample Source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));
CompletionStage<List<Integer>> ret =
source
.map(s -> Arrays.asList(s.split("-")))
.mapConcat(f -> f)
// Sub-streams logic
.map(s -> Integer.valueOf(s))
.runWith(Sink.seq(), system);
// Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(1, 2, 3, 2, 3, 3, 4));
// #Simple-Split
}
@Test
public void splitAggregate() throws ExecutionException, InterruptedException {
// #Aggregate-Split
// Sample Source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));
CompletionStage<List<Integer>> ret =
source
.map(s -> Arrays.asList(s.split("-")))
// start a new sub-stream for every incoming message (the predicate is always true)
.splitWhen(a -> true)
// now split each collection
.mapConcat(f -> f)
// Sub-streams logic
.map(s -> Integer.valueOf(s))
// aggregate each sub-stream
.reduce((a, b) -> a + b)
// and merge back the result into the original stream
.mergeSubstreams()
.runWith(Sink.seq(), system);
// Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(6, 5, 7));
// #Aggregate-Split
}
@BeforeClass
public static void setup() throws Exception {
system = ActorSystem.create();
}
@AfterClass
public static void teardown() throws Exception {
TestKit.shutdownActorSystem(system);
}
}


@@ -0,0 +1,70 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.cookbook
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
class RecipeSplitter extends AnyWordSpec with BeforeAndAfterAll with Matchers with ScalaFutures {
implicit val system: ActorSystem = ActorSystem("Test")
"Splitter" should {
"simple split" in {
//#Simple-Split
//Sample Source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))
val ret = source
.map(s => s.split("-").toList)
.mapConcat(identity)
//Sub-streams logic
.map(s => s.toInt)
.runWith(Sink.seq)
//Verify results
ret.futureValue should be(Vector(1, 2, 3, 2, 3, 3, 4))
//#Simple-Split
}
"aggregate split" in {
//#Aggregate-Split
//Sample Source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))
val result = source
.map(s => s.split("-").toList)
// start a new sub-stream for every incoming message (the predicate is always true)
.splitWhen(_ => true)
//now split each collection
.mapConcat(identity)
//Sub-streams logic
.map(s => s.toInt)
//aggregate each sub-stream
.reduce((a, b) => a + b)
//and merge back the result into the original stream
.mergeSubstreams
.runWith(Sink.seq)
//Verify results
result.futureValue should be(Vector(6, 5, 7))
//#Aggregate-Split
}
}
override protected def afterAll(): Unit = system.terminate()
}