+doc java sample code for detached stage

This commit is contained in:
Martynas Mickevičius 2015-07-14 17:45:57 +03:00
parent 0d8fd40b09
commit d9386c8cc5
3 changed files with 119 additions and 24 deletions

View file

@ -25,10 +25,14 @@ The most elementary transformation stage is the :class:`PushPullStage` which can
working on streams. A :class:`PushPullStage` can be illustrated as a box with two "input" and two "output ports" as it is working on streams. A :class:`PushPullStage` can be illustrated as a box with two "input" and two "output ports" as it is
seen in the illustration below. seen in the illustration below.
|
.. image:: ../images/stage_conceptual.png .. image:: ../images/stage_conceptual.png
:align: center :align: center
:width: 600 :width: 600
|
The "input ports" are implemented as event handlers ``onPush(elem,ctx)`` and ``onPull(ctx)`` while "output ports" The "input ports" are implemented as event handlers ``onPush(elem,ctx)`` and ``onPull(ctx)`` while "output ports"
correspond to methods on the :class:`Context` object that is handed as a parameter to the event handlers. By calling correspond to methods on the :class:`Context` object that is handed as a parameter to the event handlers. By calling
exactly one "output port" method we wire up these four ports in various ways which we demonstrate shortly. exactly one "output port" method we wire up these four ports in various ways which we demonstrate shortly.
@ -42,10 +46,14 @@ exactly one "output port" method we wire up these four ports in various ways whi
To illustrate these concepts we create a small :class:`PushPullStage` that implements the ``map`` transformation. To illustrate these concepts we create a small :class:`PushPullStage` that implements the ``map`` transformation.
|
.. image:: ../images/stage_map.png .. image:: ../images/stage_map.png
:align: center :align: center
:width: 300 :width: 300
|
Map calls ``ctx.push()`` from the ``onPush()`` handler and it also calls ``ctx.pull()`` form the ``onPull`` Map calls ``ctx.push()`` from the ``onPush()`` handler and it also calls ``ctx.pull()`` form the ``onPull``
handler resulting in the conceptual wiring above, and fully expressed in code below: handler resulting in the conceptual wiring above, and fully expressed in code below:
@ -54,10 +62,14 @@ handler resulting in the conceptual wiring above, and fully expressed in code be
Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement
filter. The conceptual wiring of ``Filter`` looks like this: filter. The conceptual wiring of ``Filter`` looks like this:
|
.. image:: ../images/stage_filter.png .. image:: ../images/stage_filter.png
:align: center :align: center
:width: 300 :width: 300
|
As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise
we return the "ball" to our upstream so that we get the new element. This is achieved by modifying the map we return the "ball" to our upstream so that we get the new element. This is achieved by modifying the map
example by adding a conditional in the ``onPush`` handler and decide between a ``ctx.pull()`` or ``ctx.push()`` call example by adding a conditional in the ``onPush`` handler and decide between a ``ctx.pull()`` or ``ctx.push()`` call
@ -68,10 +80,14 @@ example by adding a conditional in the ``onPush`` handler and decide between a `
To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage
that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this: that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:
|
.. image:: ../images/stage_doubler.png .. image:: ../images/stage_doubler.png
:align: center :align: center
:width: 300 :width: 300
|
This is a stage that has state: the last element it has seen, and a flag ``oneLeft`` that indicates if we This is a stage that has state: the last element it has seen, and a flag ``oneLeft`` that indicates if we
have duplicated this last element already or not. Looking at the code below, the reader might notice that our ``onPull`` have duplicated this last element already or not. Looking at the code below, the reader might notice that our ``onPull``
method is more complex than it is demonstrated by the figure above. The reason for this is completion handling, which we method is more complex than it is demonstrated by the figure above. The reason for this is completion handling, which we
@ -83,14 +99,28 @@ corresponds to the logic we expect by looking at the conceptual picture.
Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually
would correspond to the following structure: would correspond to the following structure:
|
.. image:: ../images/stage_chain.png .. image:: ../images/stage_chain.png
:align: center :align: center
:width: 650 :width: 650
|
In code this is only a few lines, using the ``transform`` method to inject our custom processing into a stream: In code this is only a few lines, using the ``transform`` method to inject our custom processing into a stream:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#stage-chain .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#stage-chain
If we attempt to draw the sequence of events, it shows that there is one "event token"
in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts.
|
.. image:: ../images/stage_msc_general.png
:align: center
|
Completion handling Completion handling
^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^
@ -105,15 +135,39 @@ calling ``absorbTermination()`` the ``onPull()`` handler will be called eventual
``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to ``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to
emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing. emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing.
.. note:: The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of
The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that
Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that our push/pull structure that was illustrated in the figure of our custom processing chain does not
our push/pull structure that was illustrated in the figure of our custom processing chain does not apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a
apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. By calling
pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. By calling ``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at
``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was
that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was *below* (which means that it will come back eventually, so the environment does not need to call anything yet).
*below* (which means that it will come back eventually, so the environment does not need to call anything yet).
The first of the two scenarios is when a termination signal arrives after a stage passed the event to its downstream. As
we can see in the following diagram, there is no need to do anything by ``absorbTermination()`` since the black arrows
representing the movement of the "event token" is uninterrupted.
|
.. image:: ../images/stage_msc_absorb_1.png
:align: center
|
In the second scenario the "event token" is somewhere upstream when the termination signal arrives. In this case
``absorbTermination`` needs to ensure that a new "event token" is generated replacing the old one that is forever gone
(since the upstream finished). This is done by calling the ``onPull()`` event handler of the stage.
|
.. image:: ../images/stage_msc_absorb_2.png
:align: center
|
Observe, that in both scenarios ``onPull()`` kicks off the continuation of the processing logic, the only difference is
whether it is the downstream or the ``absorbTermination()`` call that calls the event handler.
Using PushStage Using PushStage
@ -146,7 +200,48 @@ we reimplemented ``Duplicator`` in terms of a ``StatefulStage``:
Using DetachedStage Using DetachedStage
------------------- -------------------
*TODO* The model described in previous sections, while conceptually simple, cannot describe all desired stages. The main
limitation is the "single-ball" (single "event token") model which prevents independent progress of an upstream and
downstream of a stage. Sometimes it is desirable to *detach* the progress (and therefore, rate) of the upstream and
downstream of a stage, synchronizing only when needed.
This is achieved in the model by representing a :class:`DetachedStage` as a *boundary* between two "single-ball" regions.
One immediate consequence of this difference is that **it is not allowed to call** ``ctx.pull()`` **from** ``onPull()`` **and
it is not allowed to call** ``ctx.push()`` **from** ``onPush()`` as such combinations would "steal" a token from one region
(resulting in zero tokens left) and would inject an unexpected second token to the other region. This is enforced
by the expected return types of these callback functions.
One of the important use-cases for :class:`DetachedStage` is to build buffer-like entities, that allow independent progress
of upstream and downstream stages when the buffer is not full or empty, and slowing down the appropriate side if the
buffer becomes empty or full. The next diagram illustrates the event sequence for a buffer with capacity of two elements.
|
.. image:: ../images/stage_msc_buffer.png
:align: center
|
The very first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on
initialization. Remember that it is forbidden to call ``ctx.pull`` from ``onPull``, therefore it is the task of the
framework to kick off the first "event token" in the upstream region, which will remain there until the upstream stages
stop. The diagram distinguishes between the actions of the two regions by colors: *purple* arrows indicate the actions
involving the upstream "event token", while *red* arrows show the downstream region actions. This demonstrates the clear
separation of these regions, and the invariant that the number of tokens in the two regions are kept unchanged.
For buffer it is necessary to detach the two regions, but it is also necessary to sometimes hold back the upstream
or downstream. The new API calls that are available for :class:`DetachedStage` s are the various ``ctx.holdXXX()`` methods
, ``ctx.pushAndPull()`` and variants, and ``ctx.isHoldingXXX()``.
Calling ``ctx.holdXXX()`` from ``onPull()`` or ``onPush`` results in suspending the corresponding
region from progress, and temporarily taking ownership of the "event token". This state can be queried by ``ctx.isHolding()``
which will tell if the stage is currently holding a token or not. It is only allowed to suspend one of the regions, not
both, since that would disable all possible future events, resulting in a dead-lock. Releasing the held token is only
possible by calling ``ctx.pushAndPull()``. This is to ensure that both the held token is released, and the triggering region
gets its token back (one inbound token + one held token = two released tokens).
The following code example demonstrates the buffer class corresponding to the message sequence chart we discussed.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#detached
Custom graph processing junctions Custom graph processing junctions
================================= =================================
@ -293,7 +388,7 @@ we use the special :class:`SameState` object which signals :class:`FlexiRoute` t
.. warning:: .. warning::
While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance
*must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances. *must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances.
.. note:: .. note::
It is only allowed to `emit` at most one element to each output in response to `onInput`, `IllegalStateException` is thrown. It is only allowed to `emit` at most one element to each output in response to `onInput`, `IllegalStateException` is thrown.
@ -330,11 +425,11 @@ All of the above custom stages (linear or graph) provide a few simple guarantees
- The state encapsulated by these classes can be safely modified from the provided callbacks, without any further - The state encapsulated by these classes can be safely modified from the provided callbacks, without any further
synchronization. synchronization.
In essence, the above guarantees are similar to what :class:`Actor`s provide, if one thinks of the state of a custom In essence, the above guarantees are similar to what :class:`Actor` s provide, if one thinks of the state of a custom
stage as state of an actor, and the callbacks as the ``receive`` block of the actor. stage as state of an actor, and the callbacks as the ``receive`` block of the actor.
.. warning:: .. warning::
It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it
is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over**
internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined
behavior. behavior.

View file

@ -239,7 +239,8 @@ class FlexiDocSpec extends AkkaSpec {
"flexi route completion handling" in { "flexi route completion handling" in {
import FanOutShape._ import FanOutShape._
//#flexiroute-completion //#flexiroute-completion
class ImportantRouteShape[A](_init: Init[A] = Name[A]("ImportantRoute")) extends FanOutShape[A](_init) { class ImportantRouteShape[A](_init: Init[A] = Name[A]("ImportantRoute"))
extends FanOutShape[A](_init) {
val important = newOutlet[A]("important") val important = newOutlet[A]("important")
val additional1 = newOutlet[A]("additional1") val additional1 = newOutlet[A]("additional1")
val additional2 = newOutlet[A]("additional2") val additional2 = newOutlet[A]("additional2")

View file

@ -136,7 +136,6 @@ calling ``absorbTermination()`` the ``onPull()`` handler will be called eventual
``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to ``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to
emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing. emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing.
The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of
Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that
our push/pull structure that was illustrated in the figure of our custom processing chain does not our push/pull structure that was illustrated in the figure of our custom processing chain does not
@ -159,7 +158,7 @@ representing the movement of the "event token" is uninterrupted.
In the second scenario the "event token" is somewhere upstream when the termination signal arrives. In this case In the second scenario the "event token" is somewhere upstream when the termination signal arrives. In this case
``absorbTermination`` needs to ensure that a new "event token" is generated replacing the old one that is forever gone ``absorbTermination`` needs to ensure that a new "event token" is generated replacing the old one that is forever gone
(since the upstream finished). This is done by calling the ``onPull()`` event handler of the stage. ű (since the upstream finished). This is done by calling the ``onPull()`` event handler of the stage.
| |
@ -231,13 +230,13 @@ involving the upstream "event token", while *red* arrows show the downstream reg
separation of these regions, and the invariant that the number of tokens in the two regions are kept unchanged. separation of these regions, and the invariant that the number of tokens in the two regions are kept unchanged.
For buffer it is necessary to detach the two regions, but it is also necessary to sometimes hold back the upstream For buffer it is necessary to detach the two regions, but it is also necessary to sometimes hold back the upstream
or downstream. The new API calls that are available for :class:`DetachedStage` s are the various``ctx.holdXXX()`` methods or downstream. The new API calls that are available for :class:`DetachedStage` s are the various ``ctx.holdXXX()`` methods
, ``ctx.pushAndPull()`` and variangs, and ``ctx.isHoldingXXX()``. , ``ctx.pushAndPull()`` and variants, and ``ctx.isHoldingXXX()``.
Calling ``ctx.holdXXX()`` from ``onPull()`` or ``onPush`` results in suspending the corresponding Calling ``ctx.holdXXX()`` from ``onPull()`` or ``onPush`` results in suspending the corresponding
region from progress, and temporarily taking ownership of the "event token". This state can be queried by ``ctx.isHolding`` region from progress, and temporarily taking ownership of the "event token". This state can be queried by ``ctx.isHolding()``
which will tell if the stage is currently holding a token or not. It is only allowed to suspend one of the regions, not which will tell if the stage is currently holding a token or not. It is only allowed to suspend one of the regions, not
both, since that would disable all possible future events, resulting in a dead-lock. Releasing the held token is only both, since that would disable all possible future events, resulting in a dead-lock. Releasing the held token is only
possible by calling ``pushAndPull()``. This is to ensure that both the held token is released, and the triggering region possible by calling ``ctx.pushAndPull()``. This is to ensure that both the held token is released, and the triggering region
gets its token back (one inbound token + one held token = two released tokens). gets its token back (one inbound token + one held token = two released tokens).
The following code example demonstrates the buffer class corresponding to the message sequence chart we discussed. The following code example demonstrates the buffer class corresponding to the message sequence chart we discussed.
@ -391,7 +390,7 @@ we use the special :class:`SameState` object which signals :class:`FlexiRoute` t
.. warning:: .. warning::
While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance
*must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances. *must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances.
.. note:: .. note::
It is only allowed to `emit` at most one element to each output in response to `onInput`, `IllegalStateException` is thrown. It is only allowed to `emit` at most one element to each output in response to `onInput`, `IllegalStateException` is thrown.
@ -428,11 +427,11 @@ All of the above custom stages (linear or graph) provide a few simple guarantees
- The state encapsulated by these classes can be safely modified from the provided callbacks, without any further - The state encapsulated by these classes can be safely modified from the provided callbacks, without any further
synchronization. synchronization.
In essence, the above guarantees are similar to what :class:`Actor`s provide, if one thinks of the state of a custom In essence, the above guarantees are similar to what :class:`Actor` s provide, if one thinks of the state of a custom
stage as state of an actor, and the callbacks as the ``receive`` block of the actor. stage as state of an actor, and the callbacks as the ``receive`` block of the actor.
.. warning:: .. warning::
It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it
is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over**
internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined
behavior. behavior.