Merge remote-tracking branch 'pr/19106' into release-2.3-dev
Image files in this change:
akka-docs-dev/rst/images/graph_stage_chain.png (binary, 204 KiB)
akka-docs-dev/rst/images/graph_stage_chain.svg (14 KiB)
akka-docs-dev/rst/images/graph_stage_conceptual.png (binary, 97 KiB)
akka-docs-dev/rst/images/graph_stage_conceptual.svg (7 KiB)
akka-docs-dev/rst/images/graph_stage_detached_tracks_1.png (binary, 108 KiB)
(file name missing in source, 9.4 KiB)
akka-docs-dev/rst/images/graph_stage_detached_tracks_2.png (binary, 102 KiB)
(file name missing in source, 8 KiB)
akka-docs-dev/rst/images/graph_stage_duplicate.png (binary, 71 KiB)
akka-docs-dev/rst/images/graph_stage_duplicate.svg (6.4 KiB)
akka-docs-dev/rst/images/graph_stage_filter.png (binary, 69 KiB)
akka-docs-dev/rst/images/graph_stage_filter.svg (5.7 KiB)
akka-docs-dev/rst/images/graph_stage_map.png (binary, 57 KiB)
akka-docs-dev/rst/images/graph_stage_map.svg (5.2 KiB)
akka-docs-dev/rst/images/graph_stage_tracks_1.png (binary, 196 KiB)
akka-docs-dev/rst/images/graph_stage_tracks_1.svg (12 KiB)
|
|
@ -9,6 +9,12 @@ is sometimes necessary to define new transformation stages either because some f
|
||||||
stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph
|
stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph
|
||||||
junctions of various kinds.
|
junctions of various kinds.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
A custom graph stage should not be the first tool you reach for, defining graphs using flows
|
||||||
|
and the graph DSL is in general easier and does to a larger extent protect you from mistakes that
|
||||||
|
might be easy to make with a custom :class:`GraphStage`
|
||||||
|
|
||||||
|
|
||||||
.. _graphstage-java:
|
.. _graphstage-java:
|
||||||
|
|
||||||
Custom processing with GraphStage
|
Custom processing with GraphStage
|
||||||
|
|
@ -79,7 +85,7 @@ in that state.
|
||||||
|
|
|
|
||||||
|
|
||||||
.. image:: ../images/outport_transitions.png
|
.. image:: ../images/outport_transitions.png
|
||||||
:align: center
|
:align: center
|
||||||
|
|
||||||
|
|
|
|
||||||
|
|
||||||
|
|
@ -115,7 +121,7 @@ in that state.
|
||||||
|
|
|
|
||||||
|
|
||||||
.. image:: ../images/inport_transitions.png
|
.. image:: ../images/inport_transitions.png
|
||||||
:align: center
|
:align: center
|
||||||
|
|
||||||
|
|
|
|
||||||
|
|
||||||
|
|
@ -125,22 +131,160 @@ Finally, there are two methods available for convenience to complete the stage a
|
||||||
* ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports.
|
* ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports.
|
||||||
|
|
||||||
|
|
||||||
|
In some cases it is inconvenient and error-prone to react to the regular state machine events with the
|
||||||
|
signal-based API described above. For those cases there is an API which allows for a more declarative sequencing
|
||||||
|
of actions which will greatly simplify some use cases at the cost of some extra allocations. The difference
|
||||||
|
between the two APIs is that the first one is signal driven from the outside, while this API
|
||||||
|
is more active and drives its surroundings.
|
||||||
|
|
||||||
|
The operations of this part of the :class:`GraphStage` API are:
|
||||||
|
|
||||||
|
* ``emit(out, elem)`` and ``emitMultiple(out, Iterable(elem1, elem2))`` replace the ``OutHandler`` with a handler that emits
|
||||||
|
one or more elements when there is demand, and then reinstalls the current handlers
|
||||||
|
* ``read(in)(andThen)`` and ``readN(in, n)(andThen)`` replace the ``InHandler`` with a handler that reads one or
|
||||||
|
more elements as they are pushed and allows the handler to react once the requested number of elements has been read.
|
||||||
|
* ``abortEmitting()`` and ``abortReading()`` which will cancel an ongoing emit or read
|
||||||
|
|
||||||
|
Note that since the above methods are implemented by temporarily replacing the handlers of the stage you should never
|
||||||
|
call ``setHandler`` while an ``emit`` or ``read`` is in progress, as that interferes with how they are implemented.
|
||||||
|
The following methods are safe to call after invoking ``emit`` and ``read`` (and will lead to actually running the
|
||||||
|
operation when those are done): ``complete(out)``, ``completeStage()``, ``emit``, ``emitMultiple``, ``abortEmitting()``
|
||||||
|
and ``abortReading()``.
|
||||||
|
|
||||||
|
An example of how this API simplifies a stage can be found below in the second version of the :class:`Duplicator`.
|
||||||
|
|
||||||
|
Custom linear processing stages using GraphStage
|
||||||
|
------------------------------------------------
|
||||||
|
|
||||||
|
Graph stages allow for custom linear processing stages by letting them
|
||||||
|
have one input and one output and using :class:`FlowShape` as their shape.
|
||||||
|
|
||||||
|
Such a stage can be illustrated as a box with two flows as it is
|
||||||
|
seen in the illustration below. Demand flows upstream, leading to elements
|
||||||
|
flowing downstream.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_conceptual.png
|
||||||
|
:align: center
|
||||||
|
:width: 500
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
|
||||||
|
To illustrate these concepts we create a small :class:`GraphStage` that implements the ``map`` transformation.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_map.png
|
||||||
|
:align: center
|
||||||
|
:width: 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
Map calls ``push(out)`` from the ``onPush()`` handler and it also calls ``pull()`` from the ``onPull`` handler resulting in the
|
||||||
|
conceptual wiring above, and fully expressed in code below:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#one-to-one
|
||||||
|
|
||||||
|
Map is a typical example of a one-to-one transformation of a stream where
|
||||||
|
demand is passed along upstream and elements are passed on downstream.
|
||||||
|
|
||||||
|
To demonstrate a many-to-one stage we will implement
|
||||||
|
filter. The conceptual wiring of ``Filter`` looks like this:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_filter.png
|
||||||
|
:align: center
|
||||||
|
:width: 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
|
||||||
|
As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise
|
||||||
|
we return the “ball” to our upstream so that we get the new element. This is achieved by modifying the map
|
||||||
|
example by adding a conditional in the ``onPush`` handler and deciding between a ``pull(in)`` or ``push(out)`` call
|
||||||
|
(and of course not having a mapping ``f`` function).
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#many-to-one
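The decision made in ``onPush`` can also be traced without any stream machinery. The following is a minimal sketch in plain Java, not the Akka Streams API: ``FilterModel`` and ``run`` are hypothetical names, upstream is modelled as an ``Iterator`` and each unit of downstream demand as one loop iteration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of the Filter wiring; this is NOT the GraphStage API.
final class FilterModel {
    private FilterModel() {}

    // Simulates downstream signalling demand `demand` times on a filter fed by `upstream`.
    static <T> List<T> run(Iterator<T> upstream, Predicate<T> predicate, int demand) {
        List<T> downstream = new ArrayList<>();
        for (int i = 0; i < demand; i++) {
            // onPull: the demand is passed upstream, which corresponds to pull(in).
            boolean pushed = false;
            while (!pushed && upstream.hasNext()) {
                T elem = upstream.next();      // onPush: an element arrives
                if (predicate.test(elem)) {
                    downstream.add(elem);      // corresponds to push(out, elem)
                    pushed = true;
                }
                // otherwise the "ball" goes back upstream: pull(in) again
            }
            if (!pushed) {
                break;                         // upstream completed before a match was found
            }
        }
        return downstream;
    }
}
```

One unit of downstream demand may consume several upstream elements, which is why this stage is described as many-to-one.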
|
||||||
|
|
||||||
|
To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage
|
||||||
|
that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_duplicate.png
|
||||||
|
:align: center
|
||||||
|
:width: 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
This is a stage that has state: an option with the last element it has seen indicating if it
|
||||||
|
has duplicated this last element already or not. We must also make sure to emit the extra element
|
||||||
|
if the upstream completes.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#one-to-many
|
||||||
|
|
||||||
|
In this case a pull from downstream might be consumed by the stage itself rather
|
||||||
|
than passed along upstream as the stage might contain an element it wants to
|
||||||
|
push. Note that we also need to handle the case where the upstream closes while
|
||||||
|
the stage still has elements it wants to push downstream. This is done by
|
||||||
|
overriding ``onUpstreamFinish`` in the ``AbstractInHandler`` and providing custom logic
|
||||||
|
that should happen when the upstream has been finished.
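The state handling described above can be sketched in plain Java. This is an illustrative model only, under the assumption that each handler can be represented as an ordinary method: ``DuplicatorModel`` and its members are hypothetical names and do not belong to the Akka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the stateful duplicator; handlers are plain methods here.
final class DuplicatorModel<T> {
    final List<T> emitted = new ArrayList<>();   // stands in for the downstream
    boolean completed = false;
    private Optional<T> lastElem = Optional.empty();

    // An element arrives from upstream: remember it and emit the first copy.
    void onPush(T elem) {
        lastElem = Optional.of(elem);
        emitted.add(elem);
    }

    // Downstream demand arrives. Returns true when the demand has to be
    // forwarded upstream, false when the stage served it from its own state.
    boolean onPull() {
        if (lastElem.isPresent()) {
            emitted.add(lastElem.get());
            lastElem = Optional.empty();
            return false;
        }
        return true;
    }

    // Upstream finished: flush the pending second copy, then complete.
    void onUpstreamFinish() {
        lastElem.ifPresent(emitted::add);
        lastElem = Optional.empty();
        completed = true;
    }
}
```

The ``false`` result of ``onPull`` is the case where a pull is consumed by the stage itself instead of being passed along upstream.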
|
||||||
|
|
||||||
|
This example can be simplified by replacing the use of mutable state with calls to
|
||||||
|
``emitMultiple`` which will replace the handlers, emit each of multiple elements and then
|
||||||
|
reinstate the original handlers:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#simpler-one-to-many
|
||||||
|
|
||||||
|
Finally, to demonstrate all of the stages above, we put them together into a processing chain,
|
||||||
|
which conceptually would correspond to the following structure:
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_chain.png
|
||||||
|
:align: center
|
||||||
|
:width: 700
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
In code this is only a few lines, using ``via`` to use our custom stages in a stream:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#graph-stage-chain
|
||||||
|
|
||||||
|
If we attempt to draw the sequence of events, it shows that there is one "event token"
|
||||||
|
in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_tracks_1.png
|
||||||
|
:align: center
|
||||||
|
:width: 700
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
|
||||||
Completion
|
Completion
|
||||||
----------
|
----------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit
|
||||||
|
a few more elements after their upstream source has been completed. We have seen an example of this in our
|
||||||
|
first :class:`Duplicator` implementation where the last element needs to be doubled even after the upstream neighbor
|
||||||
|
stage has been completed. This can be done by overriding the ``onUpstreamFinish`` method in ``AbstractInHandler``.
|
||||||
|
|
||||||
Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally.
|
Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally.
|
||||||
It is possible to opt out from this behavior by overriding ``keepGoingAfterAllPortsClosed`` and returning true in
|
It is possible to opt out from this behavior by invoking ``setKeepGoing(true)`` (which is not supported from the stage’s
|
||||||
the :class:`GraphStageLogic` implementation. In this case the stage **must** be explicitly closed by calling ``completeStage()``
|
constructor and is usually done in ``preStart``). In this case the stage **must** be explicitly closed by calling ``completeStage()``
|
||||||
or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used
|
or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used
|
||||||
with care.
|
with care.
|
||||||
|
|
||||||
Using timers
|
Using timers
|
||||||
------------
|
------------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
|
||||||
|
|
||||||
It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for
|
It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for
|
||||||
the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or
|
the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or
|
||||||
``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example
|
``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example
|
||||||
|
|
@ -151,11 +295,14 @@ fires. It is possible to cancel a timer using ``cancelTimer(key)`` and check the
|
||||||
Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the
|
Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the
|
||||||
``preStart()`` lifecycle hook.
|
``preStart()`` lifecycle hook.
|
||||||
|
|
||||||
|
In this sample the stage toggles between open and closed, where open means no elements are passed through. The
|
||||||
|
stage starts out as closed but as soon as an element is pushed downstream the gate becomes open for a duration
|
||||||
|
of time during which it will consume and drop upstream messages:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#timed
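The toggling itself is a two-state machine, which the following plain-Java sketch isolates. It is an assumption-laden model, not the Akka API: ``TimedGateModel`` is a hypothetical name, the timer is replaced by an explicit ``onTimer()`` call and scheduling is only counted.

```java
// Hypothetical model of the timed gate; the real stage would extend TimerGraphStageLogic.
final class TimedGateModel {
    private boolean open = false;      // the stage starts out closed
    private int timersScheduled = 0;   // counts where scheduleOnce(key, delay) would be called

    // An element arrives. Returns true if it is passed downstream,
    // false if it is consumed and dropped because the gate is open.
    boolean onPush() {
        if (open) {
            return false;              // drop the element and pull the next one
        }
        open = true;                   // an element went through: open the gate
        timersScheduled++;             // stands in for scheduling the one-off timer
        return true;
    }

    // The timer fired: close the gate so the next element passes again.
    void onTimer() {
        open = false;
    }

    int timersScheduled() {
        return timersScheduled;
    }
}
```

Making the timer an explicit method call keeps the model deterministic, so the open and closed phases can be checked without waiting for real time to pass.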
|
||||||
|
|
||||||
Using asynchronous side-channels
|
Using asynchronous side-channels
|
||||||
--------------------------------
|
--------------------------------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
|
||||||
|
|
||||||
In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future
|
In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future
|
||||||
or a callback from a 3rd party API) one must acquire an :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the
|
or a callback from a 3rd party API) one must acquire an :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the
|
||||||
stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous
|
stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous
|
||||||
|
|
@ -167,6 +314,13 @@ implementation.
|
||||||
Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the
|
Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the
|
||||||
``preStart()`` lifecycle hook instead.
|
``preStart()`` lifecycle hook instead.
|
||||||
|
|
||||||
|
|
||||||
|
This example shows an asynchronous side channel graph stage that starts dropping elements
|
||||||
|
when a future completes:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#async-side-channel
|
||||||
|
|
||||||
|
|
||||||
Integration with actors
|
Integration with actors
|
||||||
-----------------------
|
-----------------------
|
||||||
|
|
||||||
|
|
@ -188,8 +342,6 @@ or ``unwatch(ref)`` methods. The reference can be also watched by external actor
|
||||||
Custom materialized values
|
Custom materialized values
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
|
||||||
|
|
||||||
Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue`
|
Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue`
|
||||||
instead of the simpler :class:`GraphStage`. The difference is that in this case the method
|
instead of the simpler :class:`GraphStage`. The difference is that in this case the method
|
||||||
``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, and in addition to the
|
``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, and in addition to the
|
||||||
|
|
@ -200,6 +352,10 @@ stage logic the materialized value must be provided
|
||||||
the thread that got hold of the materialized value. It is the responsibility of the programmer to add the
|
the thread that got hold of the materialized value. It is the responsibility of the programmer to add the
|
||||||
necessary (non-blocking) synchronization and visibility guarantees to this shared object.
|
necessary (non-blocking) synchronization and visibility guarantees to this shared object.
|
||||||
|
|
||||||
|
In this sample the materialized value is a future containing the first element to go through the stream:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#materialized
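The shared object handed out as the materialized value has to be safe to use from another thread. As a minimal sketch of that idea in plain Java, assuming a ``java.util.concurrent.CompletableFuture`` as the shared object (the included sample may use a different future type), with ``FirstValueModel`` as a hypothetical name:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model: the stage logic completes a future that was handed out
// as the materialized value. This is NOT the GraphStageWithMaterializedValue API.
final class FirstValueModel<T> {
    // The materialized value. CompletableFuture provides the non-blocking
    // synchronization and visibility guarantees required of the shared object.
    final CompletableFuture<T> materialized = new CompletableFuture<>();

    // An element arrives: complete the future with it, then pass it on.
    // complete() has no effect once the future is already completed,
    // so only the first element is ever recorded.
    T onPush(T elem) {
        materialized.complete(elem);
        return elem;
    }
}
```

The thread that obtained the materialized value only ever touches the future, never the mutable state of the stage logic.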
|
||||||
|
|
||||||
Using attributes to affect the behavior of a stage
|
Using attributes to affect the behavior of a stage
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
|
|
||||||
|
|
@ -213,237 +369,52 @@ decision.
|
||||||
See :ref:`composition-java` for an explanation on how attributes work.
|
See :ref:`composition-java` for an explanation on how attributes work.
|
||||||
|
|
||||||
|
|
||||||
Custom linear processing stages
|
Rate decoupled graph stages
|
||||||
===============================
|
---------------------------
|
||||||
|
|
||||||
To extend the available transformations on a :class:`Flow` or :class:`Source` one can use the ``transform()`` method
|
Sometimes it is desirable to *decouple* the rate of the upstream and downstream of a stage, synchronizing only
|
||||||
which takes a factory function returning a :class:`Stage`. Stages come in different flavors which we will introduce in this
|
when needed.
|
||||||
page.
|
|
||||||
|
|
||||||
.. _stream-using-push-pull-stage-java:
|
This is achieved in the model by representing a :class:`GraphStage` as a *boundary* between two regions where the
|
||||||
|
demand sent upstream is decoupled from the demand that arrives from downstream. One immediate consequence of this
|
||||||
|
difference is that an ``onPush`` call does not always lead to calling ``push`` and an ``onPull`` call does not always
|
||||||
|
lead to calling ``pull``.
|
||||||
|
|
||||||
Using PushPullStage
|
One of the important use cases for this is to build buffer-like entities, that allow independent progress
|
||||||
-------------------
|
|
||||||
|
|
||||||
The most elementary transformation stage is the :class:`PushPullStage` which can express a large class of algorithms
|
|
||||||
working on streams. A :class:`PushPullStage` can be illustrated as a box with two "input" and two "output ports" as it is
|
|
||||||
seen in the illustration below.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_conceptual.png
|
|
||||||
:align: center
|
|
||||||
:width: 600
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
The "input ports" are implemented as event handlers ``onPush(elem,ctx)`` and ``onPull(ctx)`` while "output ports"
|
|
||||||
correspond to methods on the :class:`Context` object that is handed as a parameter to the event handlers. By calling
|
|
||||||
exactly one "output port" method we wire up these four ports in various ways which we demonstrate shortly.
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
There is one very important rule to remember when working with a ``Stage``. **Exactly one** method should be called
|
|
||||||
on the **currently passed** :class:`Context` **exactly once** and as the **last statement of the handler** where the return type
|
|
||||||
of the called method **matches the expected return type of the handler**. Any violation of this rule will
|
|
||||||
almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). Exceptions
|
|
||||||
to this rule are the query methods ``isHolding()`` and ``isFinishing()``
|
|
||||||
|
|
||||||
To illustrate these concepts we create a small :class:`PushPullStage` that implements the ``map`` transformation.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_map.png
|
|
||||||
:align: center
|
|
||||||
:width: 300
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
Map calls ``ctx.push()`` from the ``onPush()`` handler and it also calls ``ctx.pull()`` from the ``onPull``
|
|
||||||
handler resulting in the conceptual wiring above, and fully expressed in code below:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#one-to-one
|
|
||||||
|
|
||||||
Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement
|
|
||||||
filter. The conceptual wiring of ``Filter`` looks like this:
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_filter.png
|
|
||||||
:align: center
|
|
||||||
:width: 300
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise
|
|
||||||
we return the "ball" to our upstream so that we get the new element. This is achieved by modifying the map
|
|
||||||
example by adding a conditional in the ``onPush`` handler and decide between a ``ctx.pull()`` or ``ctx.push()`` call
|
|
||||||
(and of course not having a mapping ``f`` function).
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#many-to-one
|
|
||||||
|
|
||||||
To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage
|
|
||||||
that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_doubler.png
|
|
||||||
:align: center
|
|
||||||
:width: 300
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
This is a stage that has state: the last element it has seen, and a flag ``oneLeft`` that indicates if we
|
|
||||||
have duplicated this last element already or not. Looking at the code below, the reader might notice that our ``onPull``
|
|
||||||
method is more complex than it is demonstrated by the figure above. The reason for this is completion handling, which we
|
|
||||||
will explain a little bit later. For now it is enough to look at the ``if(!ctx.isFinishing)`` block which
|
|
||||||
corresponds to the logic we expect by looking at the conceptual picture.
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#one-to-many
|
|
||||||
|
|
||||||
Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually
|
|
||||||
would correspond to the following structure:
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_chain.png
|
|
||||||
:align: center
|
|
||||||
:width: 650
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
In code this is only a few lines, using the ``transform`` method to inject our custom processing into a stream:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#stage-chain
|
|
||||||
|
|
||||||
If we attempt to draw the sequence of events, it shows that there is one "event token"
|
|
||||||
in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_general.png
|
|
||||||
:align: center
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
Completion handling
|
|
||||||
^^^^^^^^^^^^^^^^^^^
|
|
||||||
|
|
||||||
Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit a few
|
|
||||||
more elements after their upstream source has been completed. We have seen an example of this in our ``Duplicator`` class
|
|
||||||
where the last element needs to be doubled even after the upstream neighbor stage has been completed. Since the
|
|
||||||
``onUpstreamFinish()`` handler expects a :class:`TerminationDirective` as the return type we are only allowed to call
|
|
||||||
``ctx.finish()``, ``ctx.fail()`` or ``ctx.absorbTermination()``. Since the first two of these available methods will
|
|
||||||
immediately terminate, our only option is ``absorbTermination()``. It is also clear from the return type of
|
|
||||||
``onUpstreamFinish`` that we cannot call ``ctx.push()`` but we need to emit elements somehow! The trick is that after
|
|
||||||
calling ``absorbTermination()`` the ``onPull()`` handler will be called eventually, and at the same time
|
|
||||||
``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to
|
|
||||||
emit additional elements and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing.
|
|
||||||
|
|
||||||
The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of
|
|
||||||
Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that
|
|
||||||
our push/pull structure that was illustrated in the figure of our custom processing chain does not
|
|
||||||
apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a
|
|
||||||
pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. By calling
|
|
||||||
``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at
|
|
||||||
that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was
|
|
||||||
*below* (which means that it will come back eventually, so the environment does not need to call anything yet).
|
|
||||||
|
|
||||||
The first of the two scenarios is when a termination signal arrives after a stage passed the event to its downstream. As
|
|
||||||
we can see in the following diagram, there is no need to do anything by ``absorbTermination()`` since the black arrows
|
|
||||||
representing the movement of the "event token" is uninterrupted.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_absorb_1.png
|
|
||||||
:align: center
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
In the second scenario the "event token" is somewhere upstream when the termination signal arrives. In this case
|
|
||||||
``absorbTermination`` needs to ensure that a new "event token" is generated replacing the old one that is forever gone
|
|
||||||
(since the upstream finished). This is done by calling the ``onPull()`` event handler of the stage.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_absorb_2.png
|
|
||||||
:align: center
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
Observe, that in both scenarios ``onPull()`` kicks off the continuation of the processing logic, the only difference is
|
|
||||||
whether it is the downstream or the ``absorbTermination()`` call that calls the event handler.
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
It is not allowed to call ``absorbTermination()`` from ``onDownstreamFinish()``. If the method is called anyway,
|
|
||||||
it will be logged at ``ERROR`` level, but no further action will be taken as at that point there is no active
|
|
||||||
downstream to propagate the error to. Cancellation in the upstream direction will continue undisturbed.
|
|
||||||
|
|
||||||
Using PushStage
|
|
||||||
---------------
|
|
||||||
|
|
||||||
Many one-to-one and many-to-one transformations do not need to override the ``onPull()`` handler at all since all
|
|
||||||
they do is just propagate the pull upwards. For such transformations it is better to extend PushStage directly. For
|
|
||||||
example our ``Map`` and ``Filter`` would look like this:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#pushstage
|
|
||||||
|
|
||||||
The reason to use ``PushStage`` is not just cosmetic: internal optimizations rely on the fact that the onPull method
|
|
||||||
only calls ``ctx.pull()`` and allow the environment to process elements faster than without this knowledge. By
|
|
||||||
extending ``PushStage`` the environment can be sure that ``onPull()`` was not overridden since it is ``final`` on
|
|
||||||
``PushStage``.
|
|
||||||
|
|
||||||
Using DetachedStage
|
|
||||||
-------------------
|
|
||||||
|
|
||||||
The model described in previous sections, while conceptually simple, cannot describe all desired stages. The main
|
|
||||||
limitation is the "single-ball" (single "event token") model which prevents independent progress of an upstream and
|
|
||||||
downstream of a stage. Sometimes it is desirable to *detach* the progress (and therefore, rate) of the upstream and
|
|
||||||
downstream of a stage, synchronizing only when needed.
|
|
||||||
|
|
||||||
This is achieved in the model by representing a :class:`DetachedStage` as a *boundary* between two "single-ball" regions.
|
|
||||||
One immediate consequence of this difference is that **it is not allowed to call** ``ctx.pull()`` **from** ``onPull()`` **and
|
|
||||||
it is not allowed to call** ``ctx.push()`` **from** ``onPush()`` as such combinations would "steal" a token from one region
|
|
||||||
(resulting in zero tokens left) and would inject an unexpected second token to the other region. This is enforced
|
|
||||||
by the expected return types of these callback functions.
|
|
||||||
|
|
||||||
One of the important use-cases for :class:`DetachedStage` is to build buffer-like entities, that allow independent progress
|
|
||||||
of upstream and downstream stages when the buffer is not full or empty, and slowing down the appropriate side if the
|
of upstream and downstream stages when the buffer is not full or empty, and slowing down the appropriate side if the
|
||||||
buffer becomes empty or full. The next diagram illustrates the event sequence for a buffer with capacity of two elements.
|
buffer becomes empty or full.
|
||||||
|
|
||||||
|
The next diagram illustrates the event sequence for a buffer with capacity of two elements in a setting where
|
||||||
|
the downstream demand is slow to start and the buffer will fill up with upstream elements before any demand
|
||||||
|
is seen from downstream.
|
||||||
|
|
||||||
|
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_buffer.png
|
.. image:: ../images/graph_stage_detached_tracks_1.png
|
||||||
:align: center
|
:align: center
|
||||||
|
:width: 500
|
||||||
|
|
||||||
|
|
|
|
||||||
|
|
||||||
The very first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on
|
Another scenario would be where the demand from downstream starts coming in before any element is pushed
|
||||||
initialization. Remember that it is forbidden to call ``ctx.pull`` from ``onPull``, therefore it is the task of the
|
into the buffer stage.
|
||||||
framework to kick off the first "event token" in the upstream region, which will remain there until the upstream stages
|
|
||||||
stop. The diagram distinguishes between the actions of the two regions by colors: *purple* arrows indicate the actions
|
|
||||||
involving the upstream "event token", while *red* arrows show the downstream region actions. This demonstrates the clear
|
|
||||||
separation of these regions, and the invariant that the number of tokens in the two regions are kept unchanged.
|
|
||||||
|
|
||||||
For buffer it is necessary to detach the two regions, but it is also necessary to sometimes hold back the upstream
|
|
||||||
or downstream. The new API calls that are available for :class:`DetachedStage` s are the various ``ctx.holdXXX()`` methods
|
|
||||||
, ``ctx.pushAndPull()`` and variants, and ``ctx.isHoldingXXX()``.
|
|
||||||
Calling ``ctx.holdXXX()`` from ``onPull()`` or ``onPush`` results in suspending the corresponding
|
|
||||||
region from progress, and temporarily taking ownership of the "event token". This state can be queried by ``ctx.isHolding()``
|
|
||||||
which will tell if the stage is currently holding a token or not. It is only allowed to suspend one of the regions, not
|
|
||||||
both, since that would disable all possible future events, resulting in a dead-lock. Releasing the held token is only
|
|
||||||
possible by calling ``ctx.pushAndPull()``. This is to ensure that both the held token is released, and the triggering region
|
|
||||||
gets its token back (one inbound token + one held token = two released tokens).
|
|
||||||
|
|
||||||
The following code example demonstrates the buffer class corresponding to the message sequence chart we discussed.
|
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#detached
|
.. image:: ../images/graph_stage_detached_tracks_2.png
|
||||||
|
:align: center
|
||||||
|
:width: 500
|
||||||
|
|
||||||
.. warning::
|
|
|
||||||
If ``absorbTermination()`` is called on a :class:`DetachedStage` while it holds downstream (``isHoldingDownstream``
|
|
||||||
returns true) then ``onPull()`` will be called on the stage. This ensures that the stage does not end up in a
|
|
||||||
deadlocked case. Since at the point when the termination is absorbed there will be no way to get any callbacks because
|
The first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on
|
||||||
the downstream is held, so the framework invokes onPull() to avoid this situation. This is similar to the termination
|
initialization. The buffer has demand for up to two elements without any downstream demand.
|
||||||
logic already shown for :class:`PushPullStage`.
|
|
||||||
|
The following code example demonstrates a buffer class corresponding to the message sequence chart above.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#detached
|
||||||
|
|
||||||
Thread safety of custom processing stages
|
Thread safety of custom processing stages
|
||||||
=========================================
|
=========================================
|
||||||
|
|
|
||||||
|
|
@ -3,15 +3,16 @@
|
||||||
*/
|
*/
|
||||||
package docs.stream
|
package docs.stream
|
||||||
|
|
||||||
import akka.stream.javadsl.Sink
|
import akka.stream.scaladsl.{ Keep, Sink, Flow, Source }
|
||||||
import akka.stream.scaladsl.Source
|
import akka.stream.stage._
|
||||||
import akka.stream.stage.{ OutHandler, GraphStage, GraphStageLogic }
|
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
|
|
||||||
import akka.stream.testkit.AkkaSpec
|
import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec }
|
||||||
|
|
||||||
import scala.concurrent.{ Await, Future }
|
import scala.collection.mutable
|
||||||
|
import scala.concurrent.{ Promise, Await, Future }
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
import scala.collection.immutable.Iterable
|
||||||
|
|
||||||
class GraphStageDocSpec extends AkkaSpec {
|
class GraphStageDocSpec extends AkkaSpec {
|
||||||
|
|
||||||
|
|
@ -83,4 +84,425 @@ class GraphStageDocSpec extends AkkaSpec {
|
||||||
Await.result(result2, 3.seconds) should ===(5050)
|
Await.result(result2, 3.seconds) should ===(5050)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#one-to-one
|
||||||
|
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("Map.in")
|
||||||
|
val out = Outlet[B]("Map.out")
|
||||||
|
|
||||||
|
override val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogic(attr: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
push(out, f(grab(in)))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#one-to-one
|
||||||
|
|
||||||
|
"Demonstrate a one to one element GraphStage" in {
|
||||||
|
// tests:
|
||||||
|
val stringLength = Flow.fromGraph(new Map[String, Int](_.length))
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(Vector("one", "two", "three"))
|
||||||
|
.via(stringLength)
|
||||||
|
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
|
||||||
|
|
||||||
|
Await.result(result, 3.seconds) should ===(Seq(3, 3, 5))
|
||||||
|
}
|
||||||
|
|
||||||
|
//#many-to-one
|
||||||
|
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("Filter.in")
|
||||||
|
val out = Outlet[A]("Filter.out")
|
||||||
|
|
||||||
|
val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
val elem = grab(in)
|
||||||
|
if (p(elem)) push(out, elem)
|
||||||
|
else pull(in)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#many-to-one
|
||||||
|
|
||||||
|
"Demonstrate a many to one element GraphStage" in {
|
||||||
|
|
||||||
|
// tests:
|
||||||
|
val evenFilter = Flow.fromGraph(new Filter[Int](_ % 2 == 0))
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(Vector(1, 2, 3, 4, 5, 6))
|
||||||
|
.via(evenFilter)
|
||||||
|
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
|
||||||
|
|
||||||
|
Await.result(result, 3.seconds) should ===(Seq(2, 4, 6))
|
||||||
|
}
|
||||||
|
|
||||||
|
//#one-to-many
|
||||||
|
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("Duplicator.in")
|
||||||
|
val out = Outlet[A]("Duplicator.out")
|
||||||
|
|
||||||
|
val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
// Again: note that all mutable state
|
||||||
|
// MUST be inside the GraphStageLogic
|
||||||
|
var lastElem: Option[A] = None
|
||||||
|
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
val elem = grab(in)
|
||||||
|
lastElem = Some(elem)
|
||||||
|
push(out, elem)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFinish(): Unit = {
|
||||||
|
if (lastElem.isDefined) emit(out, lastElem.get)
|
||||||
|
complete(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
})
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
if (lastElem.isDefined) {
|
||||||
|
push(out, lastElem.get)
|
||||||
|
lastElem = None
|
||||||
|
} else {
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#one-to-many
|
||||||
|
|
||||||
|
"Demonstrate a one to many element GraphStage" in {
|
||||||
|
// tests:
|
||||||
|
val duplicator = Flow.fromGraph(new Duplicator[Int])
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(Vector(1, 2, 3))
|
||||||
|
.via(duplicator)
|
||||||
|
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
|
||||||
|
|
||||||
|
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
|
||||||
|
}
|
||||||
|
|
||||||
|
"Demonstrate a simpler one to many stage" in {
|
||||||
|
//#simpler-one-to-many
|
||||||
|
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("Duplicator.in")
|
||||||
|
val out = Outlet[A]("Duplicator.out")
|
||||||
|
|
||||||
|
val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
val elem = grab(in)
|
||||||
|
// this will temporarily suspend this handler until the two elems
|
||||||
|
// are emitted and then reinstates it
|
||||||
|
emitMultiple(out, Iterable(elem, elem))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#simpler-one-to-many
|
||||||
|
|
||||||
|
// tests:
|
||||||
|
val duplicator = Flow.fromGraph(new Duplicator[Int])
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(Vector(1, 2, 3))
|
||||||
|
.via(duplicator)
|
||||||
|
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
|
||||||
|
|
||||||
|
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
"Demonstrate chaining of graph stages" in {
|
||||||
|
val sink = Sink.fold[List[Int], Int](List.empty[Int])((acc, n) => acc :+ n)
|
||||||
|
|
||||||
|
//#graph-stage-chain
|
||||||
|
val resultFuture = Source(1 to 5)
|
||||||
|
.via(new Filter(_ % 2 == 0))
|
||||||
|
.via(new Duplicator())
|
||||||
|
.via(new Map(_ / 2))
|
||||||
|
.runWith(sink)
|
||||||
|
|
||||||
|
//#graph-stage-chain
|
||||||
|
|
||||||
|
Await.result(resultFuture, 3.seconds) should ===(List(1, 1, 2, 2))
|
||||||
|
}
|
||||||
|
|
||||||
|
"Demonstrate an asynchronous side channel" in {
|
||||||
|
import system.dispatcher
|
||||||
|
//#async-side-channel
|
||||||
|
// will close upstream when the future completes
|
||||||
|
class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("KillSwitch.in")
|
||||||
|
val out = Outlet[A]("KillSwitch.out")
|
||||||
|
|
||||||
|
val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
|
||||||
|
override def preStart(): Unit = {
|
||||||
|
val callback = getAsyncCallback[Unit] { (_) =>
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
switch.foreach(callback.invoke)
|
||||||
|
}
|
||||||
|
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = { push(out, grab(in)) }
|
||||||
|
})
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = { pull(in) }
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#async-side-channel
|
||||||
|
|
||||||
|
// tests:
|
||||||
|
val switch = Promise[Unit]()
|
||||||
|
val duplicator = Flow.fromGraph(new KillSwitch[Int](switch.future))
|
||||||
|
|
||||||
|
// TODO this is probably racy, is there a way to make sure it happens after?
|
||||||
|
val valueAfterKill = switch.future.flatMap(_ => Future(4))
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(Vector(1, 2, 3)).concat(Source.fromFuture(valueAfterKill))
|
||||||
|
.via(duplicator)
|
||||||
|
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
|
||||||
|
|
||||||
|
switch.success(())
|
||||||
|
|
||||||
|
Await.result(result, 3.seconds) should ===(Seq(1, 2, 3))
|
||||||
|
}
|
||||||
|
|
||||||
|
"Demonstrate a graph stage with a timer" in {
|
||||||
|
|
||||||
|
//#timed
|
||||||
|
// each time an event is pushed through it will trigger a period of silence
|
||||||
|
class TimedGate[A](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("TimedGate.in")
|
||||||
|
val out = Outlet[A]("TimedGate.out")
|
||||||
|
|
||||||
|
val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new TimerGraphStageLogic(shape) {
|
||||||
|
|
||||||
|
var open = false
|
||||||
|
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
val elem = grab(in)
|
||||||
|
if (open) pull(in)
|
||||||
|
else {
|
||||||
|
push(out, elem)
|
||||||
|
open = true
|
||||||
|
scheduleOnce(None, silencePeriod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = { pull(in) }
|
||||||
|
})
|
||||||
|
|
||||||
|
override protected def onTimer(timerKey: Any): Unit = {
|
||||||
|
open = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#timed
|
||||||
|
|
||||||
|
// tests:
|
||||||
|
val result =
|
||||||
|
Source(Vector(1, 2, 3))
|
||||||
|
.via(new TimedGate[Int](2.second))
|
||||||
|
.takeWithin(250.millis)
|
||||||
|
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
|
||||||
|
|
||||||
|
Await.result(result, 3.seconds) should ===(Seq(1))
|
||||||
|
}
|
||||||
|
|
||||||
|
"Demonstrate a custom materialized value" in {
|
||||||
|
|
||||||
|
//#materialized
|
||||||
|
class FirstValue[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("FirstValue.in")
|
||||||
|
val out = Outlet[A]("FirstValue.out")
|
||||||
|
|
||||||
|
val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[A]) = {
|
||||||
|
val promise = Promise[A]()
|
||||||
|
val logic = new GraphStageLogic(shape) {
|
||||||
|
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
val elem = grab(in)
|
||||||
|
promise.success(elem)
|
||||||
|
push(out, elem)
|
||||||
|
|
||||||
|
// replace handler with one just forwarding
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
push(out, grab(in))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
(logic, promise.future)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#materialized
|
||||||
|
|
||||||
|
// tests:
|
||||||
|
val flow = Source(Vector(1, 2, 3))
|
||||||
|
.viaMat(new FirstValue)(Keep.right)
|
||||||
|
.to(Sink.ignore)
|
||||||
|
|
||||||
|
val result: Future[Int] = flow.run()
|
||||||
|
|
||||||
|
Await.result(result, 3.seconds) should ===(1)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
"Demonstrate a detached graph stage" in {
|
||||||
|
|
||||||
|
//#detached
|
||||||
|
class TwoBuffer[A] extends GraphStage[FlowShape[A, A]] {
|
||||||
|
|
||||||
|
val in = Inlet[A]("TwoBuffer.in")
|
||||||
|
val out = Outlet[A]("TwoBuffer.out")
|
||||||
|
|
||||||
|
val shape = FlowShape.of(in, out)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) {
|
||||||
|
|
||||||
|
val buffer = mutable.Queue[A]()
|
||||||
|
def bufferFull = buffer.size == 2
|
||||||
|
var downstreamWaiting = false
|
||||||
|
|
||||||
|
override def preStart(): Unit = {
|
||||||
|
// a detached stage needs to start upstream demand
|
||||||
|
// itself as it is not triggered by downstream demand
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
val elem = grab(in)
|
||||||
|
buffer.enqueue(elem)
|
||||||
|
if (downstreamWaiting) {
|
||||||
|
downstreamWaiting = false
|
||||||
|
val bufferedElem = buffer.dequeue()
|
||||||
|
push(out, bufferedElem)
|
||||||
|
}
|
||||||
|
if (!bufferFull) {
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFinish(): Unit = {
|
||||||
|
if (buffer.nonEmpty) {
|
||||||
|
// emit the rest if possible
|
||||||
|
emitMultiple(out, buffer.toIterator)
|
||||||
|
}
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
if (buffer.isEmpty) {
|
||||||
|
downstreamWaiting = true
|
||||||
|
} else {
|
||||||
|
val elem = buffer.dequeue()
|
||||||
|
push(out, elem)
|
||||||
|
}
|
||||||
|
if (!bufferFull && !hasBeenPulled(in)) {
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
//#detached
|
||||||
|
|
||||||
|
// tests:
|
||||||
|
val result1 = Source(Vector(1, 2, 3))
|
||||||
|
.via(new TwoBuffer)
|
||||||
|
.runFold(Vector.empty[Int])((acc, n) => acc :+ n)
|
||||||
|
|
||||||
|
Await.result(result1, 3.seconds) should ===(Vector(1, 2, 3))
|
||||||
|
|
||||||
|
val subscriber = TestSubscriber.manualProbe[Int]()
|
||||||
|
val publisher = TestPublisher.probe[Int]()
|
||||||
|
val flow2 =
|
||||||
|
Source.fromPublisher(publisher)
|
||||||
|
.via(new TwoBuffer)
|
||||||
|
.to(Sink.fromSubscriber(subscriber))
|
||||||
|
|
||||||
|
val result2 = flow2.run()
|
||||||
|
|
||||||
|
val sub = subscriber.expectSubscription()
|
||||||
|
// this happens even though the subscriber has not signalled any demand
|
||||||
|
publisher.sendNext(1)
|
||||||
|
publisher.sendNext(2)
|
||||||
|
|
||||||
|
sub.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -9,6 +9,12 @@ is sometimes necessary to define new transformation stages either because some f
|
||||||
stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph
|
stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph
|
||||||
junctions of various kinds.
|
junctions of various kinds.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
A custom graph stage should not be the first tool you reach for: defining graphs using flows
|
||||||
|
and the graph DSL is in general easier and does to a larger extent protect you from mistakes that
|
||||||
|
might be easy to make with a custom :class:`GraphStage`.
|
||||||
|
|
||||||
|
|
||||||
.. _graphstage-scala:
|
.. _graphstage-scala:
|
||||||
|
|
||||||
Custom processing with GraphStage
|
Custom processing with GraphStage
|
||||||
|
|
@ -129,22 +135,162 @@ Finally, there are two methods available for convenience to complete the stage a
|
||||||
* ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports.
|
* ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports.
|
||||||
|
|
||||||
|
|
||||||
|
In some cases it is inconvenient and error prone to react on the regular state machine events with the
|
||||||
|
signal based API described above. For those cases there is an API which allows for a more declarative sequencing
|
||||||
|
of actions which will greatly simplify some use cases at the cost of some extra allocations. The difference
|
||||||
|
between the two APIs could be described as that the first one is signal driven from the outside, while this API
|
||||||
|
is more active and drives its surroundings.
|
||||||
|
|
||||||
|
The operations of this part of the :class:`GraphStage` API are:
|
||||||
|
|
||||||
|
* ``emit(out, elem)`` and ``emitMultiple(out, Iterable(elem1, elem2))`` replace the ``OutHandler`` with a handler that emits
|
||||||
|
one or more elements when there is demand, and then reinstalls the current handlers
|
||||||
|
* ``read(in)(andThen)`` and ``readN(in, n)(andThen)`` replace the ``InHandler`` with a handler that reads one or
|
||||||
|
more elements as they are pushed and allows the handler to react once the requested number of elements has been read.
|
||||||
|
* ``abortEmitting()`` and ``abortReading()`` cancel an ongoing emit or read
|
||||||
|
|
||||||
|
Note that since the above methods are implemented by temporarily replacing the handlers of the stage you should never
|
||||||
|
call ``setHandler`` while an ``emit`` or ``read`` is in progress, as that interferes with how they are implemented.
|
||||||
|
The following methods are safe to call after invoking ``emit`` and ``read`` (and will lead to actually running the
|
||||||
|
operation when those are done): ``complete(out)``, ``completeStage()``, ``emit``, ``emitMultiple``, ``abortEmitting()``
|
||||||
|
and ``abortReading()``.
|
||||||
|
|
||||||
|
An example of how this API simplifies a stage can be found below in the second version of the :class:`Duplicator`.
|
||||||
|
|
||||||
|
|
||||||
|
Custom linear processing stages using GraphStage
|
||||||
|
------------------------------------------------
|
||||||
|
|
||||||
|
Graph stages allow for custom linear processing stages by letting them
|
||||||
|
have one input and one output and using :class:`FlowShape` as their shape.
|
||||||
|
|
||||||
|
Such a stage can be illustrated as a box with two flows as it is
|
||||||
|
seen in the illustration below. Demand flows upstream, leading to elements
|
||||||
|
flowing downstream.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_conceptual.png
|
||||||
|
:align: center
|
||||||
|
:width: 500
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
|
||||||
|
To illustrate these concepts we create a small :class:`GraphStage` that implements the ``map`` transformation.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_map.png
|
||||||
|
:align: center
|
||||||
|
:width: 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
Map calls ``push(out)`` from the ``onPush()`` handler and it also calls ``pull(in)`` from the ``onPull()`` handler, resulting in the
|
||||||
|
conceptual wiring above, and fully expressed in code below:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#one-to-one
|
||||||
|
|
||||||
|
Map is a typical example of a one-to-one transformation of a stream where
|
||||||
|
demand is passed along upstream and elements are passed on downstream.
|
||||||
|
|
||||||
|
To demonstrate a many-to-one stage we will implement
|
||||||
|
filter. The conceptual wiring of ``Filter`` looks like this:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_filter.png
|
||||||
|
:align: center
|
||||||
|
:width: 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
|
||||||
|
As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise
|
||||||
|
we return the “ball” to our upstream so that we get the new element. This is achieved by modifying the map
|
||||||
|
example by adding a conditional in the ``onPush`` handler and deciding between a ``pull(in)`` or ``push(out)`` call
|
||||||
|
(and of course not having a mapping ``f`` function).
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#many-to-one
|
||||||
|
|
||||||
|
To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage
|
||||||
|
that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_duplicate.png
|
||||||
|
:align: center
|
||||||
|
:width: 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
This is a stage that has state: an option with the last element it has seen indicating if it
|
||||||
|
has duplicated this last element already or not. We must also make sure to emit the extra element
|
||||||
|
if the upstream completes.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#one-to-many
|
||||||
|
|
||||||
|
In this case a pull from downstream might be consumed by the stage itself rather
|
||||||
|
than passed along upstream as the stage might contain an element it wants to
|
||||||
|
push. Note that we also need to handle the case where the upstream closes while
|
||||||
|
the stage still has elements it wants to push downstream. This is done by
|
||||||
|
overriding ``onUpstreamFinish`` in the ``InHandler`` and providing custom logic
|
||||||
|
that should happen when the upstream has been finished.
|
||||||
|
|
||||||
|
This example can be simplified by replacing the use of mutable state with calls to
|
||||||
|
``emitMultiple`` which will replace the handlers, emit each of multiple elements and then
|
||||||
|
reinstate the original handlers:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#simpler-one-to-many
|
||||||
|
|
||||||
|
|
||||||
|
Finally, to demonstrate all of the stages above, we put them together into a processing chain,
|
||||||
|
which conceptually would correspond to the following structure:
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_chain.png
|
||||||
|
:align: center
|
||||||
|
:width: 700
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
In code this is only a few lines, using ``via`` to use our custom stages in a stream:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#graph-stage-chain
|
||||||
|
|
||||||
|
If we attempt to draw the sequence of events, it shows that there is one "event token"
|
||||||
|
in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/graph_stage_tracks_1.png
|
||||||
|
:align: center
|
||||||
|
:width: 700
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
Completion
|
Completion
|
||||||
----------
|
----------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit
|
||||||
|
a few more elements after their upstream source has been completed. We have seen an example of this in our
|
||||||
|
first :class:`Duplicator` implementation where the last element needs to be doubled even after the upstream neighbor
|
||||||
|
stage has been completed. This can be done by overriding the ``onUpstreamFinish`` method in ``InHandler``.
|
||||||
|
|
||||||
Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally.
|
Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally.
|
||||||
It is possible to opt out from this behavior by overriding ``keepGoingAfterAllPortsClosed`` and returning true in
|
It is possible to opt out from this behavior by invoking ``setKeepGoing(true)`` (which is not supported from the stage’s
|
||||||
the :class:`GraphStageLogic` implementation. In this case the stage **must** be explicitly closed by calling ``completeStage()``
|
constructor and usually done in ``preStart``). In this case the stage **must** be explicitly closed by calling ``completeStage()``
|
||||||
or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used
|
or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used
|
||||||
with care.
|
with care.
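
As a rough illustration of the opt-out described above, the following sketch enables ``setKeepGoing(true)`` from
``preStart`` and then closes the stage explicitly. The class name ``KeepGoingPassThrough`` is hypothetical and
not part of the documented samples; the sketch assumes the ``GraphStageLogic`` API as described in this section.

.. code-block:: scala

   import akka.stream._
   import akka.stream.stage._

   // Hypothetical pass-through stage, for illustration only
   class KeepGoingPassThrough[A] extends GraphStage[FlowShape[A, A]] {

     val in = Inlet[A]("KeepGoingPassThrough.in")
     val out = Outlet[A]("KeepGoingPassThrough.out")

     override val shape = FlowShape.of(in, out)

     override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
       new GraphStageLogic(shape) {

         // not supported from the constructor, so it is done in preStart
         override def preStart(): Unit = setKeepGoing(true)

         setHandler(in, new InHandler {
           override def onPush(): Unit = push(out, grab(in))

           // with keep-going enabled the stage no longer stops just because
           // its ports are closed, so we close it explicitly here
           override def onUpstreamFinish(): Unit = completeStage()
         })
         setHandler(out, new OutHandler {
           override def onPull(): Unit = pull(in)
         })
       }
   }

If a code path forgets the explicit ``completeStage()`` or ``failStage(exception)`` call, the stage may stay alive
after its ports are closed, which is the leak risk mentioned above.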
|
||||||
|
|
||||||
|
|
||||||
Using timers
|
Using timers
|
||||||
------------
|
------------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
|
||||||
|
|
||||||
It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for
|
It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for
|
||||||
the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or
|
the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or
|
||||||
``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example
|
``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example
|
||||||
|
|
@ -155,11 +301,15 @@ fires. It is possible to cancel a timer using ``cancelTimer(key)`` and check the
|
||||||
Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the
|
Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the
|
||||||
``preStart()`` lifecycle hook.
|
``preStart()`` lifecycle hook.
|
||||||
|
|
||||||
|
In this sample the stage toggles between open and closed, where open means no elements are passed through. The
|
||||||
|
stage starts out as closed but as soon as an element is pushed downstream the gate becomes open for a duration
|
||||||
|
of time during which it will consume and drop upstream messages:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#timed
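
As a further sketch, a periodic timer can be scheduled from ``preStart()`` using ``schedulePeriodically(key,period)``
as described above. The stage name ``TickCounter`` and the ``"tick"`` key are assumptions made for this
illustration only; they do not appear in the documented samples.

.. code-block:: scala

   import scala.concurrent.duration._
   import akka.stream._
   import akka.stream.stage._

   // Hypothetical stage: forwards elements unchanged and counts timer ticks
   class TickCounter[A](interval: FiniteDuration) extends GraphStage[FlowShape[A, A]] {

     val in = Inlet[A]("TickCounter.in")
     val out = Outlet[A]("TickCounter.out")

     override val shape = FlowShape.of(in, out)

     override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
       new TimerGraphStageLogic(shape) {

         // mutable state lives inside the logic, as in the other samples
         var ticks = 0

         // timers must not be scheduled from the constructor
         override def preStart(): Unit =
           schedulePeriodically("tick", interval)

         override protected def onTimer(timerKey: Any): Unit =
           if (timerKey == "tick") ticks += 1

         setHandler(in, new InHandler {
           override def onPush(): Unit = push(out, grab(in))
         })
         setHandler(out, new OutHandler {
           override def onPull(): Unit = pull(in)
         })
       }
   }

The key is compared in ``onTimer`` so that a stage using several timers can tell them apart.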
|
||||||
|
|
||||||
|
|
||||||
Using asynchronous side-channels
|
Using asynchronous side-channels
|
||||||
--------------------------------
|
--------------------------------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
|
||||||
|
|
||||||
In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future
|
In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future
|
||||||
or a callback from a 3rd party API) one must acquire an :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the
|
or a callback from a 3rd party API) one must acquire an :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the
|
||||||
stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous
|
stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous
|
||||||
|
|
@ -171,6 +321,12 @@ implementation.
|
||||||
Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the
|
Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the
|
||||||
``preStart()`` lifecycle hook instead.
|
``preStart()`` lifecycle hook instead.
|
||||||
|
|
||||||
|
|
||||||
|
This example shows an asynchronous side channel graph stage that completes the stream
|
||||||
|
when a future completes:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#async-side-channel
|
||||||
|
|
||||||
Integration with actors
|
Integration with actors
|
||||||
-----------------------
|
-----------------------
|
||||||
|
|
||||||
|
|
@ -189,11 +345,10 @@ or ``unwatch(ref)`` methods. The reference can be also watched by external actor
|
||||||
- they cannot be accessed from the constructor of the :class:`GraphStageLogic`, but they can be accessed from the
|
- they cannot be accessed from the constructor of the :class:`GraphStageLogic`, but they can be accessed from the
|
||||||
``preStart()`` method.
|
``preStart()`` method.
|
||||||
|
|
||||||
|
|
||||||
Custom materialized values
|
Custom materialized values
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
||||||
**This section is a stub and will be extended in the next release**
|
|
||||||
|
|
||||||
Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue`
|
Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue`
|
||||||
instead of the simpler :class:`GraphStage`. The difference is that in this case the method
|
instead of the simpler :class:`GraphStage`. The difference is that in this case the method
|
||||||
``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, and in addition to the
|
``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, and in addition to the
|
||||||
|
|
@ -204,6 +359,11 @@ stage logic the materialized value must be provided
|
||||||
the thread that got hold of the materialized value. It is the responsibility of the programmer to add the
|
the thread that got hold of the materialized value. It is the responsibility of the programmer to add the
|
||||||
necessary (non-blocking) synchronization and visibility guarantees to this shared object.
|
necessary (non-blocking) synchronization and visibility guarantees to this shared object.
|
||||||
|
|
||||||
|
In this sample the materialized value is a future containing the first element to go through the stream:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#materialized
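A minimal sketch of such a stage could look as follows, assuming akka-stream is on the classpath. The class name
``FirstValue`` is chosen for illustration and the included snippet may differ in detail. A :class:`Promise` serves as
the shared object because it already provides the required thread safety and visibility:

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
import scala.concurrent.{ Future, Promise }

// Hypothetical example stage: forwards all elements unchanged and materializes
// a Future that is completed with the first element passing through.
final class FirstValue[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] {

  val in: Inlet[A] = Inlet[A]("FirstValue.in")
  val out: Outlet[A] = Outlet[A]("FirstValue.out")
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(
    inheritedAttributes: Attributes): (GraphStageLogic, Future[A]) = {
    // Created per materialization, shared between the stage and the caller.
    val promise = Promise[A]()

    val logic = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          promise.trySuccess(elem)
          push(out, elem)
          // After the first element, install a handler that only forwards.
          setHandler(in, new InHandler {
            override def onPush(): Unit = push(out, grab(in))
          })
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }

    (logic, promise.future)
  }
}
```

In this sketch the future is never completed if the stream finishes without emitting an element; a fuller version
would probably fail the promise from ``onUpstreamFinish``.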
|
||||||
|
|
||||||
|
|
||||||
Using attributes to affect the behavior of a stage
|
Using attributes to affect the behavior of a stage
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
|
|
||||||
|
|
@ -217,239 +377,54 @@ decision.
|
||||||
See :ref:`composition-scala` for an explanation on how attributes work.
|
See :ref:`composition-scala` for an explanation on how attributes work.
|
||||||
|
|
||||||
|
|
||||||
Custom linear processing stages with PushPullStage
|
|
||||||
==================================================
|
|
||||||
|
|
||||||
To extend the available transformations on a :class:`Flow` or :class:`Source` one can use the ``transform()`` method
|
|
||||||
which takes a factory function returning a :class:`Stage`. Stages come in different flavors which we will introduce in this
|
|
||||||
page.
|
|
||||||
|
|
||||||
.. _stream-using-push-pull-stage-scala:
|
|
||||||
|
|
||||||
Using PushPullStage
|
|
||||||
-------------------
|
|
||||||
|
|
||||||
The most elementary transformation stage is the :class:`PushPullStage` which can express a large class of algorithms
|
|
||||||
working on streams. A :class:`PushPullStage` can be illustrated as a box with two "input" and two "output ports" as it is
|
|
||||||
seen in the illustration below.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_conceptual.png
|
|
||||||
:align: center
|
|
||||||
:width: 600
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
The "input ports" are implemented as event handlers ``onPush(elem,ctx)`` and ``onPull(ctx)`` while "output ports"
|
|
||||||
correspond to methods on the :class:`Context` object that is handed as a parameter to the event handlers. By calling
|
|
||||||
exactly one "output port" method we wire up these four ports in various ways which we demonstrate shortly.
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
There is one very important rule to remember when working with a ``Stage``. **Exactly one** method should be called
|
|
||||||
on the **currently passed** :class:`Context` **exactly once** and as the **last statement of the handler** where the return type
|
|
||||||
of the called method **matches the expected return type of the handler**. Any violation of this rule will
|
|
||||||
almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). Exceptions
|
|
||||||
to this rule are the query methods ``isHolding()`` and ``isFinishing()``
|
|
||||||
|
|
||||||
To illustrate these concepts we create a small :class:`PushPullStage` that implements the ``map`` transformation.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_map.png
|
|
||||||
:align: center
|
|
||||||
:width: 300
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
Map calls ``ctx.push()`` from the ``onPush()`` handler and it also calls ``ctx.pull()`` from the ``onPull``
|
|
||||||
handler resulting in the conceptual wiring above, and fully expressed in code below:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/stream/FlowStagesSpec.scala#one-to-one
|
|
||||||
|
|
||||||
Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement
|
|
||||||
filter. The conceptual wiring of ``Filter`` looks like this:
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_filter.png
|
|
||||||
:align: center
|
|
||||||
:width: 300
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise
|
|
||||||
we return the "ball" to our upstream so that we get the new element. This is achieved by modifying the map
|
|
||||||
example by adding a conditional in the ``onPush`` handler that decides between a ``ctx.pull()`` or ``ctx.push()`` call
|
|
||||||
(and of course not having a mapping ``f`` function).
|
|
||||||
|
|
||||||
.. includecode:: code/docs/stream/FlowStagesSpec.scala#many-to-one
|
|
||||||
|
|
||||||
To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage
|
|
||||||
that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_doubler.png
|
|
||||||
:align: center
|
|
||||||
:width: 300
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
This is a stage that has state: the last element it has seen, and a flag ``oneLeft`` that indicates if we
|
|
||||||
have duplicated this last element already or not. Looking at the code below, the reader might notice that our ``onPull``
|
|
||||||
method is more complex than it is demonstrated by the figure above. The reason for this is completion handling, which we
|
|
||||||
will explain a little bit later. For now it is enough to look at the ``if(!ctx.isFinishing)`` block which
|
|
||||||
corresponds to the logic we expect by looking at the conceptual picture.
|
|
||||||
|
|
||||||
.. includecode:: code/docs/stream/FlowStagesSpec.scala#one-to-many
|
|
||||||
|
|
||||||
Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually
|
|
||||||
would correspond to the following structure:
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_chain.png
|
|
||||||
:align: center
|
|
||||||
:width: 650
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
In code this is only a few lines, using the ``transform`` method to inject our custom processing into a stream:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/stream/FlowStagesSpec.scala#stage-chain
|
|
||||||
|
|
||||||
If we attempt to draw the sequence of events, it shows that there is one "event token"
|
|
||||||
in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_general.png
|
|
||||||
:align: center
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
|
||||||
Completion handling
|
Rate decoupled graph stages
|
||||||
^^^^^^^^^^^^^^^^^^^
|
---------------------------
|
||||||
|
|
||||||
Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit a few
|
Sometimes it is desirable to *decouple* the rate of the upstream and downstream of a stage, synchronizing only
|
||||||
more elements after their upstream source has been completed. We have seen an example of this in our ``Duplicator`` class
|
when needed.
|
||||||
where the last element needs to be doubled even after the upstream neighbor stage has been completed. Since the
|
|
||||||
``onUpstreamFinish()`` handler expects a :class:`TerminationDirective` as the return type we are only allowed to call
|
|
||||||
``ctx.finish()``, ``ctx.fail()`` or ``ctx.absorbTermination()``. Since the first two of these available methods will
|
|
||||||
immediately terminate, our only option is ``absorbTermination()``. It is also clear from the return type of
|
|
||||||
``onUpstreamFinish`` that we cannot call ``ctx.push()`` but we need to emit elements somehow! The trick is that after
|
|
||||||
calling ``absorbTermination()`` the ``onPull()`` handler will be called eventually, and at the same time
|
|
||||||
``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to
|
|
||||||
emit additional elements and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing.
|
|
||||||
|
|
||||||
The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of
|
This is achieved in the model by representing a :class:`GraphStage` as a *boundary* between two regions where the
|
||||||
Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that
|
demand sent upstream is decoupled from the demand that arrives from downstream. One immediate consequence of this
|
||||||
our push/pull structure that was illustrated in the figure of our custom processing chain does not
|
difference is that an ``onPush`` call does not always lead to calling ``push`` and an ``onPull`` call does not always
|
||||||
apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a
|
lead to calling ``pull``.
|
||||||
pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. By calling
|
|
||||||
``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at
|
|
||||||
that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was
|
|
||||||
*below* (which means that it will come back eventually, so the environment does not need to call anything yet).
|
|
||||||
|
|
||||||
The first of the two scenarios is when a termination signal arrives after a stage passed the event to its downstream. As
|
One of the important use-cases for this is to build buffer-like entities that allow independent progress
|
||||||
we can see in the following diagram, there is no need to do anything by ``absorbTermination()`` since the black arrows
|
|
||||||
representing the movement of the "event token" is uninterrupted.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_absorb_1.png
|
|
||||||
:align: center
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
In the second scenario the "event token" is somewhere upstream when the termination signal arrives. In this case
|
|
||||||
``absorbTermination`` needs to ensure that a new "event token" is generated replacing the old one that is forever gone
|
|
||||||
(since the upstream finished). This is done by calling the ``onPull()`` event handler of the stage.
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_absorb_2.png
|
|
||||||
:align: center
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
Observe that in both scenarios ``onPull()`` kicks off the continuation of the processing logic, the only difference is
|
|
||||||
whether it is the downstream or the ``absorbTermination()`` call that calls the event handler.
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
It is not allowed to call ``absorbTermination()`` from ``onDownstreamFinish()``. If the method is called anyway,
|
|
||||||
it will be logged at ``ERROR`` level, but no further action will be taken as at that point there is no active
|
|
||||||
downstream to propagate the error to. Cancellation in the upstream direction will continue undisturbed.
|
|
||||||
|
|
||||||
Using PushStage
|
|
||||||
---------------
|
|
||||||
|
|
||||||
Many one-to-one and many-to-one transformations do not need to override the ``onPull()`` handler at all since all
|
|
||||||
they do is just propagate the pull upwards. For such transformations it is better to extend PushStage directly. For
|
|
||||||
example our ``Map`` and ``Filter`` would look like this:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/stream/FlowStagesSpec.scala#pushstage
|
|
||||||
|
|
||||||
The reason to use ``PushStage`` is not just cosmetic: internal optimizations rely on the fact that the onPull method
|
|
||||||
only calls ``ctx.pull()`` and allow the environment to process elements faster than without this knowledge. By
|
|
||||||
extending ``PushStage`` the environment can be sure that ``onPull()`` was not overridden since it is ``final`` on
|
|
||||||
``PushStage``.
|
|
||||||
|
|
||||||
|
|
||||||
Using DetachedStage
|
|
||||||
-------------------
|
|
||||||
|
|
||||||
The model described in previous sections, while conceptually simple, cannot describe all desired stages. The main
|
|
||||||
limitation is the "single-ball" (single "event token") model which prevents independent progress of an upstream and
|
|
||||||
downstream of a stage. Sometimes it is desirable to *detach* the progress (and therefore, rate) of the upstream and
|
|
||||||
downstream of a stage, synchronizing only when needed.
|
|
||||||
|
|
||||||
This is achieved in the model by representing a :class:`DetachedStage` as a *boundary* between two "single-ball" regions.
|
|
||||||
One immediate consequence of this difference is that **it is not allowed to call** ``ctx.pull()`` **from** ``onPull()`` **and
|
|
||||||
it is not allowed to call** ``ctx.push()`` **from** ``onPush()`` as such combinations would "steal" a token from one region
|
|
||||||
(resulting in zero tokens left) and would inject an unexpected second token to the other region. This is enforced
|
|
||||||
by the expected return types of these callback functions.
|
|
||||||
|
|
||||||
One of the important use-cases for :class:`DetachedStage` is to build buffer-like entities, that allow independent progress
|
|
||||||
of upstream and downstream stages when the buffer is not full or empty, and slowing down the appropriate side if the
|
of upstream and downstream stages when the buffer is not full or empty, and slowing down the appropriate side if the
|
||||||
buffer becomes empty or full. The next diagram illustrates the event sequence for a buffer with capacity of two elements.
|
buffer becomes empty or full.
|
||||||
|
|
||||||
|
The next diagram illustrates the event sequence for a buffer with capacity of two elements in a setting where
|
||||||
|
the downstream demand is slow to start and the buffer will fill up with upstream elements before any demand
|
||||||
|
is seen from downstream.
|
||||||
|
|
||||||
|
|
|
|
||||||
|
|
||||||
.. image:: ../images/stage_msc_buffer.png
|
.. image:: ../images/graph_stage_detached_tracks_1.png
|
||||||
:align: center
|
:align: center
|
||||||
|
:width: 500
|
||||||
|
|
||||||
|
|
|
|
||||||
|
|
||||||
The very first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on
|
Another scenario would be where the demand from downstream starts coming in before any element is pushed
|
||||||
initialization. Remember that it is forbidden to call ``ctx.pull`` from ``onPull``, therefore it is the task of the
|
into the buffer stage.
|
||||||
framework to kick off the first "event token" in the upstream region, which will remain there until the upstream stages
|
|
||||||
stop. The diagram distinguishes between the actions of the two regions by colors: *purple* arrows indicate the actions
|
|
||||||
involving the upstream "event token", while *red* arrows show the downstream region actions. This demonstrates the clear
|
|
||||||
separation of these regions, and the invariant that the number of tokens in the two regions are kept unchanged.
|
|
||||||
|
|
||||||
For buffer it is necessary to detach the two regions, but it is also necessary to sometimes hold back the upstream
|
|
||||||
or downstream. The new API calls that are available for :class:`DetachedStage` s are the various ``ctx.holdXXX()`` methods,
|
|
||||||
``ctx.pushAndPull()`` and variants, and ``ctx.isHoldingXXX()``.
|
|
||||||
Calling ``ctx.holdXXX()`` from ``onPull()`` or ``onPush`` results in suspending the corresponding
|
|
||||||
region from progress, and temporarily taking ownership of the "event token". This state can be queried by ``ctx.isHolding()``
|
|
||||||
which will tell if the stage is currently holding a token or not. It is only allowed to suspend one of the regions, not
|
|
||||||
both, since that would disable all possible future events, resulting in a dead-lock. Releasing the held token is only
|
|
||||||
possible by calling ``ctx.pushAndPull()``. This is to ensure that both the held token is released, and the triggering region
|
|
||||||
gets its token back (one inbound token + one held token = two released tokens).
|
|
||||||
|
|
||||||
The following code example demonstrates the buffer class corresponding to the message sequence chart we discussed.
|
|
|
||||||
|
|
||||||
.. includecode:: code/docs/stream/FlowStagesSpec.scala#detached
|
.. image:: ../images/graph_stage_detached_tracks_2.png
|
||||||
|
:align: center
|
||||||
|
:width: 500
|
||||||
|
|
||||||
.. warning::
|
|
|
||||||
If ``absorbTermination()`` is called on a :class:`DetachedStage` while it holds downstream (``isHoldingDownstream``
|
|
||||||
returns true) then ``onPull()`` will be called on the stage. This ensures that the stage does not end up in a
|
|
||||||
deadlocked case. At the point when the termination is absorbed there will be no way to get any callbacks because
|
The first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on
|
||||||
the downstream is held, so the framework invokes onPull() to avoid this situation. This is similar to the termination
|
initialization. The buffer has demand for up to two elements without any downstream demand.
|
||||||
logic already shown for :class:`PushPullStage`.
|
|
||||||
|
The following code example demonstrates a buffer class corresponding to the message sequence chart above.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#detached
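For orientation, a two-element buffer along these lines might be sketched as below, assuming akka-stream is on the
classpath. The class name ``TwoBuffer`` and the ``downstreamWaiting`` flag are illustrative choices, and the included
snippet may be structured differently. Note that ``onPush`` does not always call ``push`` and ``onPull`` does not
always call ``pull``:

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import scala.collection.mutable

// Hypothetical example stage: buffers up to two elements so that upstream and
// downstream can make progress independently while the buffer is neither full nor empty.
final class TwoBuffer[A] extends GraphStage[FlowShape[A, A]] {

  val in: Inlet[A] = Inlet[A]("TwoBuffer.in")
  val out: Outlet[A] = Outlet[A]("TwoBuffer.out")
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val buffer = mutable.Queue.empty[A]
      private def bufferFull: Boolean = buffer.size == 2
      // True when downstream pulled while the buffer was empty.
      private var downstreamWaiting = false

      // Upstream demand is not triggered by downstream demand, so start it here.
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          buffer.enqueue(grab(in))
          if (downstreamWaiting) {
            downstreamWaiting = false
            push(out, buffer.dequeue())
          }
          // Keep pulling only while there is room left.
          if (!bufferFull) pull(in)
        }

        override def onUpstreamFinish(): Unit =
          // Emit a snapshot of what is left, then complete the stage.
          emitMultiple(out, buffer.toList.iterator, () => completeStage())
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (buffer.isEmpty) downstreamWaiting = true
          else push(out, buffer.dequeue())
          // Room may have been freed; resume upstream demand if it was paused.
          if (!bufferFull && !hasBeenPulled(in)) pull(in)
        }
      })
    }
}
```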
|
||||||
|
|
||||||
|
|
||||||
Thread safety of custom processing stages
|
Thread safety of custom processing stages
|
||||||
|
|
|
||||||
|
|
@ -3,16 +3,15 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.stage
|
package akka.stream.stage
|
||||||
|
|
||||||
import java.util
|
import java.util.concurrent.atomic.{ AtomicReference }
|
||||||
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicReference }
|
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch }
|
import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch }
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
|
import akka.japi.function.{ Effect, Procedure }
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
import akka.stream.impl.StreamLayout.Module
|
import akka.stream.impl.StreamLayout.Module
|
||||||
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
|
import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule }
|
||||||
import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule, GraphStageModule }
|
|
||||||
import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName }
|
import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName }
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
@ -612,6 +611,17 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: Read a number of elements from the given inlet and continue with the given function,
|
||||||
|
* suspending execution if necessary. This action replaces the [[InHandler]]
|
||||||
|
* for the given inlet if suspension is needed and reinstalls the current
|
||||||
|
* handler upon receiving the last `onPush()` signal (before invoking the `andThen` function).
|
||||||
|
*/
|
||||||
|
final protected def readN[T](in: Inlet[T], n: Int, andThen: Procedure[java.util.List[T]], onClose: Procedure[java.util.List[T]]): Unit = {
|
||||||
|
import collection.JavaConverters._
|
||||||
|
readN(in, n)(seq ⇒ andThen(seq.asJava), seq ⇒ onClose(seq.asJava))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read an element from the given inlet and continue with the given function,
|
* Read an element from the given inlet and continue with the given function,
|
||||||
* suspending execution if necessary. This action replaces the [[InHandler]]
|
* suspending execution if necessary. This action replaces the [[InHandler]]
|
||||||
|
|
@ -631,6 +641,16 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: Read an element from the given inlet and continue with the given function,
|
||||||
|
* suspending execution if necessary. This action replaces the [[InHandler]]
|
||||||
|
* for the given inlet if suspension is needed and reinstalls the current
|
||||||
|
* handler upon receiving the `onPush()` signal (before invoking the `andThen` function).
|
||||||
|
*/
|
||||||
|
final protected def read[T](in: Inlet[T], andThen: Procedure[T], onClose: Effect): Unit = {
|
||||||
|
read(in)(andThen.apply, onClose.apply)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abort outstanding (suspended) reading for the given inlet, if there is any.
|
* Abort outstanding (suspended) reading for the given inlet, if there is any.
|
||||||
* This will reinstall the replaced handler that was in effect before the `read`
|
* This will reinstall the replaced handler that was in effect before the `read`
|
||||||
|
|
@ -690,6 +710,32 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
*/
|
*/
|
||||||
final protected def emitMultiple[T](out: Outlet[T], elems: immutable.Iterable[T]): Unit = emitMultiple(out, elems, DoNothing)
|
final protected def emitMultiple[T](out: Outlet[T], elems: immutable.Iterable[T]): Unit = emitMultiple(out, elems, DoNothing)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API
|
||||||
|
*
|
||||||
|
* Emit a sequence of elements through the given outlet, suspending execution if necessary.
|
||||||
|
* This action replaces the [[AbstractOutHandler]] for the given outlet if suspension
|
||||||
|
* is needed and reinstalls the current handler upon receiving an `onPull()`
|
||||||
|
* signal.
|
||||||
|
*/
|
||||||
|
final protected def emitMultiple[T](out: Outlet[T], elems: java.util.Iterator[T]): Unit = {
|
||||||
|
import collection.JavaConverters._
|
||||||
|
emitMultiple(out, elems.asScala, DoNothing)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API
|
||||||
|
*
|
||||||
|
* Emit a sequence of elements through the given outlet, suspending execution if necessary.
|
||||||
|
* This action replaces the [[AbstractOutHandler]] for the given outlet if suspension
|
||||||
|
* is needed and reinstalls the current handler upon receiving an `onPull()`
|
||||||
|
* signal.
|
||||||
|
*/
|
||||||
|
final protected def emitMultiple[T](out: Outlet[T], elems: java.util.Iterator[T], andThen: Effect): Unit = {
|
||||||
|
import collection.JavaConverters._
|
||||||
|
emitMultiple(out, elems.asScala, andThen.apply _)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Emit a sequence of elements through the given outlet and continue with the given thunk
|
* Emit a sequence of elements through the given outlet and continue with the given thunk
|
||||||
* afterwards, suspending execution if necessary.
|
* afterwards, suspending execution if necessary.
|
||||||
|
|
@ -740,6 +786,10 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
*/
|
*/
|
||||||
final protected def emit[T](out: Outlet[T], elem: T): Unit = emit(out, elem, DoNothing)
|
final protected def emit[T](out: Outlet[T], elem: T): Unit = emit(out, elem, DoNothing)
|
||||||
|
|
||||||
|
final protected def emit[T](out: Outlet[T], elem: T, andThen: Effect): Unit = {
|
||||||
|
emit(out, elem, andThen.apply _)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abort outstanding (suspended) emissions for the given outlet, if there are any.
|
* Abort outstanding (suspended) emissions for the given outlet, if there are any.
|
||||||
* This will reinstall the replaced handler that was in effect before the `emit`
|
* This will reinstall the replaced handler that was in effect before the `emit`
|
||||||
|
|
@ -875,6 +925,18 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: Obtain a callback object that can be used asynchronously to re-enter the
|
||||||
|
* current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned
|
||||||
|
* [[AsyncCallback]] is safe to be called from other threads and it will in the background thread-safely
|
||||||
|
* delegate to the passed callback function. I.e. [[invoke()]] will be called by the external world and
|
||||||
|
* the passed handler will be invoked eventually in a thread-safe way by the execution environment.
|
||||||
|
*
|
||||||
|
* This object can be cached and reused within the same [[GraphStageLogic]].
|
||||||
|
*/
|
||||||
|
final protected def createAsyncCallback[T](handler: Procedure[T]): AsyncCallback[T] =
|
||||||
|
getAsyncCallback(handler.apply)
|
||||||
|
|
||||||
private var _stageActorRef: StageActorRef = _
|
private var _stageActorRef: StageActorRef = _
|
||||||
final def stageActorRef: ActorRef = _stageActorRef match {
|
final def stageActorRef: ActorRef = _stageActorRef match {
|
||||||
case null ⇒ throw StageActorRefNotInitializedException()
|
case null ⇒ throw StageActorRefNotInitializedException()
|
||||||
|
|
@ -982,6 +1044,7 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Will be called when the scheduled timer is triggered.
|
* Will be called when the scheduled timer is triggered.
|
||||||
|
*
|
||||||
* @param timerKey key of the scheduled timer
|
* @param timerKey key of the scheduled timer
|
||||||
*/
|
*/
|
||||||
protected def onTimer(timerKey: Any): Unit = ()
|
protected def onTimer(timerKey: Any): Unit = ()
|
||||||
|
|
@ -1029,6 +1092,7 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cancel timer, ensuring that the [[#onTimer]] is not subsequently called.
|
* Cancel timer, ensuring that the [[#onTimer]] is not subsequently called.
|
||||||
|
*
|
||||||
* @param timerKey key of the timer to cancel
|
* @param timerKey key of the timer to cancel
|
||||||
*/
|
*/
|
||||||
final protected def cancelTimer(timerKey: Any): Unit =
|
final protected def cancelTimer(timerKey: Any): Unit =
|
||||||
|
|
|
||||||