Operator docs: examples for asSource/FlowWithContext (#29822)

* Operator docs: examples for asSource/FlowWithContext

* javafmtAll

* Try to capture the essence better

* Update index

* Include the test use in the snippets
This commit is contained in:
Enno Runne 2020-11-25 12:21:16 +01:00 committed by GitHub
parent c9980216a1
commit 459e74b921
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 235 additions and 7 deletions

View file

@ -44,6 +44,7 @@ rewrite.neverInfix.excludeFilters = [
allElementsOf
inOrderElementsOf
theSameElementsAs
theSameElementsInOrderAs
]
rewriteTokens = {
"⇒": "=>"

View file

@ -1,6 +1,6 @@
# Flow.asFlowWithContext
Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.
Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream.
@ref[Simple operators](../index.md#simple-operators)
@ -12,6 +12,21 @@ Turns a Flow into a FlowWithContext which can propagate a context per element al
See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation.
Turns a @apidoc[Flow] into a @apidoc[FlowWithContext] which can propagate a context per element along a stream.
Extracts context data from the elements of a @apidoc[Flow] so that it can be turned into a @apidoc[FlowWithContext] which can propagate that context per element along a stream.
The first function passed into `asFlowWithContext` must turn each incoming pair of element and context value into an element of this @apidoc[Flow].
The second function passed into `asFlowWithContext` must turn each outgoing element of this @apidoc[Flow] into an outgoing context value.
See also:
* @ref[Context Propagation](../../stream-context.md)
* @ref[`Source.asSourceWithContext`](../Source/asSourceWithContext.md) Turns a `Source` into a `SourceWithContext` which can propagate a context per element along a stream.
## Example
Elements from this flow have a correlation number, but the flow structure should focus on the text message in the elements. The first converter in `asFlowWithContext` applies to the end of the "with context" flow to turn it into a regular flow again. The second converter function chooses the second value in the @scala[tuple]@java[pair] as the context. Another `map` operator makes the first value the stream elements in the `FlowWithContext`.
Scala
: @@snip [snip](/akka-docs/src/test/scala/docs/stream/operators/WithContextSpec.scala) { #asFlowWithContext }
Java
: @@snip [snip](/akka-docs/src/test/java/jdocs/stream/operators/WithContextTest.java) { #imports #asFlowWithContext }

View file

@ -1,6 +1,6 @@
# Source.asSourceWithContext
Turns a Source into a SourceWithContext which can propagate a context per element along a stream.
Extracts context data from the elements of a `Source` so that it can be turned into a `SourceWithContext` which can propagate that context per element along a stream.
@ref[Source operators](../index.md#source-operators)
@ -12,5 +12,21 @@ Turns a Source into a SourceWithContext which can propagate a context per elemen
See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation.
Turns a @apidoc[Source] into a @apidoc[SourceWithContext] which can propagate a context per element along a stream.
Extracts context data from the elements of a @apidoc[Source] so that it can be turned into a @apidoc[SourceWithContext] which can propagate that context per element along a stream.
The function passed into `asSourceWithContext` must turn elements into contexts, one context for every element.
See also:
* @ref[Context Propagation](../../stream-context.md)
* @ref[`Flow.asFlowWithContext`](../Flow/asFlowWithContext.md) Turns a `Flow` into a `FlowWithContext` which can propagate a context per element along a stream.
## Example
Elements from this source have a correlation number, but the flow structure should focus on the text message in the elements. `asSourceWithContext` chooses the second value in the @scala[tuple]@java[pair] as the context. Another `map` operator makes the first value the stream elements in the `SourceWithContext`.
Scala
: @@snip [snip](/akka-docs/src/test/scala/docs/stream/operators/WithContextSpec.scala) { #asSourceWithContext }
Java
: @@snip [snip](/akka-docs/src/test/java/jdocs/stream/operators/WithContextTest.java) { #imports #asSourceWithContext }

View file

@ -7,7 +7,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
| |Operator|Description|
|--|--|--|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Extracts context data from the elements of a `Source` so that it can be turned into a `SourceWithContext` which can propagate that context per element along a stream.|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|Source|<a name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.|
@ -140,7 +140,7 @@ depending on being backpressured by downstream or not.
| |Operator|Description|
|--|--|--|
|Flow|<a name="asflowwithcontext"></a>@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.|
|Flow|<a name="asflowwithcontext"></a>@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream.|
|Source/Flow|<a name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
|Source/Flow|<a name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
|Flow|<a name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.|

View file

@ -0,0 +1,115 @@
/*
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators;
import akka.actor.ActorSystem;
// #imports
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.javadsl.*;
// #imports
import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
public class WithContextTest extends AbstractJavaTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("WithContextTest");
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void documentAsSourceWithContext() throws Exception {
// #asSourceWithContext
// values with their contexts as pairs
Collection<Pair<String, Integer>> values =
Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3));
// a regular source with pairs as elements
Source<Pair<String, Integer>, NotUsed> source = Source.from(values);
// split the pair into stream elements and their context
SourceWithContext<String, Integer, NotUsed> sourceWithContext =
source
.asSourceWithContext(Pair::second) // pick the second pair element as context
.map(Pair::first); // keep the first pair element as stream element
SourceWithContext<String, Integer, NotUsed> mapped =
sourceWithContext
// regular operators apply to the element without seeing the context
.map(s -> s.replace('e', 'y'));
// running the source and asserting the outcome
CompletionStage<List<Pair<String, Integer>>> result = mapped.runWith(Sink.seq(), system);
List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertThat(
list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));
// #asSourceWithContext
}
@Test
public void documentAsFlowWithContext() throws Exception {
// #asFlowWithContext
// a regular flow with pairs as elements
Flow<Pair<String, Integer>, Pair<String, Integer>, NotUsed> flow = // ...
// #asFlowWithContext
Flow.create();
// #asFlowWithContext
// Declare the "flow with context"
// ingoing: String and Integer
// outgoing: String and Integer
FlowWithContext<String, Integer, String, Integer, NotUsed> flowWithContext =
// convert the flow of pairs into a "flow with context"
flow.<String, Integer, Integer>asFlowWithContext(
// at the end of this flow: put the elements and the context back into a pair
Pair::create,
// pick the second element of the incoming pair as context
Pair::second)
// keep the first pair element as stream element
.map(Pair::first);
FlowWithContext<String, Integer, String, Integer, NotUsed> mapped =
flowWithContext
// regular operators apply to the element without seeing the context
.map(s -> s.replace('e', 'y'));
// running the flow with some sample data and asserting the outcome
Collection<Pair<String, Integer>> values =
Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3));
SourceWithContext<String, Integer, NotUsed> source =
Source.from(values).asSourceWithContext(Pair::second).map(Pair::first);
CompletionStage<List<Pair<String, Integer>>> result =
source.via(mapped).runWith(Sink.seq(), system);
List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertThat(
list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));
// #asFlowWithContext
}
}

View file

@ -0,0 +1,81 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators
import akka.testkit.AkkaSpec
class WithContextSpec extends AkkaSpec {
"use asSourceWithContext" in {
// #asSourceWithContext
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceWithContext
import scala.collection.immutable
// values with their contexts as tuples
val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3)
// a regular source with the tuples as elements
val source: Source[(String, Int), NotUsed] = Source(values)
// split the tuple into stream elements and their context
val sourceWithContext: SourceWithContext[String, Int, NotUsed] =
source
.asSourceWithContext(_._2) // pick the second tuple element as context
.map(_._1) // keep the first tuple element as stream element
val mapped: SourceWithContext[String, Int, NotUsed] = sourceWithContext
// regular operators apply to the element without seeing the context
.map(s => s.reverse)
// running the source and asserting the outcome
import akka.stream.scaladsl.Sink
val result = mapped.runWith(Sink.seq)
result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
// #asSourceWithContext
}
"use asFlowWithContext" in {
// #asFlowWithContext
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.FlowWithContext
// a regular flow with pairs as elements
val flow: Flow[(String, Int), (String, Int), NotUsed] = // ???
// #asFlowWithContext
Flow[(String, Int)]
// #asFlowWithContext
// Declare the "flow with context"
// ingoing: String and Integer
// outgoing: String and Integer
val flowWithContext: FlowWithContext[String, Int, String, Int, NotUsed] =
// convert the flow of pairs into a "flow with context"
flow
.asFlowWithContext[String, Int, Int](
// at the end of this flow: put the elements and the context back into a tuple
collapseContext = Tuple2.apply)(
// pick the second element of the incoming pair as context
extractContext = _._2)
.map(_._1) // keep the first pair element as stream element
val mapped = flowWithContext
// regular operators apply to the element without seeing the context
.map(_.reverse)
// running the flow with some sample data and asserting the outcome
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink
import scala.collection.immutable
val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3)
val source = Source(values).asSourceWithContext(_._2).map(_._1)
val result = source.via(mapped).runWith(Sink.seq)
result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
// #asFlowWithContext
}
}