Docs: WithContext (#29088)

Arnout Engelen 2020-06-05 11:31:10 +02:00 committed by GitHub
parent 26db4c1f79
commit afc9813c55
7 changed files with 97 additions and 7 deletions

@@ -26,6 +26,7 @@ To use Akka Streams, add the module to your project:
* [stream-graphs](stream-graphs.md)
* [stream-composition](stream-composition.md)
* [stream-rate](stream-rate.md)
* [stream-context](stream-context.md)
* [stream-dynamic](stream-dynamic.md)
* [stream-customize](stream-customize.md)
* [futures-interop](futures-interop.md)

@@ -8,9 +8,10 @@ Turns a Flow into a FlowWithContext which can propagate a context per element al
@apidoc[Flow.asFlowWithContext](Flow) { scala="#asFlowWithContext[U,CtxU,CtxOut](collapseContext:(U,CtxU)=>In)(extractContext:Out=>CtxOut):akka.stream.scaladsl.FlowWithContext[U,CtxU,Out,CtxOut,Mat]" java="#asFlowWithContext(akka.japi.function.Function2,akka.japi.function.Function)" }
## Description

-Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.
-The first function passed into asFlowWithContext must turn each incoming pair of element and context value into an element of this Flow.
-The second function passed into asFlowWithContext must turn each outgoing element of this Flow into an outgoing context value.
+See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation.
+Turns a @apidoc[Flow] into a @apidoc[FlowWithContext] which can propagate a context per element along a stream.
+The first function passed into `asFlowWithContext` must turn each incoming pair of element and context value into an element of this @apidoc[Flow].
+The second function passed into `asFlowWithContext` must turn each outgoing element of this @apidoc[Flow] into an outgoing context value.
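As a plain-Scala sketch of what these two functions do (no Akka involved; `Msg`, the inner flow, and `run` are hypothetical stand-ins, not Akka APIs):

```scala
// Plain-Scala sketch of the two functions given to asFlowWithContext.
case class Msg(text: String, offset: Long) // element type with room for the context

// The inner Flow[Msg, Msg], modeled as a plain function:
val inner: Msg => Msg = m => m.copy(text = m.text.toUpperCase)

// First function: (element, context) => element of the inner flow
val collapseContext: (String, Long) => Msg = (text, offset) => Msg(text, offset)
// Second function: inner flow output => outgoing context
val extractContext: Msg => Long = _.offset

// What the resulting FlowWithContext conceptually does per (element, context) pair:
def run(elem: String, ctx: Long): (String, Long) = {
  val out = inner(collapseContext(elem, ctx))
  (out.text, extractContext(out))
}
```

The context is folded into the inner flow's element type on the way in and recovered from it on the way out.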

@@ -8,8 +8,9 @@ Turns a Source into a SourceWithContext which can propagate a context per elemen
@apidoc[Source.asSourceWithContext](Source) { scala="#asSourceWithContext[Ctx](f:Out=>Ctx):akka.stream.scaladsl.SourceWithContext[Out,Ctx,Mat]" java="#asSourceWithContext(akka.japi.function.Function)" }
## Description

-Turns a Source into a SourceWithContext which can propagate a context per element along a stream.
-The function passed into asSourceWithContext must turn elements into contexts, one context for every element.
+See @ref[Context Propagation](../../stream-context.md) for a general overview of context propagation.
+Turns a @apidoc[Source] into a @apidoc[SourceWithContext] which can propagate a context per element along a stream.
+The function passed into `asSourceWithContext` must turn elements into contexts, one context for every element.
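Conceptually a SourceWithContext is a source of (element, context) pairs; this plain-Scala sketch (no Akka; `Record` is a hypothetical element type) shows the role of that single extraction function:

```scala
// Plain-Scala sketch: asSourceWithContext takes one function, Out => Ctx,
// producing exactly one context per element.
case class Record(payload: String, offset: Long)

val elements = List(Record("a", 0L), Record("b", 1L))

// The function passed to asSourceWithContext:
val extractCtx: Record => Long = _.offset

// The resulting stream, viewed as (element, context) pairs:
val withContext: List[(Record, Long)] = elements.map(r => (r, extractCtx(r)))
```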

@@ -0,0 +1,71 @@
# Context Propagation

It can be convenient to attach metadata to each element in the stream.
For example, when reading from an external data source it can be
useful to keep track of the read offset, so it can be marked as processed
when the element reaches the @apidoc[Sink].

For this use case we provide the @apidoc[SourceWithContext] and
@apidoc[FlowWithContext] variations on @apidoc[Source] and
@apidoc[Flow].
Essentially, a @apidoc[FlowWithContext] is just a @apidoc[Flow] that
contains @scala[tuples]@java[pairs] of element and context, but the
advantage is in the operators: most operators on @apidoc[FlowWithContext]
will work on the element rather than on the @scala[tuple]@java[pair],
allowing you to focus on your application logic without worrying
about the context.
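That element-wise behaviour can be sketched in plain Scala (no Akka; `mapWithContext` is a hypothetical stand-in for the real `map` operator):

```scala
// Conceptual sketch: a FlowWithContext is a flow of (element, context) pairs,
// but its map only touches the element; the context rides along unchanged.
def mapWithContext[A, B, Ctx](pairs: List[(A, Ctx)])(f: A => B): List[(B, Ctx)] =
  pairs.map { case (elem, ctx) => (f(elem), ctx) }

val in  = List(("a", 0L), ("b", 1L))
val out = mapWithContext(in)(_.toUpperCase)
// the offsets 0L and 1L still accompany their (transformed) elements
```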

## Restrictions

Not all operations that are available on @apidoc[Flow] are also available
on @apidoc[FlowWithContext]. This is intentional: in the use case of
keeping track of a read offset, if the @apidoc[FlowWithContext] were
allowed to arbitrarily filter and reorder the stream, the @apidoc[Sink]
would have no way to determine whether an element was skipped
or merely reordered and still in flight.

For this reason, @apidoc[FlowWithContext] allows filtering operations
(such as `filter`, `filterNot`, `collect`, etc.) and grouping operations
(such as `grouped`, `sliding`, etc.) but not reordering operations
(such as `mapAsyncUnordered` and `statefulMapConcat`). Finally,
'one-to-n' operations such as `mapConcat` are also allowed.

Filtering operations will drop the context along with dropped elements,
while grouping operations will keep all contexts from the elements in
the group. Streaming one-to-many operations such as `mapConcat`
associate the original context with each of the produced elements.
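These rules can be illustrated on a plain-Scala model of a stream of (element, context) pairs (no Akka involved):

```scala
// How contexts follow elements through the allowed operations, sketched on lists:
val in = List(("a", 0L), ("bb", 1L), ("ccc", 2L))

// filter: a dropped element takes its context with it
val filtered = in.filter { case (elem, _) => elem.length > 1 }

// grouped: the group keeps every member's context
val grouped = in.grouped(2).toList.map(g => (g.map(_._1), g.map(_._2)))

// mapConcat (one-to-n): every produced element carries the original context
val fanned = in.flatMap { case (elem, ctx) => elem.toList.map(c => (c, ctx)) }
```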

As an escape hatch, there is a `via` operator that allows you to
insert an arbitrary @apidoc[Flow] that can process the
@scala[tuples]@java[pairs] of elements and context in any way
desired. When using this operator, it is the responsibility of the
implementor to make sure this @apidoc[Flow] does not perform
any operations (such as reordering) that might break assumptions
made by the @apidoc[Sink] consuming the context elements.

## Creation

The simplest way to create a @apidoc[SourceWithContext] is to
first create a regular @apidoc[Source] with elements from which
the context can be extracted, and then use
@ref[Source.asSourceWithContext](operators/Source/asSourceWithContext.md).

## Composition

When you have a @apidoc[SourceWithContext] `source` that produces
elements of type `Foo` with a context of type `Ctx`, and a
@apidoc[Flow] `flow` from `Foo` to `Bar`, you cannot simply use
`source.via(flow)` to arrive at a @apidoc[SourceWithContext] that
produces elements of type `Bar` with contexts of type `Ctx`. The
reason for this is that `flow` might reorder the elements flowing
through it, making `via` challenging to implement.

There is a @ref[Flow.asFlowWithContext](operators/Flow/asFlowWithContext.md)
which can be used when the types used in the inner
@apidoc[Flow] have room to hold the context. If this is not the
case, a better solution is usually to build the flow from the ground
up as a @apidoc[FlowWithContext], instead of first building a
@apidoc[Flow] and trying to convert it to @apidoc[FlowWithContext]
after the fact.
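Why a reordering inner flow breaks the pairing can be seen in a plain-Scala sketch (no Akka; the "flow" is modeled as a function over the element stream):

```scala
// Elements arrive out of offset order; each carries its own context:
val pairs = List(("b", 1L), ("a", 0L))

// A plain Flow[Foo, Bar], modeled as a function that happens to reorder:
val reordering: List[String] => List[String] = _.sorted

// Naively running the elements through the flow and zipping the results
// back with the original contexts attaches the wrong context to each element:
val naive = reordering(pairs.map(_._1)).zip(pairs.map(_._2))
// "a" now carries "b"'s context, and vice versa
```

This is why `via` on a context-propagating stream only accepts flows that already carry the @scala[tuples]@java[pairs] through explicitly.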

@@ -46,6 +46,10 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected
* by the downstream elements, such as reordering. For more background on these requirements
* see https://doc.akka.io/docs/akka/current/stream/stream-context.html.
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*

@@ -43,6 +43,10 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected
* by the downstream elements, such as reordering. For more background on these requirements
* see https://doc.akka.io/docs/akka/current/stream/stream-context.html.
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*

@@ -29,6 +29,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected
* by the downstream elements, such as reordering. For more background on these requirements
* see https://doc.akka.io/docs/akka/current/stream/stream-context.html.
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*
@@ -40,6 +44,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* It is up to the implementer to ensure the inner flow does not exhibit any behaviour that is not expected
* by the downstream elements, such as reordering. For more background on these requirements
* see https://doc.akka.io/docs/akka/current/stream/stream-context.html.
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*