diff --git a/.scalafmt.conf b/.scalafmt.conf index af59ff8d08..4452a40ba8 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -44,9 +44,10 @@ rewrite.neverInfix.excludeFilters = [ allElementsOf inOrderElementsOf theSameElementsAs + theSameElementsInOrderAs ] rewriteTokens = { "⇒": "=>" "→": "->" "←": "<-" -} \ No newline at end of file +} diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md b/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md index eacbd63d97..9dd496256f 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md @@ -1,6 +1,6 @@ # Flow.asFlowWithContext -Turns a Flow into a FlowWithContext which can propagate a context per element along a stream. +Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream. @ref[Simple operators](../index.md#simple-operators) @@ -12,6 +12,21 @@ Turns a Flow into a FlowWithContext which can propagate a context per element al See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation. -Turns a @apidoc[Flow] into a @apidoc[FlowWithContext] which can propagate a context per element along a stream. +Extracts context data from the elements of a @apidoc[Flow] so that it can be turned into a @apidoc[FlowWithContext] which can propagate that context per element along a stream. The first function passed into `asFlowWithContext` must turn each incoming pair of element and context value into an element of this @apidoc[Flow]. The second function passed into `asFlowWithContext` must turn each outgoing element of this @apidoc[Flow] into an outgoing context value. + +See also: + +* @ref[Context Propagation](../../stream-context.md) +* @ref[`Source.asSourceWithContext`](../Source/asSourceWithContext.md) Turns a `Source` into a `SourceWithContext` which can propagate a context per element along a stream. + +## Example + +Elements from this flow have a correlation number, but the flow structure should focus on the text message in the elements. The first converter in `asFlowWithContext` applies to the end of the "with context" flow to turn it into a regular flow again. The second converter function chooses the second value in the @scala[tuple]@java[pair] as the context. Another `map` operator makes the first value the stream elements in the `FlowWithContext`. + +Scala +: @@snip [snip](/akka-docs/src/test/scala/docs/stream/operators/WithContextSpec.scala) { #asFlowWithContext } + +Java +: @@snip [snip](/akka-docs/src/test/java/jdocs/stream/operators/WithContextTest.java) { #imports #asFlowWithContext } diff --git a/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md index 7c807ded79..50dcc53b7b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md @@ -1,6 +1,6 @@ # Source.asSourceWithContext -Turns a Source into a SourceWithContext which can propagate a context per element along a stream. +Extracts context data from the elements of a `Source` so that it can be turned into a `SourceWithContext` which can propagate that context per element along a stream. @ref[Source operators](../index.md#source-operators) @@ -12,5 +12,21 @@ Turns a Source into a SourceWithContext which can propagate a context per elemen See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation. -Turns a @apidoc[Source] into a @apidoc[SourceWithContext] which can propagate a context per element along a stream. +Extracts context data from the elements of a @apidoc[Source] so that it can be turned into a @apidoc[SourceWithContext] which can propagate that context per element along a stream. The function passed into `asSourceWithContext` must turn elements into contexts, one context for every element. + +See also: + +* @ref[Context Propagation](../../stream-context.md) +* @ref[`Flow.asFlowWithContext`](../Flow/asFlowWithContext.md) Turns a `Flow` into a `FlowWithContext` which can propagate a context per element along a stream. + + +## Example + +Elements from this source have a correlation number, but the flow structure should focus on the text message in the elements. `asSourceWithContext` chooses the second value in the @scala[tuple]@java[pair] as the context. Another `map` operator makes the first value the stream elements in the `SourceWithContext`. + +Scala +: @@snip [snip](/akka-docs/src/test/scala/docs/stream/operators/WithContextSpec.scala) { #asSourceWithContext } + +Java +: @@snip [snip](/akka-docs/src/test/java/jdocs/stream/operators/WithContextTest.java) { #imports #asSourceWithContext } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index ab571a8c5e..3ae52747d8 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -7,7 +7,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] | |Operator|Description| |--|--|--| -|Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| +|Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Extracts context data from the elements of a `Source` so that it can be turned into a `SourceWithContext` which can propagate that context per element along a stream.| |Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.| @@ -140,7 +140,7 @@ depending on being backpressured by downstream or not. | |Operator|Description| |--|--|--| -|Flow|@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.| +|Flow|@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream.| |Source/Flow|@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.| |Source/Flow|@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.| |Flow|@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/WithContextTest.java b/akka-docs/src/test/java/jdocs/stream/operators/WithContextTest.java new file mode 100644 index 0000000000..d5be002f9c --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/WithContextTest.java @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2015-2020 Lightbend Inc. + */ + +package jdocs.stream.operators; + +import akka.actor.ActorSystem; +// #imports +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.javadsl.*; +// #imports +import akka.testkit.javadsl.TestKit; +import jdocs.AbstractJavaTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.MatcherAssert.assertThat; + +public class WithContextTest extends AbstractJavaTest { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("WithContextTest"); + } + + @AfterClass + public static void tearDown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void documentAsSourceWithContext() throws Exception { + // #asSourceWithContext + + // values with their contexts as pairs + Collection> values = + Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3)); + + // a regular source with pairs as elements + Source, NotUsed> source = Source.from(values); + + // split the pair into stream elements and their context + SourceWithContext sourceWithContext = + source + .asSourceWithContext(Pair::second) // pick the second pair element as context + .map(Pair::first); // keep the first pair element as stream element + + SourceWithContext mapped = + sourceWithContext + // regular operators apply to the element without seeing the context + .map(s -> s.replace('e', 'y')); + + // running the source and asserting the outcome + CompletionStage>> result = mapped.runWith(Sink.seq(), system); + List> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertThat( + list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3))); + // #asSourceWithContext + } + + @Test + public void documentAsFlowWithContext() throws Exception { + // #asFlowWithContext + + // a regular flow with pairs as elements + Flow, Pair, NotUsed> flow = // ... + // #asFlowWithContext + Flow.create(); + // #asFlowWithContext + + // Declare the "flow with context" + // ingoing: String and Integer + // outgoing: String and Integer + FlowWithContext flowWithContext = + // convert the flow of pairs into a "flow with context" + flow.asFlowWithContext( + // at the end of this flow: put the elements and the context back into a pair + Pair::create, + // pick the second element of the incoming pair as context + Pair::second) + // keep the first pair element as stream element + .map(Pair::first); + + FlowWithContext mapped = + flowWithContext + // regular operators apply to the element without seeing the context + .map(s -> s.replace('e', 'y')); + + // running the flow with some sample data and asserting the outcome + Collection> values = + Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3)); + + SourceWithContext source = + Source.from(values).asSourceWithContext(Pair::second).map(Pair::first); + + CompletionStage>> result = + source.via(mapped).runWith(Sink.seq(), system); + List> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertThat( + list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3))); + // #asFlowWithContext + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/WithContextSpec.scala b/akka-docs/src/test/scala/docs/stream/operators/WithContextSpec.scala new file mode 100644 index 0000000000..d2daa4bd2e --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/WithContextSpec.scala @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2018-2020 Lightbend Inc. + */ + +package docs.stream.operators + +import akka.testkit.AkkaSpec + +class WithContextSpec extends AkkaSpec { + + "use asSourceWithContext" in { + // #asSourceWithContext + import akka.NotUsed + import akka.stream.scaladsl.Source + import akka.stream.scaladsl.SourceWithContext + import scala.collection.immutable + + // values with their contexts as tuples + val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3) + + // a regular source with the tuples as elements + val source: Source[(String, Int), NotUsed] = Source(values) + + // split the tuple into stream elements and their context + val sourceWithContext: SourceWithContext[String, Int, NotUsed] = + source + .asSourceWithContext(_._2) // pick the second tuple element as context + .map(_._1) // keep the first tuple element as stream element + + val mapped: SourceWithContext[String, Int, NotUsed] = sourceWithContext + // regular operators apply to the element without seeing the context + .map(s => s.reverse) + + // running the source and asserting the outcome + import akka.stream.scaladsl.Sink + val result = mapped.runWith(Sink.seq) + result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3) + // #asSourceWithContext + } + + "use asFlowWithContext" in { + // #asFlowWithContext + import akka.NotUsed + import akka.stream.scaladsl.Flow + import akka.stream.scaladsl.FlowWithContext + // a regular flow with pairs as elements + val flow: Flow[(String, Int), (String, Int), NotUsed] = // ??? + // #asFlowWithContext + Flow[(String, Int)] + // #asFlowWithContext + + // Declare the "flow with context" + // ingoing: String and Integer + // outgoing: String and Integer + val flowWithContext: FlowWithContext[String, Int, String, Int, NotUsed] = + // convert the flow of pairs into a "flow with context" + flow + .asFlowWithContext[String, Int, Int]( + // at the end of this flow: put the elements and the context back into a tuple + collapseContext = Tuple2.apply)( + // pick the second element of the incoming pair as context + extractContext = _._2) + .map(_._1) // keep the first pair element as stream element + + val mapped = flowWithContext + // regular operators apply to the element without seeing the context + .map(_.reverse) + + // running the flow with some sample data and asserting the outcome + import akka.stream.scaladsl.Source + import akka.stream.scaladsl.Sink + import scala.collection.immutable + + val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3) + val source = Source(values).asSourceWithContext(_._2).map(_._1) + + val result = source.via(mapped).runWith(Sink.seq) + result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3) + // #asFlowWithContext + } +}