diff --git a/akka-docs/src/main/paradox/stream/stream-cookbook.md b/akka-docs/src/main/paradox/stream/stream-cookbook.md index 77e0c3880a..cb37f999e3 100644 --- a/akka-docs/src/main/paradox/stream/stream-cookbook.md +++ b/akka-docs/src/main/paradox/stream/stream-cookbook.md @@ -18,7 +18,9 @@ To use Akka Streams, add the module to your project: This is a collection of patterns to demonstrate various usage of the Akka Streams API by solving small targeted problems in the format of "recipes". The purpose of this page is to give inspiration and ideas how to approach various small tasks involving streams. The recipes in this page can be used directly as-is, but they are most powerful as -starting points: customization of the code snippets is warmly encouraged. +starting points: customization of the code snippets is warmly encouraged. The recipes can be extended or can provide a +basis for the implementation of other [patterns](https://doc.akka.io/docs/alpakka/current/patterns.html) involving +[Alpakka](https://doc.akka.io/docs/alpakka/current). This part also serves as supplementary material for the main body of documentation. It is a good idea to have this page open while reading the manual and look for examples demonstrating various streaming concepts @@ -159,6 +161,43 @@ Scala Java : @@snip [RecipeDecompress.java](/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeDecompress.java) { #decompress-gzip } +### Implementing a Splitter + +**Situation:** Given a stream of messages, where each message is a composition of different elements, we want to split +the message into a series of individual sub-messages, each of which may be processed in a different way. + +The [Splitter](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Sequencer.html) is an integration +pattern as described in [Enterprise Integration Patterns](https://www.enterpriseintegrationpatterns.com). Let's say +that we have a stream containing strings. Each string contains a few numbers separated by "-". We want to create out +of this a stream that only contains the numbers. + +Scala +: @@snip [RecipeSplitter.scala](/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSplitter.scala) { #Simple-Split } + +Java +: @@snip [RecipeSplitter.java](/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java) { #Simple-Split } + +### Implementing a Splitter and Aggregator + +**Situation:** Given a message, we want to split the message and aggregate its sub-messages into a new message + +Sometimes it's very useful to split a message and aggregate its sub-messages into a new message. This involves a +combination of [Splitter](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Sequencer.html) +and [Aggregator](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html) + +Let's say that now we want to create a new stream containing the sums of the numbers in each original string. + + +Scala +: @@snip [RecipeSplitter.scala](/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSplitter.scala) { #Aggregate-Split } + +Java +: @@snip [RecipeSplitter.java](/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java) { #Aggregate-Split } + +While in real life this solution is overkill for such a simple problem (you can just do everything in a map), +more complex scenarios, involving in particular I/O, will benefit from the fact that you can parallelize sub-streams +and get back-pressure for "free". + ### Implementing reduce-by-key **Situation:** Given a stream of elements, we want to calculate some aggregated value on different subgroups of the diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java new file mode 100644 index 0000000000..20d7766b33 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package jdocs.stream.javadsl.cookbook; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.javadsl.TestKit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; + +public class RecipeSplitter { + private static ActorSystem system; + + @Test + public void simpleSplit() throws ExecutionException, InterruptedException { + // #Simple-Split + // Sample Source + Source source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4")); + + CompletionStage> ret = + source + .map(s -> Arrays.asList(s.split("-"))) + .mapConcat(f -> f) + // Sub-streams logic + .map(s -> Integer.valueOf(s)) + .runWith(Sink.seq(), system); + + // Verify results + List list = ret.toCompletableFuture().get(); + assert list.equals(Arrays.asList(1, 2, 3, 2, 3, 3, 4)); + // #Simple-Split + } + + @Test + public void splitAggregate() throws ExecutionException, InterruptedException { + // #Aggregate-Split + // Sample Source + Source source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4")); + + CompletionStage> ret = + source + .map(s -> Arrays.asList(s.split("-"))) + // split all messages into sub-streams + .splitWhen(a -> true) + // now split each collection + .mapConcat(f -> f) + // Sub-streams logic + .map(s -> Integer.valueOf(s)) + // aggregate each sub-stream + .reduce((a, b) -> a + b) + // and merge back the result into the original stream + .mergeSubstreams() + .runWith(Sink.seq(), system); + + // Verify results + List list = ret.toCompletableFuture().get(); + assert list.equals(Arrays.asList(6, 5, 7)); + // #Aggregate-Split + } + + @BeforeClass + public static void setup() throws Exception { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() throws Exception { + TestKit.shutdownActorSystem(system); + } +} diff --git a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSplitter.scala b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSplitter.scala new file mode 100644 index 0000000000..463590a163 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSplitter.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package docs.stream.cookbook + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Sink, Source} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class RecipeSplitter extends AnyWordSpec with BeforeAndAfterAll with Matchers with ScalaFutures{ + + implicit val system = ActorSystem("Test") + + "Splitter" should { + " simple split " in { + + //#Simple-Split + //Sample Source + val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4")) + + val ret = source + .map(s => s.split("-").toList) + .mapConcat(identity) + //Sub-streams logic + .map(s => s.toInt) + .runWith(Sink.seq) + + //Verify results + + ret.futureValue should be(Vector(1, 2, 3, 2, 3, 3, 4)) + //#Simple-Split + + } + + " aggregate split" in { + + //#Aggregate-Split + //Sample Source + val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4")) + + val result = source + .map(s => s.split("-").toList) + //split all messages into sub-streams + .splitWhen(a => true) + //now split each collection + .mapConcat(identity) + //Sub-streams logic + .map(s => s.toInt) + //aggregate each sub-stream + .reduce((a, b) => a + b) + //and merge back the result into the original stream + .mergeSubstreams + .runWith(Sink.seq); + + //Verify results + result.futureValue should be(Vector(6, 5, 7)) + //#Aggregate-Split + + } + + } + + override protected def afterAll(): Unit = system.terminate() + +}