Merge pull request #19026 from drewhk/wip-document-graphstage-drewhk
+str, doc: Documentation for GraphStage
This commit is contained in:
commit
b4fc3c11d8
10 changed files with 1601 additions and 40 deletions
|
|
@ -0,0 +1,86 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.stream
|
||||
|
||||
import akka.stream.javadsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.stage.{ OutHandler, GraphStage, GraphStageLogic }
|
||||
import akka.stream._
|
||||
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class GraphStageDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
||||
"Demonstrate creation of GraphStage boilerplate" in {
|
||||
//#boilerplate-example
|
||||
import akka.stream.SourceShape
|
||||
import akka.stream.stage.GraphStage
|
||||
|
||||
class NumbersSource extends GraphStage[SourceShape[Int]] {
|
||||
// Define the (sole) output port of this stage
|
||||
val out: Outlet[Int] = Outlet("NumbersSource")
|
||||
// Define the shape of this stage, which is SourceShape with the port we defined above
|
||||
override val shape: SourceShape[Int] = SourceShape(out)
|
||||
|
||||
// This is where the actual (possibly stateful) logic will live
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
|
||||
}
|
||||
//#boilerplate-example
|
||||
|
||||
}
|
||||
|
||||
"Demonstrate creation of GraphStage Source" in {
|
||||
//#custom-source-example
|
||||
import akka.stream.SourceShape
|
||||
import akka.stream.Graph
|
||||
import akka.stream.stage.GraphStage
|
||||
import akka.stream.stage.OutHandler
|
||||
|
||||
class NumbersSource extends GraphStage[SourceShape[Int]] {
|
||||
val out: Outlet[Int] = Outlet("NumbersSource")
|
||||
override val shape: SourceShape[Int] = SourceShape(out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
// All state MUST be inside the GraphStageLogic,
|
||||
// never inside the enclosing GraphStage.
|
||||
// This state is safe to access and modify from all the
|
||||
// callbacks that are provided by GraphStageLogic and the
|
||||
// registered handlers.
|
||||
private var counter = 1
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
push(out, counter)
|
||||
counter += 1
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
//#custom-source-example
|
||||
|
||||
//#simple-source-usage
|
||||
// A GraphStage is a proper Graph, just like what FlowGraph.create would return
|
||||
val sourceGraph: Graph[SourceShape[Int], Unit] = new NumbersSource
|
||||
|
||||
// Create a Source from the Graph to access the DSL
|
||||
val mySource: Source[Int, Unit] = Source.fromGraph(new NumbersSource)
|
||||
|
||||
// Returns 55
|
||||
val result1: Future[Int] = mySource.take(10).runFold(0)(_ + _)
|
||||
|
||||
// The source is reusable. This returns 5050
|
||||
val result2: Future[Int] = mySource.take(100).runFold(0)(_ + _)
|
||||
//#simple-source-usage
|
||||
|
||||
Await.result(result1, 3.seconds) should ===(55)
|
||||
Await.result(result2, 3.seconds) should ===(5050)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -325,6 +325,20 @@ Update procedure
|
|||
See the example in the AsyncStage migration section for an example of this procedure.
|
||||
|
||||
|
||||
StatefulStage has been replaced by GraphStage
|
||||
=============================================
|
||||
|
||||
The :class:`StatefulStage` class had some flaws and limitations, most notably around completion handling which
|
||||
caused subtle bugs. The new :class:`GraphStage` (:ref:`graphstage-java`) solves these issues and should be used
|
||||
instead.
|
||||
|
||||
Update procedure
|
||||
----------------
|
||||
|
||||
There is no mechanical update procedure available. Please consult the :class:`GraphStage` documentation
|
||||
(:ref:`graphstage-java`).
|
||||
|
||||
|
||||
AsyncStage has been replaced by GraphStage
|
||||
==========================================
|
||||
|
||||
|
|
|
|||
|
|
@ -9,8 +9,216 @@ is sometimes necessary to define new transformation stages either because some f
|
|||
stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph
|
||||
junctions of various kinds.
|
||||
|
||||
Custom linear processing stages
|
||||
===============================
|
||||
.. _graphstage-scala:
|
||||
|
||||
Custom processing with GraphStage
|
||||
=================================
|
||||
|
||||
The :class:`GraphStage` abstraction can be used to create arbitrary graph processing stages with any number of input
|
||||
or output ports. It is a counterpart of the ``FlowGraph.create()`` method which creates new stream processing
|
||||
stages by composing others. Where :class:`GraphStage` differs is that it creates a stage that is itself not divisible into
|
||||
smaller ones, and allows state to be maintained inside it in a safe way.
|
||||
|
||||
As a first motivating example, we will build a new :class:`Source` that will simply emit numbers from 1 until it is
|
||||
cancelled. To start, we need to define the "interface" of our stage, which is called *shape* in Akka Streams terminology
|
||||
(this is explained in more detail in the section :ref:`composition-scala`). This is how this looks like:
|
||||
|
||||
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#boilerplate-example
|
||||
|
||||
As you see, in itself the :class:`GraphStage` only defines the ports of this stage and a shape that contains the ports.
|
||||
It also has, a currently unimplemented method called ``createLogic``. If you recall, stages are reusable in multiple
|
||||
materializations, each resulting in a different executing entity. In the case of :class:`GraphStage` the actual running
|
||||
logic is modeled as an instance of a :class:`GraphStageLogic` which will be created by the materializer by calling
|
||||
the ``createLogic`` method. In other words, all we need to do is to create a suitable logic that will emit the
|
||||
numbers we want.
|
||||
|
||||
In order to emit from a :class:`Source` in a backpressured stream one needs first to have demand from downstream.
|
||||
To receive the necessary events one needs to register a subclass of :class:`OutHandler` with the output port
|
||||
(:class:`Outlet`). This handler will receive events related to the lifecycle of the port. In our case we need to
|
||||
override ``onPull()`` which indicates that we are free to emit a single element. There is another callback,
|
||||
``onDownstreamFinish()`` which is called if the downstream cancelled. Since the default behavior of that callback is
|
||||
to stop the stage, we don't need to override it. In the ``onPull`` callback we will simply emit the next number. This
|
||||
is how it looks like in the end:
|
||||
|
||||
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#custom-source-example
|
||||
|
||||
Instances of the above :class:`GraphStage` are subclasses of ``Graph[SourceShape[Int],Unit]`` which means
|
||||
that they are already usable in many situations, but do not provide the DSL methods we usually have for other
|
||||
:class:`Source` s. In order to convert this :class:`Graph` to a proper :class:`Source` we need to wrap it using
|
||||
``Source.fromGraph`` (see :ref:`composition-scala` for more details about graphs and DSLs). Now we can use the
|
||||
source as any other built-in one:
|
||||
|
||||
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#simple-source-usage
|
||||
|
||||
Port states, InHandler and OutHandler
|
||||
-------------------------------------
|
||||
|
||||
In order to interact with a port (:class:`Inlet` or :class:`Outlet`) of the stage we need to be able to receive events
|
||||
and generate new events belonging to the port. From the :class:`GraphStageLogic` the following operations are available
|
||||
on an output port:
|
||||
|
||||
* ``push(out,elem)`` pushes an element to the output port. Only possible after the port has been pulled by downstream.
|
||||
* ``complete(out)`` closes the output port normally.
|
||||
* ``fail(out,exception)`` closes the port with a failure signal.
|
||||
|
||||
|
||||
The events corresponding to an *output* port can be received in an :class:`OutHandler` instance registered to the
|
||||
output port using ``setHandler(out,handler)``. This handler has two callbacks:
|
||||
|
||||
* ``onPull()`` is called when the output port is ready to emit the next element, ``push(out, elem)`` is now allowed
|
||||
to be called on this port.
|
||||
* ``onDownstreamFinish()`` is called once the downstream has cancelled and no longer allows messages to be pushed to it.
|
||||
No more ``onPull()`` will arrive after this event. If not overridden this will default to stopping the stage.
|
||||
|
||||
Also, there are two query methods available for output ports:
|
||||
|
||||
* ``isAvailable(out)`` returns true if a data element can be grabbed from the port
|
||||
* ``isClosed(out)`` returns true if the port is closed. At this point the port can not be pushed and will not be pulled anymore.
|
||||
|
||||
The relationship of the above operations, events and queries are summarized in the state machine below. Green shows
|
||||
the initial state while orange indicates the end state. If an operation is not listed for a state, then it is invalid
|
||||
to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen
|
||||
in that state.
|
||||
|
||||
|
|
||||
|
||||
.. image:: ../images/outport_transitions.png
|
||||
:align: center
|
||||
|
||||
|
|
||||
|
||||
The following operations are available for *input* ports:
|
||||
|
||||
* ``pull(in)`` requests a new element from an input port. This is only possible after the port has been pushed by upstream.
|
||||
* ``grab(in)`` acquires the element that has been received during an ``onPush()``. It cannot be called again until the
|
||||
port is pushed again by the upstream.
|
||||
* ``cancel(in)`` closes the input port.
|
||||
|
||||
The events corresponding to an *input* port can be received in an :class:`InHandler` instance registered to the
|
||||
input port using ``setHandler(in, handler)``. This handler has three callbacks:
|
||||
|
||||
* ``onPush()`` is called when the output port has now a new element. Now it is possible to aquire this element using
|
||||
``grab()`` and/or call ``pull(in)`` on the port to request the next element. It is not mandatory to grab the
|
||||
element, but if it is pulled while the element has not been grabbed it will drop the buffered element.
|
||||
* ``onUpstreamFinish()`` is called once the upstream has completed and no longer can be pulled for new elements.
|
||||
No more ``onPush()`` will arrive after this event. If not overridden this will default to stopping the stage.
|
||||
* ``onUpstreamFailure()`` is called if the upstream failed with an exception and no longer can be pulled for new elements.
|
||||
No more ``onPush()`` will arrive after this event. If not overridden this will default to failing the stage.
|
||||
|
||||
Also, there are three query methods available for input ports:
|
||||
|
||||
* ``isAvailable(out)`` returns true if the port can be grabbed.
|
||||
* ``hasBeenPulled(out)`` returns true if the port has been already pulled. Calling ``pull(in)`` in this state is illegal.
|
||||
* ``isClosed(in)`` returns true if the port is closed. At this point the port can not be pulled and will not be pushed anymore.
|
||||
|
||||
The relationship of the above operations, events and queries are summarized in the state machine below. Green shows
|
||||
the initial state while orange indicates the end state. If an operation is not listed for a state, then it is invalid
|
||||
to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen
|
||||
in that state.
|
||||
|
||||
|
|
||||
|
||||
.. image:: ../images/inport_transitions.png
|
||||
:align: center
|
||||
|
||||
|
|
||||
|
||||
Finally, there are two methods available for convenience to complete the stage and all of its ports:
|
||||
|
||||
* ``completeStage()`` is equivalent to closing all output ports and cancelling all input ports.
|
||||
* ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports.
|
||||
|
||||
|
||||
Completion
|
||||
----------
|
||||
|
||||
**This section is a stub and will be extended in the next release**
|
||||
|
||||
Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally.
|
||||
It is possible to opt out from this behavior by overriding ``keepGoingAfterAllPortsClosed`` and returning true in
|
||||
the :class:`GraphStageLogic` implementation. In this case the stage **must** be explicitly closed by calling ``completeStage()``
|
||||
or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used
|
||||
with care.
|
||||
|
||||
Using timers
|
||||
------------
|
||||
|
||||
**This section is a stub and will be extended in the next release**
|
||||
|
||||
It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for
|
||||
the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or
|
||||
``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example
|
||||
a :class:`String`). The ``onTimer(key)`` method needs to be overridden and it will be called once the timer of ``key``
|
||||
fires. It is possible to cancel a timer using ``cancelTimer(key)`` and check the status of a timer with
|
||||
``isTimerActive(key)``. Timers will be automatically cleaned up when the stage completes.
|
||||
|
||||
Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the
|
||||
``preStart()`` lifecycle hook.
|
||||
|
||||
Using asynchronous side-channels
|
||||
--------------------------------
|
||||
|
||||
**This section is a stub and will be extended in the next release**
|
||||
|
||||
In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future
|
||||
or a callback from a 3rd party API) one must acquire a :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the
|
||||
stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous
|
||||
event fires. It is important to **not call the callback directly**, instead, the external API must call the
|
||||
``invoke(event)`` method on the returned :class:`AsyncCallback`. The execution engine will take care of calling the
|
||||
provided callback in a thread-safe way. The callback can safely access the state of the :class:`GraphStageLogic`
|
||||
implementation.
|
||||
|
||||
Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the
|
||||
``preStart()`` lifecycle hook instead.
|
||||
|
||||
Integration with actors
|
||||
-----------------------
|
||||
|
||||
**This section is a stub and will be extended in the next release**
|
||||
**This is an experimental feature***
|
||||
|
||||
It is possible to acquire an ActorRef that can be addressed from the outside of the stage, similarly how
|
||||
:class:`AsyncCallback` allows injecting asynchronous events into a stage logic. This reference can be obtained
|
||||
by calling ``getStageActorRef(receive)`` passing in a function that takes a :class:`Pair` of the sender
|
||||
:class:`ActorRef` and the received message. This reference can be used to watch other actors by calling its ``watch(ref)``
|
||||
or ``unwatch(ref)`` methods. The reference can be also watched by external actors. The current limitations of this
|
||||
:class:`ActorRef` are:
|
||||
|
||||
- they are not location transparent, they cannot be accessed via remoting.
|
||||
- they cannot be returned as materialized values.
|
||||
- they cannot be accessed from the constructor of the :class:`GraphStageLogic`, but they can be accessed from the
|
||||
``preStart()`` method.
|
||||
|
||||
Custom materialized values
|
||||
--------------------------
|
||||
|
||||
**This section is a stub and will be extended in the next release**
|
||||
|
||||
Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue`
|
||||
instead of the simpler :class:`GraphStage`. The difference is that in this case the method
|
||||
``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, overridden, and in addition to the
|
||||
stage logic the materialized value must be provided
|
||||
|
||||
.. warning::
|
||||
There is no built-in synchronization of accessing this value from both of the thread where the logic runs and
|
||||
the thread that got hold of the materialized value. It is the responsibility of the programmer to add the
|
||||
necessary (non-blocking) synchronization and visibility guarantees to this shared object.
|
||||
|
||||
Using attributes to affect the behavior of a stage
|
||||
--------------------------------------------------
|
||||
|
||||
**This section is a stub and will be extended in the next release**
|
||||
|
||||
Stages can access the :class:`Attributes` object created by the materializer. This contains all the applied (inherited)
|
||||
attributes applying to the stage, ordered from least specific (outermost) towards the most specific (innermost)
|
||||
attribute. It is the responsibility of the stage to decide how to reconcile this inheritance chain to a final effective
|
||||
decision.
|
||||
|
||||
See :ref:`composition-scala` for an explanation on how attributes work.
|
||||
|
||||
|
||||
Custom linear processing stages with PushPullStage
|
||||
==================================================
|
||||
|
||||
To extend the available transformations on a :class:`Flow` or :class:`Source` one can use the ``transform()`` method
|
||||
which takes a factory function returning a :class:`Stage`. Stages come in different flavors swhich we will introduce in this
|
||||
|
|
@ -190,18 +398,6 @@ extending ``PushStage`` the environment can be sure that ``onPull()`` was not ov
|
|||
``PushStage``.
|
||||
|
||||
|
||||
Using StatefulStage
|
||||
-------------------
|
||||
|
||||
On top of ``PushPullStage`` which is the most elementary and low-level abstraction and ``PushStage`` that is a
|
||||
convenience class that also informs the environment about possible optimizations ``StatefulStage`` is a new tool that
|
||||
builds on ``PushPullStage`` directly, adding various convenience methods on top of it. It is possible to explicitly
|
||||
maintain state-machine like states using its ``become()`` method to encapsulates states explicitly. There is also
|
||||
a handy ``emit()`` method that simplifies emitting multiple values given as an iterator. To demonstrate this feature
|
||||
we reimplemented ``Duplicator`` in terms of a ``StatefulStage``:
|
||||
|
||||
.. includecode:: code/docs/stream/FlowStagesSpec.scala#doubler-stateful
|
||||
|
||||
Using DetachedStage
|
||||
-------------------
|
||||
|
||||
|
|
@ -255,12 +451,6 @@ The following code example demonstrates the buffer class corresponding to the me
|
|||
the downstream is held, so the framework invokes onPull() to avoid this situation. This is similar to the termination
|
||||
logic already shown for :class:`PushPullStage`.
|
||||
|
||||
Custom graph processing junctions
|
||||
=================================
|
||||
|
||||
To extend available fan-in and fan-out structures (graph stages) Akka Streams include :class:`GraphStage`. This is an
|
||||
advanced usage DSL that should only be needed in rare and special cases, documentation will be forthcoming in one of the
|
||||
next releases.
|
||||
|
||||
Thread safety of custom processing stages
|
||||
=========================================
|
||||
|
|
|
|||
|
|
@ -68,3 +68,22 @@ You can also inject exceptions and test sink behaviour on error conditions.
|
|||
Test source and sink can be used together in combination when testing flows.
|
||||
|
||||
.. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-and-sink
|
||||
|
||||
|
||||
Fuzzing Mode
|
||||
============
|
||||
|
||||
For testing, it is possible to enable a special stream execution mode that exercises concurrent execution paths
|
||||
more aggressively (at the cost of reduced performance) and therefore helps exposing race conditions in tests. To
|
||||
enable this setting add the following line to your configuration:
|
||||
|
||||
::
|
||||
|
||||
akka.stream.materializer.debug.fuzzing-mode = on
|
||||
|
||||
|
||||
.. warning::
|
||||
|
||||
Never use this setting in production or benchmarks. This is a testing tool to provide more coverage of your code
|
||||
during tests, but it reduces the throughput of streams. A warning message will be logged if you have this setting
|
||||
enabled.
|
||||
Loading…
Add table
Add a link
Reference in a new issue