diff --git a/akka-docs-dev/rst/images/graph_stage_chain.png b/akka-docs-dev/rst/images/graph_stage_chain.png new file mode 100644 index 0000000000..9de3145b21 Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_chain.png differ diff --git a/akka-docs-dev/rst/images/graph_stage_chain.svg b/akka-docs-dev/rst/images/graph_stage_chain.svg new file mode 100644 index 0000000000..b3ad21644b --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_chain.svg @@ -0,0 +1,3 @@ + + + Produced by OmniGraffle 6.4.1 2015-12-10 12:11:46 +0000Canvas 1Layer 1SourceSinkFilteronPush()push(out, elem)if(p(elem))onPull()pull(in)demandif(!p(elem))DuplicateonPush()push(out, elem)onPull()pull(in)demandif(oneLeft)if(!oneLeft)MaponPush()push(out, elem)f(elem)onPull()pull(in)demand diff --git a/akka-docs-dev/rst/images/graph_stage_conceptual.png b/akka-docs-dev/rst/images/graph_stage_conceptual.png new file mode 100644 index 0000000000..a27f1cfca3 Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_conceptual.png differ diff --git a/akka-docs-dev/rst/images/graph_stage_conceptual.svg b/akka-docs-dev/rst/images/graph_stage_conceptual.svg new file mode 100644 index 0000000000..6e8b79e67a --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_conceptual.svg @@ -0,0 +1,3 @@ + + + Produced by OmniGraffle 6.4.1 2015-12-10 10:31:40 +0000Canvas 1Layer 1onPush()push(out, elem)elementsonPull()pull(in)demandGraphStageSourceSink diff --git a/akka-docs-dev/rst/images/graph_stage_detached_tracks_1.png b/akka-docs-dev/rst/images/graph_stage_detached_tracks_1.png new file mode 100644 index 0000000000..1887d0fbc8 Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_detached_tracks_1.png differ diff --git a/akka-docs-dev/rst/images/graph_stage_detached_tracks_1.svg b/akka-docs-dev/rst/images/graph_stage_detached_tracks_1.svg new file mode 100644 index 0000000000..794fa8c8bd --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_detached_tracks_1.svg @@ -0,0 +1,3 @@ + 
+ + Produced by OmniGraffle 6.4.1 2015-12-10 15:40:29 +0000Canvas 3Layer 1SourceTwoBufferSinkpreStartonPushonPushpush(out, 1)onPullpull(in)push(out, 1)push(out, 3)onPull[][1]pull(in)onPushpush(out, 2)onPull[1,2]pull(in)onPush[2,3]bufferFull!bufferFull diff --git a/akka-docs-dev/rst/images/graph_stage_detached_tracks_2.png b/akka-docs-dev/rst/images/graph_stage_detached_tracks_2.png new file mode 100644 index 0000000000..1aabee9a3d Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_detached_tracks_2.png differ diff --git a/akka-docs-dev/rst/images/graph_stage_detached_tracks_2.svg b/akka-docs-dev/rst/images/graph_stage_detached_tracks_2.svg new file mode 100644 index 0000000000..f79400317b --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_detached_tracks_2.svg @@ -0,0 +1,3 @@ + + + Produced by OmniGraffle 6.4.1 2015-12-10 15:40:29 +0000Canvas 3Layer 1SourceTwoBufferSinkpreStartonPushonPullpull(in)push(out, 1)onPull[][1]pull(in)onPushpush(out, 2)onPull[]downstreamWaiting!downstreamWaitingonPush[2] diff --git a/akka-docs-dev/rst/images/graph_stage_duplicate.png b/akka-docs-dev/rst/images/graph_stage_duplicate.png new file mode 100644 index 0000000000..88f1c560ad Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_duplicate.png differ diff --git a/akka-docs-dev/rst/images/graph_stage_duplicate.svg b/akka-docs-dev/rst/images/graph_stage_duplicate.svg new file mode 100644 index 0000000000..4729f72071 --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_duplicate.svg @@ -0,0 +1,3 @@ + + + Produced by OmniGraffle 6.4.1 2015-12-10 10:31:40 +0000Canvas 1Layer 1DuplicateonPush()push(out, elem)onPull()pull(in)demandif(oneLeft)if(!oneLeft) diff --git a/akka-docs-dev/rst/images/graph_stage_filter.png b/akka-docs-dev/rst/images/graph_stage_filter.png new file mode 100644 index 0000000000..77068a2a33 Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_filter.png differ diff --git 
a/akka-docs-dev/rst/images/graph_stage_filter.svg b/akka-docs-dev/rst/images/graph_stage_filter.svg new file mode 100644 index 0000000000..02f366da87 --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_filter.svg @@ -0,0 +1,3 @@ + + + Produced by OmniGraffle 6.4.1 2015-12-10 10:31:40 +0000Canvas 1Layer 1FilteronPush()push(out, elem)if(p(elem))onPull()pull(in)demandif(!p(elem)) diff --git a/akka-docs-dev/rst/images/graph_stage_map.png b/akka-docs-dev/rst/images/graph_stage_map.png new file mode 100644 index 0000000000..96666ddfee Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_map.png differ diff --git a/akka-docs-dev/rst/images/graph_stage_map.svg b/akka-docs-dev/rst/images/graph_stage_map.svg new file mode 100644 index 0000000000..2943f7248f --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_map.svg @@ -0,0 +1,3 @@ + + + Produced by OmniGraffle 6.4.1 2015-12-10 10:31:40 +0000Canvas 1Layer 1MaponPush()push(out, elem)f(elem)onPull()pull(in)demand diff --git a/akka-docs-dev/rst/images/graph_stage_tracks_1.png b/akka-docs-dev/rst/images/graph_stage_tracks_1.png new file mode 100644 index 0000000000..5f15f8612e Binary files /dev/null and b/akka-docs-dev/rst/images/graph_stage_tracks_1.png differ diff --git a/akka-docs-dev/rst/images/graph_stage_tracks_1.svg b/akka-docs-dev/rst/images/graph_stage_tracks_1.svg new file mode 100644 index 0000000000..99c77236e1 --- /dev/null +++ b/akka-docs-dev/rst/images/graph_stage_tracks_1.svg @@ -0,0 +1,3 @@ + + + Produced by OmniGraffle 6.4.1 2015-12-10 12:53:14 +0000Canvas 2Layer 1SourceFilterDoublerMapSinkonPullonPullpull(in)onPullpull(in)onPushonPushonPushpush(out…push(out…push(out…onPushonPullpull(in)push(out…onPushpull(in)onPullpull(in)push(out…pull(in)pull(in)push(out…push(out…onPullonPullonPushonPush diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index 01928a7fac..4ddd97e949 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ 
b/akka-docs-dev/rst/java/stream-customize.rst @@ -9,6 +9,12 @@ is sometimes necessary to define new transformation stages either because some f stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph junctions of various kinds. +.. note:: + A custom graph stage should not be the first tool you reach for; defining graphs using flows + and the graph DSL is in general easier and does to a larger extent protect you from mistakes that + might be easy to make with a custom :class:`GraphStage`. + + .. _graphstage-java: Custom processing with GraphStage @@ -79,7 +85,7 @@ in that state. | .. image:: ../images/outport_transitions.png -:align: center + :align: center | @@ -115,7 +121,7 @@ in that state. | .. image:: ../images/inport_transitions.png -:align: center + :align: center | @@ -125,22 +131,160 @@ Finally, there are two methods available for convenience to complete the stage a * ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports. +In some cases it is inconvenient and error prone to react to the regular state machine events with the +signal based API described above. For those cases there is an API which allows for a more declarative sequencing +of actions which will greatly simplify some use cases at the cost of some extra allocations. The difference +between the two APIs could be described as that the first one is signal driven from the outside, while this API +is more active and drives its surroundings.
+ +The operations of this part of the :class:`GraphStage` API are: + +* ``emit(out, elem)`` and ``emitMultiple(out, Iterable(elem1, elem2))`` replace the ``OutHandler`` with a handler that emits + one or more elements when there is demand, and then reinstalls the current handlers +* ``read(in)(andThen)`` and ``readN(in, n)(andThen)`` replace the ``InHandler`` with a handler that reads one or + more elements as they are pushed and allows the handler to react once the requested number of elements has been read. +* ``abortEmitting()`` and ``abortReading()`` which will cancel an ongoing emit or read + +Note that since the above methods are implemented by temporarily replacing the handlers of the stage you should never +call ``setHandler`` while they are running ``emit`` or ``read`` as that interferes with how they are implemented. +The following methods are safe to call after invoking ``emit`` and ``read`` (and will lead to actually running the +operation when those are done): ``complete(out)``, ``completeStage()``, ``emit``, ``emitMultiple``, ``abortEmitting()`` +and ``abortReading()`` + +An example of how this API simplifies a stage can be found below in the second version of the :class:`Duplicator`. + +Custom linear processing stages using GraphStage +------------------------------------------------ + +Graph stages allow for custom linear processing stages through letting them +have one input and one output and using :class:`FlowShape` as their shape. + +Such a stage can be illustrated as a box with two flows as it is +seen in the illustration below. Demand flows upstream, leading to elements +flowing downstream. + +| + +.. image:: ../images/graph_stage_conceptual.png + :align: center + :width: 500 + +| + + +To illustrate these concepts we create a small :class:`GraphStage` that implements the ``map`` transformation. + +| + +..
image:: ../images/graph_stage_map.png + :align: center + :width: 300 + +| + +Map calls ``push(out, elem)`` from the ``onPush()`` handler and it also calls ``pull(in)`` from the ``onPull()`` handler resulting in the +conceptual wiring above, and fully expressed in code below: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#one-to-one + +Map is a typical example of a one-to-one transformation of a stream where +demand is passed along upstream and elements are passed on downstream. + +To demonstrate a many-to-one stage we will implement +filter. The conceptual wiring of ``Filter`` looks like this: + +| + +.. image:: ../images/graph_stage_filter.png + :align: center + :width: 300 + +| + + +As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise +we return the “ball” to our upstream so that we get the new element. This is achieved by modifying the map +example by adding a conditional in the ``onPush`` handler and deciding between a ``pull(in)`` or ``push(out)`` call +(and of course not having a mapping ``f`` function). + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#many-to-one + +To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage +that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this: + +| + +.. image:: ../images/graph_stage_duplicate.png + :align: center + :width: 300 + +| + +This is a stage that has state: an option with the last element it has seen indicating if it +has duplicated this last element already or not. We must also make sure to emit the extra element +if the upstream completes. + +..
includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#one-to-many + +In this case a pull from downstream might be consumed by the stage itself rather +than passed along upstream as the stage might contain an element it wants to +push. Note that we also need to handle the case where the upstream closes while +the stage still has elements it wants to push downstream. This is done by +overriding ``onUpstreamFinish`` in the ``AbstractInHandler`` and providing custom logic +that should happen when the upstream has been finished. + +This example can be simplified by replacing the usage of a mutable state with calls to +``emitMultiple`` which will replace the handlers, emit each of multiple elements and then +reinstate the original handlers: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#simpler-one-to-many + +Finally, to demonstrate all of the stages above, we put them together into a processing chain, +which conceptually would correspond to the following structure: + + +| + +.. image:: ../images/graph_stage_chain.png + :align: center + :width: 700 + +| + +In code this is only a few lines, using ``via`` to use our custom stages in a stream: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#graph-stage-chain + +If we attempt to draw the sequence of events, it shows that there is one "event token" +in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts. + + +| + +.. image:: ../images/graph_stage_tracks_1.png + :align: center + :width: 700 + +| + + Completion ---------- -**This section is a stub and will be extended in the next release** +Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit +a few more elements after their upstream source has been completed.
We have seen an example of this in our +first :class:`Duplicator` implementation where the last element needs to be doubled even after the upstream neighbor +stage has been completed. This can be done by overriding the ``onUpstreamFinish`` method in ``AbstractInHandler``. Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally. -It is possible to opt out from this behavior by overriding ``keepGoingAfterAllPortsClosed`` and returning true in -the :class:`GraphStageLogic` implementation. In this case the stage **must** be explicitly closed by calling ``completeStage()`` +It is possible to opt out from this behavior by invoking ``setKeepGoing(true)`` (which is not supported from the stage’s +constructor and usually done in ``preStart``). In this case the stage **must** be explicitly closed by calling ``completeStage()`` or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used with care. Using timers ------------ -**This section is a stub and will be extended in the next release** - It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or ``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example @@ -151,11 +295,14 @@ fires. It is possible to cancel a timer using ``cancelTimer(key)`` and check the Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the ``preStart()`` lifecycle hook. +In this sample the stage toggles between open and closed, where open means no elements are passed through. 
The +stage starts out as closed but as soon as an element is pushed downstream the gate becomes open for a duration +of time during which it will consume and drop upstream messages: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#timed + Using asynchronous side-channels -------------------------------- - -**This section is a stub and will be extended in the next release** - In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future or a callback from a 3rd party API) one must acquire a :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous @@ -167,6 +314,13 @@ implementation. Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the ``preStart()`` lifecycle hook instead. + +This example shows an asynchronous side channel graph stage that starts dropping elements +when a future completes: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#async-side-channel + + Integration with actors ----------------------- @@ -188,8 +342,6 @@ or ``unwatch(ref)`` methods. The reference can be also watched by external actor Custom materialized values -------------------------- -**This section is a stub and will be extended in the next release** - Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue` instead of the simpler :class:`GraphStage`. The difference is that in this case the method ``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, overridden, and in addition to the @@ -200,6 +352,10 @@ stage logic the materialized value must be provided the thread that got hold of the materialized value. 
It is the responsibility of the programmer to add the necessary (non-blocking) synchronization and visibility guarantees to this shared object. +In this sample the materialized value is a future containing the first element to go through the stream: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#materialized + Using attributes to affect the behavior of a stage -------------------------------------------------- @@ -213,237 +369,52 @@ decision. See :ref:`composition-java` for an explanation on how attributes work. -Custom linear processing stages -=============================== +Rate decoupled graph stages +--------------------------- -To extend the available transformations on a :class:`Flow` or :class:`Source` one can use the ``transform()`` method -which takes a factory function returning a :class:`Stage`. Stages come in different flavors swhich we will introduce in this -page. +Sometimes it is desirable to *decouple* the rate of the upstream and downstream of a stage, synchronizing only +when needed. -.. _stream-using-push-pull-stage-java: +This is achieved in the model by representing a :class:`GraphStage` as a *boundary* between two regions where the +demand sent upstream is decoupled from the demand that arrives from downstream. One immediate consequence of this +difference is that an ``onPush`` call does not always lead to calling ``push`` and an ``onPull`` call does not always +lead to calling ``pull``. -Using PushPullStage -------------------- - -The most elementary transformation stage is the :class:`PushPullStage` which can express a large class of algorithms -working on streams. A :class:`PushPullStage` can be illustrated as a box with two "input" and two "output ports" as it is -seen in the illustration below. - -| - -.. 
image:: ../images/stage_conceptual.png - :align: center - :width: 600 - -| - -The "input ports" are implemented as event handlers ``onPush(elem,ctx)`` and ``onPull(ctx)`` while "output ports" -correspond to methods on the :class:`Context` object that is handed as a parameter to the event handlers. By calling -exactly one "output port" method we wire up these four ports in various ways which we demonstrate shortly. - -.. warning:: - There is one very important rule to remember when working with a ``Stage``. **Exactly one** method should be called - on the **currently passed** :class:`Context` **exactly once** and as the **last statement of the handler** where the return type - of the called method **matches the expected return type of the handler**. Any violation of this rule will - almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). Exceptions - to this rule are the query methods ``isHolding()`` and ``isFinishing()`` - -To illustrate these concepts we create a small :class:`PushPullStage` that implements the ``map`` transformation. - -| - -.. image:: ../images/stage_map.png - :align: center - :width: 300 - -| - -Map calls ``ctx.push()`` from the ``onPush()`` handler and it also calls ``ctx.pull()`` form the ``onPull`` -handler resulting in the conceptual wiring above, and fully expressed in code below: - -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#one-to-one - -Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement -filter. The conceptual wiring of ``Filter`` looks like this: - -| - -.. image:: ../images/stage_filter.png - :align: center - :width: 300 - -| - -As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise -we return the "ball" to our upstream so that we get the new element. 
This is achieved by modifying the map -example by adding a conditional in the ``onPush`` handler and decide between a ``ctx.pull()`` or ``ctx.push()`` call -(and of course not having a mapping ``f`` function). - -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#many-to-one - -To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage -that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this: - -| - -.. image:: ../images/stage_doubler.png - :align: center - :width: 300 - -| - -This is a stage that has state: the last element it has seen, and a flag ``oneLeft`` that indicates if we -have duplicated this last element already or not. Looking at the code below, the reader might notice that our ``onPull`` -method is more complex than it is demonstrated by the figure above. The reason for this is completion handling, which we -will explain a little bit later. For now it is enough to look at the ``if(!ctx.isFinishing)`` block which -corresponds to the logic we expect by looking at the conceptual picture. - -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#one-to-many - -Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually -would correspond to the following structure: - -| - -.. image:: ../images/stage_chain.png - :align: center - :width: 650 - -| - -In code this is only a few lines, using the ``transform`` method to inject our custom processing into a stream: - -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#stage-chain - -If we attempt to draw the sequence of events, it shows that there is one "event token" -in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts. 
- -| - -.. image:: ../images/stage_msc_general.png - :align: center - -| - -Completion handling -^^^^^^^^^^^^^^^^^^^ - -Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit a few -more elements after their upstream source has been completed. We have seen an example of this in our ``Duplicator`` class -where the last element needs to be doubled even after the upstream neighbor stage has been completed. Since the -``onUpstreamFinish()`` handler expects a :class:`TerminationDirective` as the return type we are only allowed to call -``ctx.finish()``, ``ctx.fail()`` or ``ctx.absorbTermination()``. Since the first two of these available methods will -immediately terminate, our only option is ``absorbTermination()``. It is also clear from the return type of -``onUpstreamFinish`` that we cannot call ``ctx.push()`` but we need to emit elements somehow! The trick is that after -calling ``absorbTermination()`` the ``onPull()`` handler will be called eventually, and at the same time -``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to -emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing. - -The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of -Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that -our push/pull structure that was illustrated in the figure of our custom processing chain does not -apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a -pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. 
By calling -``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at -that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was -*below* (which means that it will come back eventually, so the environment does not need to call anything yet). - -The first of the two scenarios is when a termination signal arrives after a stage passed the event to its downstream. As -we can see in the following diagram, there is no need to do anything by ``absorbTermination()`` since the black arrows -representing the movement of the "event token" is uninterrupted. - -| - -.. image:: ../images/stage_msc_absorb_1.png - :align: center - -| - -In the second scenario the "event token" is somewhere upstream when the termination signal arrives. In this case -``absorbTermination`` needs to ensure that a new "event token" is generated replacing the old one that is forever gone -(since the upstream finished). This is done by calling the ``onPull()`` event handler of the stage. - -| - -.. image:: ../images/stage_msc_absorb_2.png - :align: center - -| - -Observe, that in both scenarios ``onPull()`` kicks off the continuation of the processing logic, the only difference is -whether it is the downstream or the ``absorbTermination()`` call that calls the event handler. - -.. warning:: - It is not allowed to call ``absorbTermination()`` from ``onDownstreamFinish()``. If the method is called anyway, - it will be logged at ``ERROR`` level, but no further action will be taken as at that point there is no active - downstream to propagate the error to. Cancellation in the upstream direction will continue undisturbed. - -Using PushStage ---------------- - -Many one-to-one and many-to-one transformations do not need to override the ``onPull()`` handler at all since all -they do is just propagate the pull upwards. For such transformations it is better to extend PushStage directly. 
For -example our ``Map`` and ``Filter`` would look like this: - -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#pushstage - -The reason to use ``PushStage`` is not just cosmetic: internal optimizations rely on the fact that the onPull method -only calls ``ctx.pull()`` and allow the environment do process elements faster than without this knowledge. By -extending ``PushStage`` the environment can be sure that ``onPull()`` was not overridden since it is ``final`` on -``PushStage``. - -Using DetachedStage -------------------- - -The model described in previous sections, while conceptually simple, cannot describe all desired stages. The main -limitation is the "single-ball" (single "event token") model which prevents independent progress of an upstream and -downstream of a stage. Sometimes it is desirable to *detach* the progress (and therefore, rate) of the upstream and -downstream of a stage, synchronizing only when needed. - -This is achieved in the model by representing a :class:`DetachedStage` as a *boundary* between two "single-ball" regions. -One immediate consequence of this difference is that **it is not allowed to call** ``ctx.pull()`` **from** ``onPull()`` **and -it is not allowed to call** ``ctx.push()`` **from** ``onPush()`` as such combinations would "steal" a token from one region -(resulting in zero tokens left) and would inject an unexpected second token to the other region. This is enforced -by the expected return types of these callback functions. - -One of the important use-cases for :class:`DetachedStage` is to build buffer-like entities, that allow independent progress +One of the important use-case for this is to build buffer-like entities, that allow independent progress of upstream and downstream stages when the buffer is not full or empty, and slowing down the appropriate side if the -buffer becomes empty or full. 
The next diagram illustrates the event sequence for a buffer with capacity of two elements. +buffer becomes empty or full. + +The next diagram illustrates the event sequence for a buffer with capacity of two elements in a setting where +the downstream demand is slow to start and the buffer will fill up with upstream elements before any demand +is seen from downstream. | -.. image:: ../images/stage_msc_buffer.png - :align: center +.. image:: ../images/graph_stage_detached_tracks_1.png + :align: center + :width: 500 | -The very first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on -initialization. Remember that it is forbidden to call ``ctx.pull`` from ``onPull``, therefore it is the task of the -framework to kick off the first "event token" in the upstream region, which will remain there until the upstream stages -stop. The diagram distinguishes between the actions of the two regions by colors: *purple* arrows indicate the actions -involving the upstream "event token", while *red* arrows show the downstream region actions. This demonstrates the clear -separation of these regions, and the invariant that the number of tokens in the two regions are kept unchanged. +Another scenario would be where the demand from downstream starts coming in before any element is pushed +into the buffer stage. -For buffer it is necessary to detach the two regions, but it is also necessary to sometimes hold back the upstream -or downstream. The new API calls that are available for :class:`DetachedStage` s are the various ``ctx.holdXXX()`` methods -, ``ctx.pushAndPull()`` and variants, and ``ctx.isHoldingXXX()``. -Calling ``ctx.holdXXX()`` from ``onPull()`` or ``onPush`` results in suspending the corresponding -region from progress, and temporarily taking ownership of the "event token". This state can be queried by ``ctx.isHolding()`` -which will tell if the stage is currently holding a token or not. 
It is only allowed to suspend one of the regions, not -both, since that would disable all possible future events, resulting in a dead-lock. Releasing the held token is only -possible by calling ``ctx.pushAndPull()``. This is to ensure that both the held token is released, and the triggering region -gets its token back (one inbound token + one held token = two released tokens). -The following code example demonstrates the buffer class corresponding to the message sequence chart we discussed. +| -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#detached +.. image:: ../images/graph_stage_detached_tracks_2.png + :align: center + :width: 500 -.. warning:: - If ``absorbTermination()`` is called on a :class:`DetachedStage` while it holds downstream (``isHoldingDownstream`` - returns true) then ``onPull()`` will be called on the stage. This ensures that the stage does not end up in a - deadlocked case. Since at the point when the termination is absorbed there will be no way to get any callbacks because - the downstream is held, so the framework invokes onPull() to avoid this situation. This is similar to the termination - logic already shown for :class:`PushPullStage`. +| + + +The first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on +initialization. The buffer has demand for up to two elements without any downstream demand. + +The following code example demonstrates a buffer class corresponding to the message sequence chart above. + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#detached Thread safety of custom processing stages ========================================= @@ -457,7 +428,7 @@ In essence, the above guarantees are similar to what :class:`Actor` s provide, i stage as state of an actor, and the callbacks as the ``receive`` block of the actor. .. 
warning:: - It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it - is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** - internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined - behavior. + It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it + is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** + internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined + behavior. diff --git a/akka-docs-dev/rst/scala/code/docs/stream/GraphStageDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/GraphStageDocSpec.scala index daac5efc99..7e950832c1 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/GraphStageDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/GraphStageDocSpec.scala @@ -3,15 +3,16 @@ */ package docs.stream -import akka.stream.javadsl.Sink -import akka.stream.scaladsl.Source -import akka.stream.stage.{ OutHandler, GraphStage, GraphStageLogic } +import akka.stream.scaladsl.{ Keep, Sink, Flow, Source } +import akka.stream.stage._ import akka.stream._ -import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec } -import scala.concurrent.{ Await, Future } +import scala.collection.mutable +import scala.concurrent.{ Promise, Await, Future } import scala.concurrent.duration._ +import scala.collection.immutable.Iterable class GraphStageDocSpec extends AkkaSpec { @@ -83,4 +84,425 @@ class GraphStageDocSpec extends AkkaSpec { Await.result(result2, 3.seconds) should ===(5050) } -} + //#one-to-one + class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] { + + val in = Inlet[A]("Map.in") + val out = Outlet[B]("Map.out") + + 
override val shape = FlowShape.of(in, out) + + override def createLogic(attr: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush(): Unit = { + push(out, f(grab(in))) + } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pull(in) + } + }) + } + } + //#one-to-one + + "Demonstrate a one to one element GraphStage" in { + // tests: + val stringLength = Flow.fromGraph(new Map[String, Int](_.length)) + + val result = + Source(Vector("one", "two", "three")) + .via(stringLength) + .runFold(Seq.empty[Int])((elem, acc) => elem :+ acc) + + Await.result(result, 3.seconds) should ===(Seq(3, 3, 5)) + } + + //#many-to-one + class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] { + + val in = Inlet[A]("Filter.in") + val out = Outlet[A]("Filter.out") + + val shape = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + if (p(elem)) push(out, elem) + else pull(in) + } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pull(in) + } + }) + } + } + //#many-to-one + + "Demonstrate a many to one element GraphStage" in { + + // tests: + val evenFilter = Flow.fromGraph(new Filter[Int](_ % 2 == 0)) + + val result = + Source(Vector(1, 2, 3, 4, 5, 6)) + .via(evenFilter) + .runFold(Seq.empty[Int])((elem, acc) => elem :+ acc) + + Await.result(result, 3.seconds) should ===(Seq(2, 4, 6)) + } + + //#one-to-many + class Duplicator[A] extends GraphStage[FlowShape[A, A]] { + + val in = Inlet[A]("Duplicator.in") + val out = Outlet[A]("Duplicator.out") + + val shape = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + // Again: note that all mutable state + // MUST be inside the GraphStageLogic + var lastElem: Option[A] = 
None + + setHandler(in, new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + lastElem = Some(elem) + push(out, elem) + } + + override def onUpstreamFinish(): Unit = { + if (lastElem.isDefined) emit(out, lastElem.get) + complete(out) + } + + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (lastElem.isDefined) { + push(out, lastElem.get) + lastElem = None + } else { + pull(in) + } + } + }) + } + } + //#one-to-many + + "Demonstrate a one to many element GraphStage" in { + // tests: + val duplicator = Flow.fromGraph(new Duplicator[Int]) + + val result = + Source(Vector(1, 2, 3)) + .via(duplicator) + .runFold(Seq.empty[Int])((elem, acc) => elem :+ acc) + + Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3)) + } + + "Demonstrate a simpler one to many stage" in { + //#simpler-one-to-many + class Duplicator[A] extends GraphStage[FlowShape[A, A]] { + + val in = Inlet[A]("Duplicator.in") + val out = Outlet[A]("Duplicator.out") + + val shape = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + + setHandler(in, new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + // this will temporarily suspend this handler until the two elems + // are emitted and then reinstates it + emitMultiple(out, Iterable(elem, elem)) + } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pull(in) + } + }) + } + } + //#simpler-one-to-many + + // tests: + val duplicator = Flow.fromGraph(new Duplicator[Int]) + + val result = + Source(Vector(1, 2, 3)) + .via(duplicator) + .runFold(Seq.empty[Int])((elem, acc) => elem :+ acc) + + Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3)) + + } + + "Demonstrate chaining of graph stages" in { + val sink = Sink.fold[List[Int], Int](List.empty[Int])((acc, n) => acc :+ n) + + //#graph-stage-chain + val resultFuture = Source(1 to 5) + .via(new Filter(_ % 2 == 0)) + 
.via(new Duplicator()) + .via(new Map(_ / 2)) + .runWith(sink) + + //#graph-stage-chain + + Await.result(resultFuture, 3.seconds) should ===(List(1, 1, 2, 2)) + } + + "Demonstrate an asynchronous side channel" in { + import system.dispatcher + //#async-side-channel + // will close upstream when the future completes + class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] { + + val in = Inlet[A]("KillSwitch.in") + val out = Outlet[A]("KillSwitch.out") + + val shape = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + + override def preStart(): Unit = { + val callback = getAsyncCallback[Unit] { (_) => + completeStage() + } + switch.foreach(callback.invoke) + } + + setHandler(in, new InHandler { + override def onPush(): Unit = { push(out, grab(in)) } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = { pull(in) } + }) + } + } + //#async-side-channel + + // tests: + val switch = Promise[Unit]() + val duplicator = Flow.fromGraph(new KillSwitch[Int](switch.future)) + + // TODO this is probably racey, is there a way to make sure it happens after? 
+ val valueAfterKill = switch.future.flatMap(_ => Future(4)) + + val result = + Source(Vector(1, 2, 3)).concat(Source.fromFuture(valueAfterKill)) + .via(duplicator) + .runFold(Seq.empty[Int])((elem, acc) => elem :+ acc) + + switch.success(Unit) + + Await.result(result, 3.seconds) should ===(Seq(1, 2, 3)) + } + + "Demonstrate a graph stage with a timer" in { + + //#timed + // each time an event is pushed through it will trigger a period of silence + class TimedGate[A](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[A, A]] { + + val in = Inlet[A]("TimedGate.in") + val out = Outlet[A]("TimedGate.out") + + val shape = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) { + + var open = false + + setHandler(in, new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + if (open) pull(in) + else { + push(out, elem) + open = true + scheduleOnce(None, silencePeriod) + } + } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = { pull(in) } + }) + + override protected def onTimer(timerKey: Any): Unit = { + open = false + } + } + } + //#timed + + // tests: + val result = + Source(Vector(1, 2, 3)) + .via(new TimedGate[Int](2.second)) + .takeWithin(250.millis) + .runFold(Seq.empty[Int])((elem, acc) => elem :+ acc) + + Await.result(result, 3.seconds) should ===(Seq(1)) + } + + "Demonstrate a custom materialized value" in { + + //#materialized + class FirstValue[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] { + + val in = Inlet[A]("FirstValue.in") + val out = Outlet[A]("FirstValue.out") + + val shape = FlowShape.of(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[A]) = { + val promise = Promise[A]() + val logic = new GraphStageLogic(shape) { + + setHandler(in, new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + promise.success(elem) + 
push(out, elem) + + // replace handler with one just forwarding + setHandler(in, new InHandler { + override def onPush(): Unit = { + push(out, grab(in)) + } + }) + } + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pull(in) + } + }) + + } + + (logic, promise.future) + } + } + //#materialized + + // tests: + val flow = Source(Vector(1, 2, 3)) + .viaMat(new FirstValue)(Keep.right) + .to(Sink.ignore) + + val result: Future[Int] = flow.run() + + Await.result(result, 3.seconds) should ===(1) + + } + + "Demonstrate a detached graph stage" in { + + //#detached + class TwoBuffer[A] extends GraphStage[FlowShape[A, A]] { + + val in = Inlet[A]("TwoBuffer.in") + val out = Outlet[A]("TwoBuffer.out") + + val shape = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + + val buffer = mutable.Queue[A]() + def bufferFull = buffer.size == 2 + var downstreamWaiting = false + + override def preStart(): Unit = { + // a detached stage needs to start upstream demand + // itself as it is not triggered by downstream demand + pull(in) + } + + setHandler(in, new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + buffer.enqueue(elem) + if (downstreamWaiting) { + downstreamWaiting = false + val bufferedElem = buffer.dequeue() + push(out, bufferedElem) + } + if (!bufferFull) { + pull(in) + } + } + + override def onUpstreamFinish(): Unit = { + if (buffer.nonEmpty) { + // emit the rest if possible + emitMultiple(out, buffer.toIterator) + } + completeStage() + } + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (buffer.isEmpty) { + downstreamWaiting = true + } else { + val elem = buffer.dequeue + push(out, elem) + } + if (!bufferFull && !hasBeenPulled(in)) { + pull(in) + } + } + }) + } + + } + //#detached + + // tests: + val result1 = Source(Vector(1, 2, 3)) + .via(new TwoBuffer) + .runFold(Vector.empty[Int])((acc, n) => acc :+ n) + + 
Await.result(result1, 3.seconds) should ===(Vector(1, 2, 3)) + + val subscriber = TestSubscriber.manualProbe[Int]() + val publisher = TestPublisher.probe[Int]() + val flow2 = + Source.fromPublisher(publisher) + .via(new TwoBuffer) + .to(Sink.fromSubscriber(subscriber)) + + val result2 = flow2.run() + + val sub = subscriber.expectSubscription() + // this happens even though the subscriber has not signalled any demand + publisher.sendNext(1) + publisher.sendNext(2) + + sub.cancel() + } + +} \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index a50268e869..679c9f1ea2 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -9,6 +9,12 @@ is sometimes necessary to define new transformation stages either because some f stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph junctions of various kinds. +.. note:: + A custom graph stage should not be the first tool you reach for, defining graphs using flows + and the graph DSL is in general easier and does to a larger extent protect you from mistakes that + might be easy to make with a custom :class:`GraphStage` + + .. _graphstage-scala: Custom processing with GraphStage @@ -129,22 +135,162 @@ Finally, there are two methods available for convenience to complete the stage a * ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports. +In some cases it is inconvenient and error prone to react on the regular state machine events with the +signal based API described above. For those cases there is a API which allows for a more declarative sequencing +of actions which will greatly simplify some use cases at the cost of some extra allocations. 
The difference +between the two APIs is that the first one is signal driven from the outside, while this API +is more active and drives its surroundings. + +The operations of this part of the :class:`GraphStage` API are: + +* ``emit(out, elem)`` and ``emitMultiple(out, Iterable(elem1, elem2))`` replace the ``OutHandler`` with a handler that emits + one or more elements when there is demand, and then reinstalls the current handlers +* ``read(in)(andThen)`` and ``readN(in, n)(andThen)`` replace the ``InHandler`` with a handler that reads one or + more elements as they are pushed and allows the handler to react once the requested number of elements has been read. +* ``abortEmitting()`` and ``abortReading()`` cancel an ongoing emit or read + +Note that since the above methods are implemented by temporarily replacing the handlers of the stage you should never +call ``setHandler`` while they are running ``emit`` or ``read`` as that interferes with how they are implemented. +The following methods are safe to call after invoking ``emit`` and ``read`` (and will lead to actually running the +operation when those are done): ``complete(out)``, ``completeStage()``, ``emit``, ``emitMultiple``, ``abortEmitting()`` +and ``abortReading()``. + +An example of how this API simplifies a stage can be found below in the second version of the :class:`Duplicator`. + + +Custom linear processing stages using GraphStage +------------------------------------------------ + +Graph stages allow for custom linear processing stages through letting them +have one input and one output and using :class:`FlowShape` as their shape. + +Such a stage can be illustrated as a box with two flows as it is +seen in the illustration below. Demand flows upstream, leading to elements +flowing downstream. + +| + +..
image:: ../images/graph_stage_conceptual.png + :align: center + :width: 500 + +| + + +To illustrate these concepts we create a small :class:`GraphStage` that implements the ``map`` transformation. + +| + +.. image:: ../images/graph_stage_map.png + :align: center + :width: 300 + +| + +Map calls ``push(out)`` from the ``onPush()`` handler and it also calls ``pull(in)`` from the ``onPull`` handler resulting in the +conceptual wiring above, and fully expressed in code below: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#one-to-one + +Map is a typical example of a one-to-one transformation of a stream where +demand is passed along upstream and elements are passed on downstream. + +To demonstrate a many-to-one stage we will implement +filter. The conceptual wiring of ``Filter`` looks like this: + +| + +.. image:: ../images/graph_stage_filter.png + :align: center + :width: 300 + +| + + +As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise +we return the "ball" to our upstream so that we get the new element. This is achieved by modifying the map +example by adding a conditional in the ``onPush`` handler and deciding between a ``pull(in)`` or ``push(out)`` call +(and of course not having a mapping ``f`` function). + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#many-to-one + +To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage +that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this: + +| + +.. image:: ../images/graph_stage_duplicate.png + :align: center + :width: 300 + +| + +This is a stage that has state: an option with the last element it has seen indicating if it +has duplicated this last element already or not. We must also make sure to emit the extra element +if the upstream completes. + +..
includecode:: code/docs/stream/GraphStageDocSpec.scala#one-to-many + +In this case a pull from downstream might be consumed by the stage itself rather +than passed along upstream as the stage might contain an element it wants to +push. Note that we also need to handle the case where the upstream closes while +the stage still has elements it wants to push downstream. This is done by +overriding ``onUpstreamFinish`` in the ``InHandler`` and providing custom logic +that should happen when the upstream has been finished. + +This example can be simplified by replacing the use of mutable state with calls to +``emitMultiple`` which will replace the handlers, emit each of multiple elements and then +reinstate the original handlers: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#simpler-one-to-many + + +Finally, to demonstrate all of the stages above, we put them together into a processing chain, +which conceptually would correspond to the following structure: + + +| + +.. image:: ../images/graph_stage_chain.png + :align: center + :width: 700 + +| + +In code this is only a few lines, using the ``via`` method to use our custom stages in a stream: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#graph-stage-chain + +If we attempt to draw the sequence of events, it shows that there is one "event token" +in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts. + + +| + +.. image:: ../images/graph_stage_tracks_1.png + :align: center + :width: 700 + +| + Completion ---------- -**This section is a stub and will be extended in the next release** +Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit +a few more elements after their upstream source has been completed. We have seen an example of this in our +first :class:`Duplicator` implementation where the last element needs to be doubled even after the upstream neighbor +stage has been completed.
This can be done by overriding the ``onUpstreamFinish`` method in ``InHandler``. Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally. -It is possible to opt out from this behavior by overriding ``keepGoingAfterAllPortsClosed`` and returning true in -the :class:`GraphStageLogic` implementation. In this case the stage **must** be explicitly closed by calling ``completeStage()`` +It is possible to opt out from this behavior by invoking ``setKeepGoing(true)`` (which is not supported from the stage’s +constructor and usually done in ``preStart``). In this case the stage **must** be explicitly closed by calling ``completeStage()`` or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used with care. + Using timers ------------ -**This section is a stub and will be extended in the next release** - It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or ``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example @@ -155,11 +301,15 @@ fires. It is possible to cancel a timer using ``cancelTimer(key)`` and check the Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the ``preStart()`` lifecycle hook. +In this sample the stage toggles between open and closed, where open means no elements are passed through. The +stage starts out as closed but as soon as an element is pushed downstream the gate becomes open for a duration +of time during which it will consume and drop upstream messages: + +.. 
includecode:: code/docs/stream/GraphStageDocSpec.scala#timed + + Using asynchronous side-channels -------------------------------- - -**This section is a stub and will be extended in the next release** - In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future or a callback from a 3rd party API) one must acquire a :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous @@ -171,6 +321,12 @@ implementation. Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the ``preStart()`` lifecycle hook instead. + +This example shows an asynchronous side channel graph stage that completes the stage (and thereby the stream) +when a future completes: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#async-side-channel + Integration with actors ----------------------- @@ -189,11 +345,10 @@ or ``unwatch(ref)`` methods. The reference can be also watched by external actor - they cannot be accessed from the constructor of the :class:`GraphStageLogic`, but they can be accessed from the ``preStart()`` method. + Custom materialized values -------------------------- -**This section is a stub and will be extended in the next release** - Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue` instead of the simpler :class:`GraphStage`. The difference is that in this case the method ``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, overridden, and in addition to the @@ -204,6 +359,11 @@ stage logic the materialized value must be provided the thread that got hold of the materialized value. It is the responsibility of the programmer to add the necessary (non-blocking) synchronization and visibility guarantees to this shared object.
+In this sample the materialized value is a future containing the first element to go through the stream: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#materialized + + Using attributes to affect the behavior of a stage -------------------------------------------------- @@ -217,239 +377,54 @@ decision. See :ref:`composition-scala` for an explanation on how attributes work. -Custom linear processing stages with PushPullStage -================================================== - -To extend the available transformations on a :class:`Flow` or :class:`Source` one can use the ``transform()`` method -which takes a factory function returning a :class:`Stage`. Stages come in different flavors swhich we will introduce in this -page. - -.. _stream-using-push-pull-stage-scala: - -Using PushPullStage -------------------- - -The most elementary transformation stage is the :class:`PushPullStage` which can express a large class of algorithms -working on streams. A :class:`PushPullStage` can be illustrated as a box with two "input" and two "output ports" as it is -seen in the illustration below. - -| - -.. image:: ../images/stage_conceptual.png - :align: center - :width: 600 - -| - -The "input ports" are implemented as event handlers ``onPush(elem,ctx)`` and ``onPull(ctx)`` while "output ports" -correspond to methods on the :class:`Context` object that is handed as a parameter to the event handlers. By calling -exactly one "output port" method we wire up these four ports in various ways which we demonstrate shortly. - -.. warning:: - There is one very important rule to remember when working with a ``Stage``. **Exactly one** method should be called - on the **currently passed** :class:`Context` **exactly once** and as the **last statement of the handler** where the return type - of the called method **matches the expected return type of the handler**. 
Any violation of this rule will - almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). Exceptions - to this rule are the query methods ``isHolding()`` and ``isFinishing()`` - -To illustrate these concepts we create a small :class:`PushPullStage` that implements the ``map`` transformation. - -| - -.. image:: ../images/stage_map.png - :align: center - :width: 300 - -| - -Map calls ``ctx.push()`` from the ``onPush()`` handler and it also calls ``ctx.pull()`` form the ``onPull`` -handler resulting in the conceptual wiring above, and fully expressed in code below: - -.. includecode:: code/docs/stream/FlowStagesSpec.scala#one-to-one - -Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement -filter. The conceptual wiring of ``Filter`` looks like this: - -| - -.. image:: ../images/stage_filter.png - :align: center - :width: 300 - -| - -As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise -we return the "ball" to our upstream so that we get the new element. This is achieved by modifying the map -example by adding a conditional in the ``onPush`` handler and decide between a ``ctx.pull()`` or ``ctx.push()`` call -(and of course not having a mapping ``f`` function). - -.. includecode:: code/docs/stream/FlowStagesSpec.scala#many-to-one - -To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example stage -that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this: - -| - -.. image:: ../images/stage_doubler.png - :align: center - :width: 300 - -| - -This is a stage that has state: the last element it has seen, and a flag ``oneLeft`` that indicates if we -have duplicated this last element already or not. 
Looking at the code below, the reader might notice that our ``onPull`` -method is more complex than it is demonstrated by the figure above. The reason for this is completion handling, which we -will explain a little bit later. For now it is enough to look at the ``if(!ctx.isFinishing)`` block which -corresponds to the logic we expect by looking at the conceptual picture. - -.. includecode:: code/docs/stream/FlowStagesSpec.scala#one-to-many - -Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually -would correspond to the following structure: - -| - -.. image:: ../images/stage_chain.png - :align: center - :width: 650 - -| - -In code this is only a few lines, using the ``transform`` method to inject our custom processing into a stream: - -.. includecode:: code/docs/stream/FlowStagesSpec.scala#stage-chain - -If we attempt to draw the sequence of events, it shows that there is one "event token" -in circulation in a potential chain of stages, just like our conceptual "railroad tracks" representation predicts. - -| - -.. image:: ../images/stage_msc_general.png - :align: center - -| -Completion handling -^^^^^^^^^^^^^^^^^^^ +Rate decoupled graph stages +--------------------------- -Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit a few -more elements after their upstream source has been completed. We have seen an example of this in our ``Duplicator`` class -where the last element needs to be doubled even after the upstream neighbor stage has been completed. Since the -``onUpstreamFinish()`` handler expects a :class:`TerminationDirective` as the return type we are only allowed to call -``ctx.finish()``, ``ctx.fail()`` or ``ctx.absorbTermination()``. Since the first two of these available methods will -immediately terminate, our only option is ``absorbTermination()``. 
It is also clear from the return type of -``onUpstreamFinish`` that we cannot call ``ctx.push()`` but we need to emit elements somehow! The trick is that after -calling ``absorbTermination()`` the ``onPull()`` handler will be called eventually, and at the same time -``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to -emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` eventually to finish processing. +Sometimes it is desirable to *decouple* the rate of the upstream and downstream of a stage, synchronizing only +when needed. -The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of -Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that -our push/pull structure that was illustrated in the figure of our custom processing chain does not -apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a -pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. By calling -``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at -that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was -*below* (which means that it will come back eventually, so the environment does not need to call anything yet). +This is achieved in the model by representing a :class:`GraphStage` as a *boundary* between two regions where the +demand sent upstream is decoupled from the demand that arrives from downstream. One immediate consequence of this +difference is that an ``onPush`` call does not always lead to calling ``push`` and an ``onPull`` call does not always +lead to calling ``pull``. -The first of the two scenarios is when a termination signal arrives after a stage passed the event to its downstream. 
As -we can see in the following diagram, there is no need to do anything by ``absorbTermination()`` since the black arrows -representing the movement of the "event token" is uninterrupted. - -| - -.. image:: ../images/stage_msc_absorb_1.png - :align: center - -| - -In the second scenario the "event token" is somewhere upstream when the termination signal arrives. In this case -``absorbTermination`` needs to ensure that a new "event token" is generated replacing the old one that is forever gone -(since the upstream finished). This is done by calling the ``onPull()`` event handler of the stage. - -| - -.. image:: ../images/stage_msc_absorb_2.png - :align: center - -| - -Observe, that in both scenarios ``onPull()`` kicks off the continuation of the processing logic, the only difference is -whether it is the downstream or the ``absorbTermination()`` call that calls the event handler. - -.. warning:: - It is not allowed to call ``absorbTermination()`` from ``onDownstreamFinish()``. If the method is called anyway, - it will be logged at ``ERROR`` level, but no further action will be taken as at that point there is no active - downstream to propagate the error to. Cancellation in the upstream direction will continue undisturbed. - -Using PushStage ---------------- - -Many one-to-one and many-to-one transformations do not need to override the ``onPull()`` handler at all since all -they do is just propagate the pull upwards. For such transformations it is better to extend PushStage directly. For -example our ``Map`` and ``Filter`` would look like this: - -.. includecode:: code/docs/stream/FlowStagesSpec.scala#pushstage - -The reason to use ``PushStage`` is not just cosmetic: internal optimizations rely on the fact that the onPull method -only calls ``ctx.pull()`` and allow the environment do process elements faster than without this knowledge. 
By -extending ``PushStage`` the environment can be sure that ``onPull()`` was not overridden since it is ``final`` on -``PushStage``. - - -Using DetachedStage -------------------- - -The model described in previous sections, while conceptually simple, cannot describe all desired stages. The main -limitation is the "single-ball" (single "event token") model which prevents independent progress of an upstream and -downstream of a stage. Sometimes it is desirable to *detach* the progress (and therefore, rate) of the upstream and -downstream of a stage, synchronizing only when needed. - -This is achieved in the model by representing a :class:`DetachedStage` as a *boundary* between two "single-ball" regions. -One immediate consequence of this difference is that **it is not allowed to call** ``ctx.pull()`` **from** ``onPull()`` **and -it is not allowed to call** ``ctx.push()`` **from** ``onPush()`` as such combinations would "steal" a token from one region -(resulting in zero tokens left) and would inject an unexpected second token to the other region. This is enforced -by the expected return types of these callback functions. - -One of the important use-cases for :class:`DetachedStage` is to build buffer-like entities, that allow independent progress +One of the important use-cases for this is to build buffer-like entities, that allow independent progress of upstream and downstream stages when the buffer is not full or empty, and slowing down the appropriate side if the -buffer becomes empty or full. The next diagram illustrates the event sequence for a buffer with capacity of two elements. +buffer becomes empty or full. + +The next diagram illustrates the event sequence for a buffer with capacity of two elements in a setting where +the downstream demand is slow to start and the buffer will fill up with upstream elements before any demand +is seen from downstream. | -.. image:: ../images/stage_msc_buffer.png - :align: center +.. 
image:: ../images/graph_stage_detached_tracks_1.png + :align: center + :width: 500 | -The very first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on -initialization. Remember that it is forbidden to call ``ctx.pull`` from ``onPull``, therefore it is the task of the -framework to kick off the first "event token" in the upstream region, which will remain there until the upstream stages -stop. The diagram distinguishes between the actions of the two regions by colors: *purple* arrows indicate the actions -involving the upstream "event token", while *red* arrows show the downstream region actions. This demonstrates the clear -separation of these regions, and the invariant that the number of tokens in the two regions are kept unchanged. +Another scenario would be where the demand from downstream starts coming in before any element is pushed +into the buffer stage. -For buffer it is necessary to detach the two regions, but it is also necessary to sometimes hold back the upstream -or downstream. The new API calls that are available for :class:`DetachedStage` s are the various ``ctx.holdXXX()`` methods -, ``ctx.pushAndPull()`` and variants, and ``ctx.isHoldingXXX()``. -Calling ``ctx.holdXXX()`` from ``onPull()`` or ``onPush`` results in suspending the corresponding -region from progress, and temporarily taking ownership of the "event token". This state can be queried by ``ctx.isHolding()`` -which will tell if the stage is currently holding a token or not. It is only allowed to suspend one of the regions, not -both, since that would disable all possible future events, resulting in a dead-lock. Releasing the held token is only -possible by calling ``ctx.pushAndPull()``. This is to ensure that both the held token is released, and the triggering region -gets its token back (one inbound token + one held token = two released tokens). 
-The following code example demonstrates the buffer class corresponding to the message sequence chart we discussed. +| -.. includecode:: code/docs/stream/FlowStagesSpec.scala#detached +.. image:: ../images/graph_stage_detached_tracks_2.png + :align: center + :width: 500 -.. warning:: - If ``absorbTermination()`` is called on a :class:`DetachedStage` while it holds downstream (``isHoldingDownstream`` - returns true) then ``onPull()`` will be called on the stage. This ensures that the stage does not end up in a - deadlocked case. Since at the point when the termination is absorbed there will be no way to get any callbacks because - the downstream is held, so the framework invokes onPull() to avoid this situation. This is similar to the termination - logic already shown for :class:`PushPullStage`. +| + + +The first difference we can notice is that our ``Buffer`` stage is automatically pulling its upstream on +initialization. The buffer has demand for up to two elements without any downstream demand. + +The following code example demonstrates a buffer class corresponding to the message sequence chart above. + +.. 
includecode:: code/docs/stream/GraphStageDocSpec.scala#detached Thread safety of custom processing stages diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 420d46c8b1..d1e541ed87 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -3,16 +3,15 @@ */ package akka.stream.stage -import java.util -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicReference } +import java.util.concurrent.atomic.{ AtomicReference } import akka.actor._ import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch } import akka.event.LoggingAdapter +import akka.japi.function.{ Effect, Procedure } import akka.stream._ import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly -import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule, GraphStageModule } +import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule } import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName } import scala.annotation.tailrec @@ -612,6 +611,17 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } } + /** + * Java API: Read a number of elements from the given inlet and continue with the given function, + * suspending execution if necessary. This action replaces the [[InHandler]] + * for the given inlet if suspension is needed and reinstalls the current + * handler upon receiving the last `onPush()` signal (before invoking the `andThen` function). 
+ */ + final protected def readN[T](in: Inlet[T], n: Int, andThen: Procedure[java.util.List[T]], onClose: Procedure[java.util.List[T]]): Unit = { + import collection.JavaConverters._ + readN(in, n)(seq ⇒ andThen(seq.asJava), seq ⇒ onClose(seq.asJava)) + } + /** * Read an element from the given inlet and continue with the given function, * suspending execution if necessary. This action replaces the [[InHandler]] @@ -631,6 +641,16 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } } + /** + * Java API: Read an element from the given inlet and continue with the given function, + * suspending execution if necessary. This action replaces the [[InHandler]] + * for the given inlet if suspension is needed and reinstalls the current + * handler upon receiving the `onPush()` signal (before invoking the `andThen` function). + */ + final protected def read[T](in: Inlet[T], andThen: Procedure[T], onClose: Effect): Unit = { + read(in)(andThen.apply, onClose.apply) + } + /** * Abort outstanding (suspended) reading for the given inlet, if there is any. * This will reinstall the replaced handler that was in effect before the `read` @@ -690,6 +710,32 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def emitMultiple[T](out: Outlet[T], elems: immutable.Iterable[T]): Unit = emitMultiple(out, elems, DoNothing) + /** + * Java API + * + * Emit a sequence of elements through the given outlet, suspending execution if necessary. + * This action replaces the [[AbstractOutHandler]] for the given outlet if suspension + * is needed and reinstalls the current handler upon receiving an `onPull()` + * signal. + */ + final protected def emitMultiple[T](out: Outlet[T], elems: java.util.Iterator[T]): Unit = { + import collection.JavaConverters._ + emitMultiple(out, elems.asScala, DoNothing) + } + + /** + * Java API + * + * Emit a sequence of elements through the given outlet, suspending execution if necessary. 
+ * This action replaces the [[AbstractOutHandler]] for the given outlet if suspension + * is needed and reinstalls the current handler upon receiving an `onPull()` + * signal. + */ + final protected def emitMultiple[T](out: Outlet[T], elems: java.util.Iterator[T], andThen: Effect): Unit = { + import collection.JavaConverters._ + emitMultiple(out, elems.asScala, andThen.apply _) + } + /** * Emit a sequence of elements through the given outlet and continue with the given thunk * afterwards, suspending execution if necessary. @@ -740,6 +786,10 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def emit[T](out: Outlet[T], elem: T): Unit = emit(out, elem, DoNothing) + final protected def emit[T](out: Outlet[T], elem: T, andThen: Effect): Unit = { + emit(out, elem, andThen.apply _) + } + /** * Abort outstanding (suspended) emissions for the given outlet, if there are any. * This will reinstall the replaced handler that was in effect before the `emit` @@ -875,6 +925,18 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } } + /** + * Java API: Obtain a callback object that can be used asynchronously to re-enter the + * current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned + * [[AsyncCallback]] is safe to be called from other threads and it will in the background thread-safely + * delegate to the passed callback function. I.e. [[invoke()]] will be called by the external world and + * the passed handler will be invoked eventually in a thread-safe way by the execution environment. + * + * This object can be cached and reused within the same [[GraphStageLogic]]. 
+ */ + final protected def createAsyncCallback[T](handler: Procedure[T]): AsyncCallback[T] = + getAsyncCallback(handler.apply) + private var _stageActorRef: StageActorRef = _ final def stageActorRef: ActorRef = _stageActorRef match { case null ⇒ throw StageActorRefNotInitializedException() @@ -982,6 +1044,7 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap /** * Will be called when the scheduled timer is triggered. + * * @param timerKey key of the scheduled timer */ protected def onTimer(timerKey: Any): Unit = () @@ -1029,6 +1092,7 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap /** * Cancel timer, ensuring that the [[#onTimer]] is not subsequently called. + * * @param timerKey key of the timer to cancel */ final protected def cancelTimer(timerKey: Any): Unit =