Use apidoc directive in stream/stream-customize.md (#22904) (#31447)

Andrei Arlou 2022-08-12 14:44:19 +03:00 committed by GitHub
parent cbe7cd64cd
commit 6e049efce3


@ -31,12 +31,12 @@ might be easy to make with a custom @ref[`GraphStage`](stream-customize.md)
<a id="graphstage"></a>
## Custom processing with GraphStage
The `GraphStage` abstraction can be used to create arbitrary operators with any number of input
or output ports. It is a counterpart of the `GraphDSL.create()` method which creates new stream processing
The @apidoc[stage.GraphStage] abstraction can be used to create arbitrary operators with any number of input
or output ports. It is a counterpart of the @apidoc[GraphDSL.create()](stream.*.GraphDSL$) {scala="#create%5BS%3C:akka.stream.Shape,IS%3C:akka.stream.Shape,Mat](graphs:Seq%5Bakka.stream.Graph%5BIS,Mat]])(buildBlock:akka.stream.scaladsl.GraphDSL.Builder%5BSeq%5BMat]]=%3E(Seq%5BIS]=%3ES)):akka.stream.Graph%5BS,Seq%5BMat]]" java="#create(java.util.List,akka.japi.function.Function2)"} method which creates new stream processing
operators by composing others. Where `GraphStage` differs is that it creates an operator that is itself not divisible into
smaller ones, and allows state to be maintained inside it in a safe way.
As a first motivating example, we will build a new `Source` that will emit numbers from 1 until it is
As a first motivating example, we will build a new @apidoc[stream.*.Source] that will emit numbers from 1 until it is
cancelled. To start, we need to define the "interface" of our operator, which is called *shape* in Akka Streams terminology
(this is explained in more detail in the section @ref:[Modularity, Composition and Hierarchy](stream-composition.md)). This is how it looks:
@ -47,9 +47,9 @@ Java
: @@snip [GraphStageDocTest.java](/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java) { #simple-source }
As you see, in itself the `GraphStage` only defines the ports of this operator and a shape that contains the ports.
It also has a currently unimplemented method called `createLogic`. If you recall, operators are reusable in multiple
It also has a currently unimplemented method called @apidoc[createLogic](stage.GraphStage) {scala="#createLogic(inheritedAttributes:akka.stream.Attributes):akka.stream.stage.GraphStageLogic" java="#createLogic(akka.stream.Attributes)"}. If you recall, operators are reusable in multiple
materializations, each resulting in a different executing entity. In the case of `GraphStage` the actual running
logic is modeled as an instance of a `GraphStageLogic` which will be created by the materializer by calling
logic is modeled as an instance of a @apidoc[stage.GraphStageLogic] which will be created by the materializer by calling
the `createLogic` method. In other words, all we need to do is to create a suitable logic that will emit the
numbers we want.
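A minimal sketch of such a logic, assuming Akka 2.6 with the Scala DSL. The names `NumbersSource` and `NumbersSourceDemo` and the port name are illustrative and need not match the snippet in `GraphStageDocSpec.scala`:

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

// Emits 1, 2, 3, ... until downstream cancels.
class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource.out")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Mutable state lives inside the logic, so every
      // materialization gets its own counter.
      private var counter = 1

      setHandler(out, new OutHandler {
        // Called each time downstream signals demand.
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      })
    }
}

object NumbersSourceDemo extends App {
  // In Akka 2.6 an implicit ActorSystem also provides the Materializer.
  implicit val system: ActorSystem = ActorSystem("numbers-demo")

  val firstThree = Source.fromGraph(new NumbersSource).take(3).runWith(Sink.seq)
  println(Await.result(firstThree, 3.seconds)) // the first three numbers: 1, 2, 3

  system.terminate()
}
```

Because the counter is created inside `createLogic`, materializing the same `NumbersSource` instance twice yields two independent sequences that both start at 1.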
@ -60,9 +60,9 @@ confined to the GraphStageLogic that is created for every materialization.
@@@
In order to emit from a `Source` in a backpressured stream one needs first to have demand from downstream.
To receive the necessary events one needs to register a subclass of @scala[`OutHandler`] @java[`AbstractOutHandler`] with the output port
(`Outlet`). This handler will receive events related to the lifecycle of the port. In our case we need to
In order to emit from a @apidoc[stream.*.Source] in a backpressured stream one needs first to have demand from downstream.
To receive the necessary events one needs to register a subclass of @scala[@scaladoc[OutHandler](akka.stream.stage.OutHandler)] @java[@javadoc[AbstractOutHandler](akka.stream.stage.AbstractOutHandler)] with the output port
(@apidoc[stream.Outlet]). This handler will receive events related to the lifecycle of the port. In our case we need to
override `onPull()` which indicates that we are free to emit a single element. There is another callback,
`onDownstreamFinish()` which is called if the downstream cancelled. Since the default behavior of that callback is
to stop the operator, we don't need to override it. In the `onPull` callback we will emit the next number. This
@ -71,7 +71,7 @@ is how it looks like in the end:
Scala
: @@snip [GraphStageDocSpec.scala](/akka-docs/src/test/scala/docs/stream/GraphStageDocSpec.scala) { #custom-source-example }
Instances of the above `GraphStage` are subclasses of @scala[`Graph[SourceShape[Int],NotUsed]`] @java[`Graph<SourceShape<Integer>,NotUsed>`] which means
Instances of the above @apidoc[stage.GraphStage] are subclasses of @scala[`Graph[SourceShape[Int],NotUsed]`] @java[`Graph<SourceShape<Integer>,NotUsed>`] which means
that they are already usable in many situations, but do not provide the DSL methods we usually have for other
`Source` s. In order to convert this `Graph` to a proper `Source` we need to wrap it using
`Source.fromGraph` (see @ref:[Modularity, Composition and Hierarchy](stream-composition.md) for more details about operators and DSLs). Now we can use the
@ -83,11 +83,11 @@ Scala
Java
: @@snip [GraphStageDocTest.java](/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java) { #simple-source-usage }
Similarly, to create a custom `Sink` one can register a subclass of `InHandler` with the operator `Inlet`.
The `onPush()` callback is used to signal the handler a new element has been pushed to the operator,
Similarly, to create a custom @apidoc[stream.*.Sink] one can register a subclass of @apidoc[stage.InHandler] with the operator @apidoc[stream.Inlet].
The @apidoc[onPush()](stage.InHandler) {scala="#onPush():Unit" java="#onPush()"} callback is used to signal the handler a new element has been pushed to the operator,
and can hence be grabbed and used. `onPush()` can be overridden to provide custom behavior.
Please note, most Sinks would need to request upstream elements as soon as they are created: this can be
done by calling `pull(inlet)` in the `preStart()` callback.
done by calling @apidoc[pull(inlet)](stage.GraphStageLogic) {scala="#pull[T](in:akka.stream.Inlet[T]):Unit" java="#pull(akka.stream.Inlet)"} in the @apidoc[preStart()](stage.GraphStageLogic) {scala="#preStart():Unit" java="#preStart()"} callback.
Scala
: @@snip [GraphStageDocSpec.scala](/akka-docs/src/test/scala/docs/stream/GraphStageDocSpec.scala) { #custom-sink-example }
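The sink described above could be sketched roughly as follows, assuming Akka 2.6. `CallbackSink` and its function parameter `f` are hypothetical names introduced only for this illustration:

```scala
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}

// Hands every incoming element to the user-supplied function `f`.
class CallbackSink[T](f: T => Unit) extends GraphStage[SinkShape[T]] {
  val in: Inlet[T] = Inlet("CallbackSink.in")
  override val shape: SinkShape[T] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // A sink has no downstream to trigger demand, so it requests
      // the first element itself as soon as it starts.
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          f(grab(in)) // take the element that was just pushed
          pull(in)    // and ask for the next one
        }
      })
    }
}
```

With an implicit `ActorSystem` in scope, `Source(1 to 3).runWith(new CallbackSink[Int](println))` would run it. Note that `f` is invoked on the stream's execution thread, so it should not block.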
@ -98,28 +98,28 @@ Java
### Port states, @scala[InHandler] @java[AbstractInHandler] and @scala[OutHandler] @java[AbstractOutHandler]
In order to interact with a port (`Inlet` or `Outlet`) of the operator we need to be able to receive events
In order to interact with a port (@apidoc[stream.Inlet] or @apidoc[stream.Outlet]) of the operator we need to be able to receive events
and generate new events belonging to the port.
#### Output port
From the `GraphStageLogic` the following operations are available on an output port:
From the @apidoc[stage.GraphStageLogic] the following operations are available on an output port:
* `push(out,elem)` pushes an element to the output port. Only possible after the port has been pulled by downstream.
* `complete(out)` closes the output port normally.
* `fail(out,exception)` closes the port with a failure signal.
The events corresponding to an *output* port can be received in an @scala[`OutHandler`] @java[`AbstractOutHandler`] instance registered to the
The events corresponding to an *output* port can be received in an @scala[@scaladoc[OutHandler](akka.stream.stage.OutHandler)]@java[@javadoc[AbstractOutHandler](akka.stream.stage.AbstractOutHandler)] instance registered to the
output port using `setHandler(out,handler)`. This handler has two callbacks:
* `onPull()` is called when the output port is ready to emit the next element, `push(out, elem)` is now allowed
* @apidoc[onPull()](akka.stream.stage.OutHandler) {scala="#onPull():Unit" java="#onPull()"} is called when the output port is ready to emit the next element, `push(out, elem)` is now allowed
to be called on this port.
* `onDownstreamFinish()` is called once the downstream has cancelled and no longer allows messages to be pushed to it.
* @apidoc[onDownstreamFinish()](akka.stream.stage.OutHandler) {scala="#onDownstreamFinish(cause:Throwable):Unit" java="#onDownstreamFinish(java.lang.Throwable)"} is called once the downstream has cancelled and no longer allows messages to be pushed to it.
No more `onPull()` will arrive after this event. If not overridden this will default to stopping the operator.
Also, there are two query methods available for output ports:
* `isAvailable(out)` returns true if the port can be pushed
* @apidoc[isAvailable(out)](stage.GraphStageLogic) {scala="#isAvailable[T](out:akka.stream.Outlet[T]):Boolean" java="#isAvailable(akka.stream.Outlet)"} returns true if the port can be pushed
* `isClosed(out)` returns true if the port is closed. At this point the port can not be pushed and will not be pulled anymore.
The relationship of the above operations, events and queries is summarized in the state machine below. Green shows
@ -138,7 +138,7 @@ The following operations are available for *input* ports:
port is pushed again by the upstream.
* `cancel(in)` closes the input port.
The events corresponding to an *input* port can be received in an @scala[`InHandler`] @java[`AbstractInHandler`] instance registered to the
The events corresponding to an *input* port can be received in an @scala[@scaladoc[InHandler](akka.stream.stage.InHandler)] @java[@javadoc[AbstractInHandler](akka.stream.stage.AbstractInHandler)] instance registered to the
input port using `setHandler(in, handler)`. This handler has three callbacks:
* `onPush()` is called when the input port now has a new element. Now it is possible to acquire this element using
@ -166,8 +166,8 @@ in that state.
Finally, there are two methods available for convenience to complete the operator and all of its ports:
* `completeStage()` is equivalent to closing all output ports and cancelling all input ports.
* `failStage(exception)` is equivalent to failing all output ports and cancelling all input ports.
* @apidoc[completeStage()](stage.GraphStageLogic) {scala="#completeStage():Unit" java="#completeStage()"} is equivalent to closing all output ports and cancelling all input ports.
* @apidoc[failStage(exception)](stage.GraphStageLogic) {scala="#failStage(ex:Throwable):Unit" java="#failStage(java.lang.Throwable)"} is equivalent to failing all output ports and cancelling all input ports.
#### Emit
@ -177,7 +177,7 @@ of actions which will greatly simplify some use cases at the cost of some extra
between the two APIs could be described as that the first one is signal driven from the outside, while this API
is more active and drives its surroundings.
The operations of this part of the `GraphStage` API are:
The operations of this part of the @apidoc[stage.GraphStage] API are:
* `emit(out, elem)` and `emitMultiple(out, Iterable(elem1, elem2))` replace the `OutHandler` with a handler that emits
one or more elements when there is demand, and then reinstalls the current handlers
@ -195,7 +195,7 @@ An example of how this API simplifies an operator can be found below in the seco
### Custom linear operators using GraphStage
To define custom linear operators, you should extend `GraphStage` using `FlowShape` which has one input and one output.
To define custom linear operators, you should extend @apidoc[stage.GraphStage] using @apidoc[stream.FlowShape] which has one input and one output.
Such an operator can be illustrated as a box with two flows as it is
seen in the illustration below. Demand flowing upstream leading to elements
@ -254,7 +254,7 @@ In this case a pull from downstream might be consumed by the operator itself rat
than passed along upstream as the operator might contain an element it wants to
push. Note that we also need to handle the case where the upstream closes while
the operator still has elements it wants to push downstream. This is done by
overriding *onUpstreamFinish* in the @scala[*InHandler*] @java[*AbstractInHandler*] and providing custom logic
overriding `onUpstreamFinish` in the @scala[@scaladoc[InHandler](akka.stream.stage.InHandler)]@java[@javadoc[AbstractInHandler](akka.stream.stage.AbstractInHandler)] and providing custom logic
that should happen when the upstream has been finished.
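As a sketch of the pattern just described, assuming Akka 2.6, an operator that emits every element twice might look like this. It follows the `Duplicator` idea used in this document, but the exact code in `GraphStageDocSpec.scala` may differ:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Emits every incoming element twice.
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet("Duplicator.in")
  val out: Outlet[A] = Outlet("Duplicator.out")
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Holds the copy that still has to be pushed, if any.
      private var lastElem: Option[A] = None

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          lastElem = Some(elem)
          push(out, elem)
        }

        // Upstream is done, but a pending copy may still be waiting:
        // emit it first, then complete the output port.
        override def onUpstreamFinish(): Unit = {
          lastElem.foreach(elem => emit(out, elem))
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = lastElem match {
          case Some(elem) =>
            // The pull is consumed by the operator itself.
            push(out, elem)
            lastElem = None
          case None =>
            // Nothing buffered, so pass the demand upstream.
            pull(in)
        }
      })
    }
}
```

Without the `onUpstreamFinish` override the default behavior would complete the operator immediately, and the second copy of the last element could be lost.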
This example can be simplified by replacing the usage of a mutable state with calls to
@ -272,7 +272,7 @@ which conceptually would correspond to the following structure:
![graph_stage_chain.png](../images/graph_stage_chain.png)
In code this is only a few lines, using `via` to use our custom operators in a stream:
In code this is only a few lines, using @apidoc[via](stream.*.Source) {scala="#via[T,Mat2](flow:akka.stream.Graph[akka.stream.FlowShape[Out,T],Mat2]):Source.this.Repr[T]" java="#via(akka.stream.Graph)"} to use our custom operators in a stream:
Scala
: @@snip [GraphStageDocSpec.scala](/akka-docs/src/test/scala/docs/stream/GraphStageDocSpec.scala) { #graph-operator-chain }
@ -290,12 +290,12 @@ in circulation in a potential chain of operators, just like our conceptual "rail
Completion handling usually (but not exclusively) comes into the picture when operators need to emit
a few more elements after their upstream source has been completed. We have seen an example of this in our
first `Duplicator` implementation where the last element needs to be doubled even after the upstream neighbor
operator has been completed. This can be done by overriding the `onUpstreamFinish` method in @scala[`InHandler`]@java[`AbstractInHandler`].
operator has been completed. This can be done by overriding the `onUpstreamFinish` method in @scala[@scaladoc[InHandler](akka.stream.stage.InHandler)]@java[@javadoc[AbstractInHandler](akka.stream.stage.AbstractInHandler)].
Operators by default automatically stop once all of their ports (input and output) have been closed externally or internally.
It is possible to opt out from this behavior by invoking `setKeepGoing(true)` (which is not supported from the operator's
constructor and usually done in `preStart`). In this case the operator **must** be explicitly closed by calling `completeStage()`
or `failStage(exception)`. This feature carries the risk of leaking streams and actors, therefore it should be used
constructor and usually done in `preStart`). In this case the operator **must** be explicitly closed by calling @apidoc[completeStage()](stage.GraphStageLogic) {scala="#completeStage():Unit" java="#completeStage()"}
or @apidoc[failStage(exception)](stage.GraphStageLogic) {scala="#failStage(ex:Throwable):Unit" java="#failStage(java.lang.Throwable)"}. This feature carries the risk of leaking streams and actors, therefore it should be used
with care.
### Logging inside GraphStages
@ -305,17 +305,17 @@ more advanced operators which may need to be debugged at some point.
@@@ div { .group-scala }
The helper trait `akka.stream.operator.StageLogging` is provided to enable you to obtain a `LoggingAdapter`
inside of a `GraphStage` as long as the `Materializer` you're using is able to provide you with a logger.
In that sense, it serves a very similar purpose as `ActorLogging` does for Actors.
The helper trait @scaladoc[StageLogging](akka.stream.stage.StageLogging) is provided to enable you to obtain a @apidoc[akka.event.LoggingAdapter]
inside of a @apidoc[stage.GraphStage] as long as the @apidoc[stream.Materializer] you're using is able to provide you with a logger.
In that sense, it serves a very similar purpose as @apidoc[actor.ActorLogging] does for Actors.
@@@
@@@ div { .group-java }
You can extend the @apidoc[GraphStageLogicWithLogging] or @apidoc[TimerGraphStageLogicWithLogging] classes
instead of the usual `GraphStageLogic` to enable you to obtain a `LoggingAdapter` inside your operator as long as
the `Materializer` you're using is able to provide you with a logger.
instead of the usual @apidoc[stage.GraphStageLogic] to enable you to obtain a @apidoc[akka.event.LoggingAdapter] inside your operator as long as
the @apidoc[stream.Materializer] you're using is able to provide you with a logger.
@@@
@ -338,13 +338,13 @@ Java
@@@ note
**SPI Note:** If you're implementing a Materializer, you can add this ability to your materializer by implementing
`MaterializerLoggingProvider` in your `Materializer`.
@apidoc[stream.MaterializerLoggingProvider] in your @apidoc[stream.Materializer].
@@@
### Using timers
It is possible to use timers in `GraphStages` by using `TimerGraphStageLogic` as the base class for
It is possible to use timers in `GraphStages` by using @apidoc[stage.TimerGraphStageLogic] as the base class for
the returned logic. Timers can be scheduled by calling one of `scheduleOnce(timerKey,delay)`, `scheduleAtFixedRate(timerKey,initialDelay,interval)` or
`scheduleWithFixedDelay(timerKey,initialDelay,interval)` and passing an object as a key for that timer (can be any object, for example
a `String`). The `onTimer(timerKey)` method needs to be overridden, and it will be called once the timer of `timerKey`
@ -367,11 +367,11 @@ Java
### Using asynchronous side-channels
In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future
or a callback from a 3rd party API) one must acquire an `AsyncCallback` by calling `getAsyncCallback()` from the
or a callback from a 3rd party API) one must acquire an @apidoc[stage.AsyncCallback] by calling @apidoc[getAsyncCallback()](stage.GraphStageLogic) {scala="#getAsyncCallback[T](handler:T=%3EUnit):akka.stream.stage.AsyncCallback[T]" java="#getAsyncCallback(scala.Function1)"} from the
operator logic. The method `getAsyncCallback` takes as a parameter a callback that will be called once the asynchronous
event fires. It is important to **not call the callback directly**, instead, the external API must call the
`invoke(event)` method on the returned `AsyncCallback`. The execution engine will take care of calling the
provided callback in a thread-safe way. The callback can safely access the state of the `GraphStageLogic`
provided callback in a thread-safe way. The callback can safely access the state of the @apidoc[stage.GraphStageLogic]
implementation.
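The following sketch, assuming Akka 2.6, shows one way the asynchronous side-channel can be wired up: a pass-through operator that completes once an external `Future` finishes. The class name `CompleteOnSignal` and the parameter `signal` are hypothetical:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Future

// Passes elements through unchanged and completes when `signal` completes.
class CompleteOnSignal[A](signal: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet("CompleteOnSignal.in")
  val out: Outlet[A] = Outlet("CompleteOnSignal.out")
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      override def preStart(): Unit = {
        // The handler passed here runs inside the operator, so it may
        // safely touch the logic's state and call completeStage().
        val callback = getAsyncCallback[Unit](_ => completeStage())

        // The external side never calls the handler directly; it only
        // calls invoke on the AsyncCallback, from an arbitrary thread.
        signal.foreach(callback.invoke)(materializer.executionContext)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in))
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
```

The callback is acquired in `preStart` rather than in the constructor, so it is only handed to the outside world once the operator has started.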
Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the
@ -392,9 +392,9 @@ Java
**This is a @ref:[may change](../common/may-change.md) feature**
It is possible to acquire an ActorRef that can be addressed from the outside of the operator, similarly to how
`AsyncCallback` allows injecting asynchronous events into an operator logic. This reference can be obtained
@apidoc[stage.AsyncCallback] allows injecting asynchronous events into an operator logic. This reference can be obtained
by calling `getStageActor(receive)` passing in a function that takes a `Pair` of the sender
`ActorRef` and the received message. This reference can be used to watch other actors by calling its `watch(ref)`
@apidoc[akka.actor.ActorRef] and the received message. This reference can be used to watch other actors by calling its `watch(ref)`
or `unwatch(ref)` methods. The reference can be also watched by external actors. The current limitations of this
`ActorRef` are:
@ -405,9 +405,9 @@ or `unwatch(ref)` methods. The reference can be also watched by external actors.
### Custom materialized values
Custom operators can return materialized values instead of `NotUsed` by inheriting from `GraphStageWithMaterializedValue`
Custom operators can return materialized values instead of @apidoc[akka.NotUsed] by inheriting from @scala[@scaladoc[GraphStageWithMaterializedValue](akka.stream.stage.GraphStageWithMaterializedValue)]@java[@javadoc[AbstractGraphStageWithMaterializedValue](akka.stream.stage.AbstractGraphStageWithMaterializedValue)]
instead of the simpler `GraphStage`. The difference is that in this case the method
`createLogicAndMaterializedValue(inheritedAttributes)` needs to be overridden, and in addition to the
@scala[@scaladoc[createLogicAndMaterializedValue(inheritedAttributes)](akka.stream.stage.GraphStageWithMaterializedValue#createLogicAndMaterializedValue(inheritedAttributes:akka.stream.Attributes):(akka.stream.stage.GraphStageLogic,M))]@java[@javadoc[createLogicAndMaterializedValue(inheritedAttributes)](akka.stream.stage.AbstractGraphStageWithMaterializedValue#createLogicAndMaterializedValue(akka.stream.Attributes))] needs to be overridden, and in addition to the
operator logic the materialized value must be provided.
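A small sketch of this, assuming Akka 2.6 and the Scala API: a pass-through operator whose materialized value is a `Future` of the first element it sees. The name `FirstValue` is illustrative:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

import scala.concurrent.{Future, Promise}

// Forwards all elements and materializes a Future of the first one.
class FirstValue[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] {
  val in: Inlet[A] = Inlet("FirstValue.in")
  val out: Outlet[A] = Outlet("FirstValue.out")
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[A]) = {
    // One promise per materialization; its future is the materialized value.
    val promise = Promise[A]()

    val logic = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          promise.trySuccess(elem) // only the first element wins
          push(out, elem)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }

    (logic, promise.future)
  }
}
```

It could be used as `source.viaMat(new FirstValue[Int])(Keep.right).to(Sink.ignore).run()`. In this simplified sketch the future never completes if the stream ends before any element arrives; a fuller version would fail the promise in `postStop`.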
@@@ warning
@ -430,7 +430,7 @@ Java
**This section is a stub and will be extended in the next release**
Operators can access the `Attributes` object created by the materializer. This contains all the applied (inherited)
Operators can access the @apidoc[akka.stream.Attributes] object created by the materializer. This contains all the applied (inherited)
attributes applying to the operator, ordered from least specific (outermost) towards the most specific (innermost)
attribute. It is the responsibility of the operator to decide how to reconcile this inheritance chain to a final effective
decision.
@ -499,19 +499,19 @@ behavior.
If an operator manages a resource with a lifecycle, for example objects that need to be shut down when they are no longer
used, it is important to make sure this will happen in all circumstances when the operator shuts down.
Cleaning up resources should be done in `GraphStageLogic.postStop` and not in the `InHandler` and `OutHandler`
Cleaning up resources should be done in @apidoc[GraphStageLogic.postStop](stage.GraphStageLogic) {scala="#postStop():Unit" java="#postStop()"} and not in the @scala[@scaladoc[InHandler](akka.stream.stage.InHandler)]@java[@javadoc[AbstractInHandler](akka.stream.stage.AbstractInHandler)] and @scala[@scaladoc[OutHandler](akka.stream.stage.OutHandler)] @java[@javadoc[AbstractOutHandler](akka.stream.stage.AbstractOutHandler)]
callbacks. The reason for this is that when the operator itself completes or is failed there is no signal from the upstreams
or the downstreams. Even for operators that do not complete or fail in this manner, this can happen when the
`Materializer` is shut down or the `ActorSystem` is terminated while a stream is still running, which is called an
@apidoc[akka.stream.Materializer] is shut down or the @apidoc[akka.actor.ActorSystem] is terminated while a stream is still running, which is called an
"abrupt termination".
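To illustrate, here is a hedged sketch (Akka 2.6 assumed) of a source that owns a `BufferedReader` and releases it in `postStop`. `ReaderSource` and the factory parameter `open` are hypothetical, and the blocking `readLine()` call is kept only for brevity; a real operator would run such I/O on a dedicated dispatcher:

```scala
import java.io.BufferedReader

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

// Emits the lines of a reader that is opened once per materialization.
class ReaderSource(open: () => BufferedReader) extends GraphStage[SourceShape[String]] {
  val out: Outlet[String] = Outlet("ReaderSource.out")
  override val shape: SourceShape[String] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var reader: BufferedReader = null

      // Acquire the resource when the operator starts.
      override def preStart(): Unit = {
        reader = open()
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          val line = reader.readLine() // blocking, see the note above
          if (line == null) completeStage() else push(out, line)
        }
      })

      // Runs on completion, failure, downstream cancellation and
      // abrupt termination alike, so the reader is always closed here.
      override def postStop(): Unit = {
        if (reader != null) reader.close()
      }
    }
}
```

If the reader were closed in `onDownstreamFinish` instead, it would leak whenever the operator stops for any other reason.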
@@@ div { .group-scala }
## Extending Flow Operators with Custom Operators
The most general way of extending any `Source`, `Flow` or `SubFlow` (e.g. from `groupBy`) is
demonstrated above: create an operator of flow-shape like the `Duplicator` example given above and use the `.via(...)`
operator to integrate it into your stream topology. This works with all `FlowOps` sub-types, including the
The most general way of extending any @apidoc[Source], @apidoc[Flow] or @apidoc[SubFlow](stream.*.SubFlow) (e.g. from `groupBy`) is
demonstrated above: create an operator of flow-shape like the `Duplicator` example given above and use the @apidoc[.via(...)](stream.*.Source) {scala="#via[T,Mat2](flow:akka.stream.Graph[akka.stream.FlowShape[Out,T],Mat2]):Source.this.Repr[T]" java="#via(akka.stream.Graph)"}
operator to integrate it into your stream topology. This works with all @scaladoc[FlowOps](akka.stream.scaladsl.FlowOps) sub-types, including the
ports that you connect with the graph DSL.
Advanced Scala users may wonder whether it is possible to write extension methods that enrich `FlowOps` to