diff --git a/akka-docs/src/main/paradox/stream/index.md b/akka-docs/src/main/paradox/stream/index.md index 1dfce673b4..e35b6af24f 100644 --- a/akka-docs/src/main/paradox/stream/index.md +++ b/akka-docs/src/main/paradox/stream/index.md @@ -26,6 +26,7 @@ To use Akka Streams, add the module to your project: * [stream-graphs](stream-graphs.md) * [stream-composition](stream-composition.md) * [stream-rate](stream-rate.md) +* [stream-context](stream-context.md) * [stream-dynamic](stream-dynamic.md) * [stream-customize](stream-customize.md) * [futures-interop](futures-interop.md) diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md b/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md index 0745f131c7..eacbd63d97 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md @@ -8,9 +8,10 @@ Turns a Flow into a FlowWithContext which can propagate a context per element al @apidoc[Flow.asFlowWithContext](Flow) { scala="#asFlowWithContext[U,CtxU,CtxOut](collapseContext:(U,CtxU)=>In)(extractContext:Out=>CtxOut):akka.stream.scaladsl.FlowWithContext[U,CtxU,Out,CtxOut,Mat]" java="#asFlowWithContext(akka.japi.function.Function2,akka.japi.function.Function)" } - ## Description -Turns a Flow into a FlowWithContext which can propagate a context per element along a stream. -The first function passed into asFlowWithContext must turn each incoming pair of element and context value into an element of this Flow. -The second function passed into asFlowWithContext must turn each outgoing element of this Flow into an outgoing context value. +See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation. + +Turns a @apidoc[Flow] into a @apidoc[FlowWithContext] which can propagate a context per element along a stream. +The first function passed into `asFlowWithContext` must turn each incoming pair of element and context value into an element of this @apidoc[Flow]. +The second function passed into `asFlowWithContext` must turn each outgoing element of this @apidoc[Flow] into an outgoing context value. diff --git a/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md index 9c04c0f902..7c807ded79 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md @@ -8,8 +8,9 @@ Turns a Source into a SourceWithContext which can propagate a context per elemen @apidoc[Source.asSourceWithContext](Source) { scala="#asSourceWithContext[Ctx](f:Out=>Ctx):akka.stream.scaladsl.SourceWithContext[Out,Ctx,Mat]" java="#asSourceWithContext(akka.japi.function.Function)" } - ## Description -Turns a Source into a SourceWithContext which can propagate a context per element along a stream. -The function passed into asSourceWithContext must turn elements into contexts, one context for every element. +See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation. + +Turns a @apidoc[Source] into a @apidoc[SourceWithContext] which can propagate a context per element along a stream. +The function passed into `asSourceWithContext` must turn elements into contexts, one context for every element. diff --git a/akka-docs/src/main/paradox/stream/stream-context.md b/akka-docs/src/main/paradox/stream/stream-context.md new file mode 100644 index 0000000000..392ec52550 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/stream-context.md @@ -0,0 +1,71 @@ +# Context Propagation + +It can be convenient to attach metadata to each element in the stream. + +For example, when reading from an external data source it can be +useful to keep track of the read offset, so it can be marked as processed +when the element reaches the @apidoc[Sink]. + +For this use case we provide the @apidoc[SourceWithContext] and +@apidoc[FlowWithContext] variations on @apidoc[Source] and +@apidoc[Flow]. + +Essentially, a @apidoc[FlowWithContext] is just a @apidoc[Flow] that +contains @scala[tuples]@java[pairs] of element and context, but the +advantage is in the operators: most operators on @apidoc[FlowWithContext] +will work on the element rather than on the @scala[tuple]@java[pair], +allowing you to focus on your application logic rather without worrying +about the context. + +## Restrictions + +Not all operations that are available on @apidoc[Flow] are also available +on @apidoc[FlowWithContext]. This is intentional: in the use case of +keeping track of a read offset, if the @apidoc[FlowWithContext] was +allowed to arbitrarily filter and reorder the stream, the @apidoc[Sink] +would have no way to determine whether an element was skipped +or merely reordered and still in flight. + +For this reason, @apidoc[FlowWithContext] allows filtering operations +(such as `filter`, `filterNot`, `collect`, etc) and grouping operations +(such as `grouped`, `sliding`, etc) but not reordering operations +(such as `mapAsyncUnordered` and `statefulMapConcat`). Finally, +also 'one-to-n' operations such as `mapConcat` are allowed. + +Filtering operations will drop the context along with dropped elements, +while grouping operations will keep all contexts from the elements in +the group. Streaming one-to-many operations such as `mapConcat` +associate the original context with each of the produced elements. + +As an escape hatch, there is a `via` operator that allows you to +insert an arbitrary @apidoc[Flow] that can process the +@scala[tuples]@java[pairs] of elements and context in any way +desired. When using this operator, it is the responsibility of the +implementor to make sure this @apidoc[Flow] does not perform +any operations (such as reordering) that might break assumptions +made by the @apidoc[Sink] consuming the context elements. + +## Creation + +The simplest way to create a @apidoc[SourceWithContext] is to +first create a regular @apidoc[Source] with elements from which +the context can be extracted, and then use +@ref[Source.asSourceWithContext](operators/Source/asSourceWithContext.md). + +## Composition + +When you have a @apidoc[SourceWithContext] `source` that produces +elements of type `Foo` with a context of type `Ctx`, and a +@apidoc[Flow] `flow` from `Foo` to `Bar`, you cannot simply +`source.via(flow)` to arrive at a @apidoc[SourceWithContext] that +produces elements of type `Bar` with contexts of type `Ctx`. The +reason for this is that `flow` might reorder the elements flowing +through it, making `via` challenging to implement. + +There is a @ref[Flow.asFlowWithContext](operators/Flow/asFlowWithContext.md) +which can be used when the types used in the inner +@apidoc[Flow] have room to hold the context. If this is not the +case, a better solution is usually to build the flow from the ground +up as a @apidoc[FlowWithContext], instead of first building a +@apidoc[Flow] and trying to convert it to @apidoc[FlowWithContext] +after-the-fact. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 4267165d1b..53aff73403 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -46,6 +46,10 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( * Transform this flow by the regular flow. The given flow must support manual context propagation by * taking and producing tuples of (data, context). * + * It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected + * by the downstream elements, such as reordering. For more background on these requirements + * see https://doc.akka.io/docs/akka/current/stream/stream-context.html. + * * This can be used as an escape hatch for operations that are not (yet) provided with automatic * context propagation here. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index 4544482283..a08c16b46c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -43,6 +43,10 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon * Transform this flow by the regular flow. The given flow must support manual context propagation by * taking and producing tuples of (data, context). * + * It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected + * by the downstream elements, such as reordering. For more background on these requirements + * see https://doc.akka.io/docs/akka/current/stream/stream-context.html. + * * This can be used as an escape hatch for operations that are not (yet) provided with automatic * context propagation here. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index 00eadebe80..530db2353a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -29,6 +29,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * Transform this flow by the regular flow. The given flow must support manual context propagation by * taking and producing tuples of (data, context). * + * It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected + * by the downstream elements, such as reordering. For more background on these requirements + * see https://doc.akka.io/docs/akka/current/stream/stream-context.html. + * * This can be used as an escape hatch for operations that are not (yet) provided with automatic * context propagation here. * @@ -40,6 +44,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * Transform this flow by the regular flow. The given flow must support manual context propagation by * taking and producing tuples of (data, context). * + * It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected + * by the downstream elements, such as reordering. For more background on these requirements + * see https://doc.akka.io/docs/akka/current/stream/stream-context.html. + * * This can be used as an escape hatch for operations that are not (yet) provided with automatic * context propagation here. *