diff --git a/akka-docs-dev/rst/images/inport_transitions.png b/akka-docs-dev/rst/images/inport_transitions.png new file mode 100644 index 0000000000..f5b6c179d1 Binary files /dev/null and b/akka-docs-dev/rst/images/inport_transitions.png differ diff --git a/akka-docs-dev/rst/images/outport_transitions.png b/akka-docs-dev/rst/images/outport_transitions.png new file mode 100644 index 0000000000..7eb407a464 Binary files /dev/null and b/akka-docs-dev/rst/images/outport_transitions.png differ diff --git a/akka-docs-dev/rst/images/port_transitions.svg b/akka-docs-dev/rst/images/port_transitions.svg new file mode 100644 index 0000000000..9ffbc0c011 --- /dev/null +++ b/akka-docs-dev/rst/images/port_transitions.svg @@ -0,0 +1,1054 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + image/svg+xml + + + + + + + + + PUSHED + isAvailable + !hasBeenPulled + !isClosed + + + + PUSHED_EMPTY + !isAvailable + !hasBeenPulled + !isClosed + + + + PULLED + !isAvailable + hasBeenPulled + !isClosed + + + + CLOSED_EMPTY + !isAvailable + !hasBeenPulled + isClosed + + + grab() + + + pull() + onPush() + + + CLOSED + isAvailable + !hasBeenPulled + isClosed + + + + grab() + + + onUpstreamFinish() + cancel() + onUpstreamFailure() + onUpstreamFinish() + cancel() + onUpstreamFailure() + onUpstreamFinish() + cancel() + onUpstreamFailure() + + + PULLED + isAvailable + !isClosed + + + + + PUSHED + !isAvailable + !isClosed + + + + + + CLOSED + !isAvailable + isClosed + + + + + push(elem) + onPull() + + + complete()fail(ex)onDownstreamFinish() + + complete()fail(ex)onDownstreamFinish() + + + + cancel() + + cancel() + + complete()fail() + pull() + + diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index 5273332bed..4c453b54ef 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -336,6 +336,19 @@ Update procedure See the example in the AsyncStage migration section for an example of this procedure. +StatefulStage has been replaced by GraphStage +============================================= + +The :class:`StatefulStage` class had some flaws and limitations, most notably around completion handling which +caused subtle bugs. The new :class:`GraphStage` (:ref:`graphstage-java`) solves these issues and should be used +instead. + +Update procedure +---------------- + +There is no mechanical update procedure available. Please consult the :class:`GraphStage` documentation +(:ref:`graphstage-java`). + AsyncStage has been replaced by GraphStage ========================================== diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index 4019a10af6..0e0f8068a7 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ b/akka-docs-dev/rst/java/stream-customize.rst @@ -9,6 +9,210 @@ is sometimes necessary to define new transformation stages either because some f stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph junctions of various kinds. +.. _graphstage-java: + +Custom processing with GraphStage +================================= + +The :class:`GraphStage` abstraction can be used to create arbitrary graph processing stages with any number of input +or output ports. It is a counterpart of the ``FlowGraph.create()`` method which creates new stream processing +stages by composing others. Where :class:`GraphStage` differs is that it creates a stage that is itself not divisible into +smaller ones, and allows state to be maintained inside it in a safe way. + +As a first motivating example, we will build a new :class:`Source` that will simply emit numbers from 1 until it is +cancelled. To start, we need to define the "interface" of our stage, which is called *shape* in Akka Streams terminology +(this is explained in more detail in the section :ref:`composition-java`). + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#simple-source + +As you see, in itself the :class:`GraphStage` only defines the ports of this stage and a shape that contains the ports. +It also has a user implemented method called ``createLogic``. If you recall, stages are reusable in multiple +materializations, each resulting in a different executing entity. In the case of :class:`GraphStage` the actual running +logic is modeled as an instance of a :class:`GraphStageLogic` which will be created by the materializer by calling +the ``createLogic`` method. + +In order to emit from a :class:`Source` in a backpressured stream one needs first to have demand from downstream. +To receive the necessary events one needs to register a subclass of :class:`AbstractOutHandler` with the output port +(:class:`Outlet`). This handler will receive events related to the lifecycle of the port. In our case we need to +override ``onPull()`` which indicates that we are free to emit a single element. There is another callback, +``onDownstreamFinish()`` which is called if the downstream cancelled. Since the default behavior of that callback is +to stop the stage, we don't need to override it. In the ``onPull`` callback we simply emit the next number. + +Instances of the above :class:`GraphStage` are subclasses of ``Graph,Unit>`` which means +that they are already usable in many situations, but do not provide the DSL methods we usually have for other +:class:`Source` s. In order to convert this :class:`Graph` to a proper :class:`Source` we need to wrap it using +``Source.fromGraph`` (see :ref:`composition-java` for more details about graphs and DSLs). Now we can use the +source as any other built-in one: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#simple-source-usage + +Port states, AbstractInHandler and AbstractOutHandler +----------------------------------------------------- + +In order to interact with a port (:class:`Inlet` or :class:`Outlet`) of the stage we need to be able to receive events +and generate new events belonging to the port. From the :class:`GraphStageLogic` the following operations are available +on an output port: + +* ``push(out,elem)`` pushes an element to the output port. Only possible after the port has been pulled by downstream. +* ``complete(out)`` closes the output port normally. +* ``fail(out,exception)`` closes the port with a failure signal. + + +The events corresponding to an *output* port can be received in an :class:`AbstractOutHandler` instance registered to the +output port using ``setHandler(out,handler)``. This handler has two callbacks: + +* ``onPull()`` is called when the output port is ready to emit the next element, ``push(out, elem)`` is now allowed + to be called on this port. +* ``onDownstreamFinish()`` is called once the downstream has cancelled and no longer allows messages to be pushed to it. + No more ``onPull()`` will arrive after this event. If not overridden this will default to stopping the stage. + +Also, there are two query methods available for output ports: + +* ``isAvailable(out)`` returns true if the port can be pushed. +* ``isClosed(out)`` returns true if the port is closed. At this point the port can not be pushed and will not be pulled anymore. + +The relationship of the above operations, events and queries are summarized in the state machine below. Green shows +the initial state while orange indicates the end state. If an operation is not listed for a state, then it is invalid +to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen +in that state. + +| + +.. image:: ../images/outport_transitions.png +:align: center + +| + +The following operations are available for *input* ports: + +* ``pull(in)`` requests a new element from an input port. This is only possible after the port has been pushed by upstream. +* ``grab(in)`` acquires the element that has been received during an ``onPush()``. It cannot be called again until the + port is pushed again by the upstream. +* ``cancel(in)`` closes the input port. + +The events corresponding to an *input* port can be received in an :class:`AbstractInHandler` instance registered to the +input port using ``setHandler(in, handler)``. This handler has three callbacks: + +* ``onPush()`` is called when the output port has now a new element. Now it is possible to aquire this element using + ``grab()`` and/or call ``pull(in)`` on the port to request the next element. It is not mandatory to grab the + element, but if it is pulled while the element has not been grabbed it will drop the buffered element. +* ``onUpstreamFinish()`` is called once the upstream has completed and no longer can be pulled for new elements. + No more ``onPush()`` will arrive after this event. If not overridden this will default to stopping the stage. +* ``onUpstreamFailure()`` is called if the upstream failed with an exception and no longer can be pulled for new elements. + No more ``onPush()`` will arrive after this event. If not overridden this will default to failing the stage. + +Also, there are three query methods available for input ports: + +* ``isAvailable(out)`` returns true if a data element can be grabbed from the port +* ``hasBeenPulled(out)`` returns true if the port has been already pulled. Calling ``pull(in)`` in this state is illegal. +* ``isClosed(in)`` returns true if the port is closed. At this point the port can not be pulled and will not be pushed anymore. + +The relationship of the above operations, events and queries are summarized in the state machine below. Green shows +the initial state while orange indicates the end state. If an operation is not listed for a state, then it is invalid +to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen +in that state. + +| + +.. image:: ../images/inport_transitions.png +:align: center + +| + +Finally, there are two methods available for convenience to complete the stage and all of its ports: + +* ``completeStage()`` is equivalent to closing all output ports and cancelling all input ports. +* ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports. + + +Completion +---------- + +**This section is a stub and will be extended in the next release** + +Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally. +It is possible to opt out from this behavior by overriding ``keepGoingAfterAllPortsClosed`` and returning true in +the :class:`GraphStageLogic` implementation. In this case the stage **must** be explicitly closed by calling ``completeStage()`` +or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used +with care. + +Using timers +------------ + +**This section is a stub and will be extended in the next release** + +It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for +the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or +``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example +a :class:`String`). The ``onTimer(key)`` method needs to be overridden and it will be called once the timer of ``key`` +fires. It is possible to cancel a timer using ``cancelTimer(key)`` and check the status of a timer with +``isTimerActive(key)``. Timers will be automatically cleaned up when the stage completes. + +Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the +``preStart()`` lifecycle hook. + +Using asynchronous side-channels +-------------------------------- + +**This section is a stub and will be extended in the next release** + +In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future +or a callback from a 3rd party API) one must acquire a :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the +stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous +event fires. It is important to **not call the callback directly**, instead, the external API must call the +``invoke(event)`` method on the returned :class:`AsyncCallback`. The execution engine will take care of calling the +provided callback in a thread-safe way. The callback can safely access the state of the :class:`GraphStageLogic` +implementation. + +Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the +``preStart()`` lifecycle hook instead. + +Integration with actors +----------------------- + +**This section is a stub and will be extended in the next release** +**This is an experimental feature*** + +It is possible to acquire an ActorRef that can be addressed from the outside of the stage, similarly how +:class:`AsyncCallback` allows injecting asynchronous events into a stage logic. This reference can be obtained +by calling ``getStageActorRef(receive)`` passing in a function that takes a :class:`Pair` of the sender +:class:`ActorRef` and the received message. This reference can be used to watch other actors by calling its ``watch(ref)`` +or ``unwatch(ref)`` methods. The reference can be also watched by external actors. The current limitations of this +:class:`ActorRef` are: + + - they are not location transparent, they cannot be accessed via remoting. + - they cannot be returned as materialized values. + - they cannot be accessed from the constructor of the :class:`GraphStageLogic`, but they can be accessed from the + ``preStart()`` method. + +Custom materialized values +-------------------------- + +**This section is a stub and will be extended in the next release** + +Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue` +instead of the simpler :class:`GraphStage`. The difference is that in this case the method +``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, overridden, and in addition to the +stage logic the materialized value must be provided + +.. warning:: + There is no built-in synchronization of accessing this value from both of the thread where the logic runs and + the thread that got hold of the materialized value. It is the responsibility of the programmer to add the + necessary (non-blocking) synchronization and visibility guarantees to this shared object. + +Using attributes to affect the behavior of a stage +-------------------------------------------------- + +**This section is a stub and will be extended in the next release** + +Stages can access the :class:`Attributes` object created by the materializer. This contains all the applied (inherited) +attributes applying to the stage, ordered from least specific (outermost) towards the most specific (innermost) +attribute. It is the responsibility of the stage to decide how to reconcile this inheritance chain to a final effective +decision. + +See :ref:`composition-java` for an explanation on how attributes work. + + Custom linear processing stages =============================== @@ -188,19 +392,6 @@ only calls ``ctx.pull()`` and allow the environment do process elements faster t extending ``PushStage`` the environment can be sure that ``onPull()`` was not overridden since it is ``final`` on ``PushStage``. - -Using StatefulStage -------------------- - -On top of ``PushPullStage`` which is the most elementary and low-level abstraction and ``PushStage`` that is a -convenience class that also informs the environment about possible optimizations ``StatefulStage`` is a new tool that -builds on ``PushPullStage`` directly, adding various convenience methods on top of it. It is possible to explicitly -maintain state-machine like states using its ``become()`` method to encapsulates states explicitly. There is also -a handy ``emit()`` method that simplifies emitting multiple values given as an iterator. To demonstrate this feature -we reimplemented ``Duplicator`` in terms of a ``StatefulStage``: - -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowStagesDocTest.java#doubler-stateful - Using DetachedStage ------------------- @@ -254,13 +445,6 @@ The following code example demonstrates the buffer class corresponding to the me the downstream is held, so the framework invokes onPull() to avoid this situation. This is similar to the termination logic already shown for :class:`PushPullStage`. -Custom graph processing junctions -================================= - -To extend available fan-in and fan-out structures (graph stages) Akka Streams include :class:`GraphStage`. This is an -advanced usage DSL that should only be needed in rare and special cases, documentation will be forthcoming in one of the -next releases. - Thread safety of custom processing stages ========================================= diff --git a/akka-docs-dev/rst/scala/code/docs/stream/GraphStageDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/GraphStageDocSpec.scala new file mode 100644 index 0000000000..06abf7e2ec --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/GraphStageDocSpec.scala @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.stream + +import akka.stream.javadsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.stage.{ OutHandler, GraphStage, GraphStageLogic } +import akka.stream._ + +import akka.stream.testkit.AkkaSpec + +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + +class GraphStageDocSpec extends AkkaSpec { + + implicit val mat = ActorMaterializer() + + "Demonstrate creation of GraphStage boilerplate" in { + //#boilerplate-example + import akka.stream.SourceShape + import akka.stream.stage.GraphStage + + class NumbersSource extends GraphStage[SourceShape[Int]] { + // Define the (sole) output port of this stage + val out: Outlet[Int] = Outlet("NumbersSource") + // Define the shape of this stage, which is SourceShape with the port we defined above + override val shape: SourceShape[Int] = SourceShape(out) + + // This is where the actual (possibly stateful) logic will live + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? + } + //#boilerplate-example + + } + + "Demonstrate creation of GraphStage Source" in { + //#custom-source-example + import akka.stream.SourceShape + import akka.stream.Graph + import akka.stream.stage.GraphStage + import akka.stream.stage.OutHandler + + class NumbersSource extends GraphStage[SourceShape[Int]] { + val out: Outlet[Int] = Outlet("NumbersSource") + override val shape: SourceShape[Int] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + // All state MUST be inside the GraphStageLogic, + // never inside the enclosing GraphStage. + // This state is safe to access and modify from all the + // callbacks that are provided by GraphStageLogic and the + // registered handlers. + private var counter = 1 + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + push(out, counter) + counter += 1 + } + }) + } + } + //#custom-source-example + + //#simple-source-usage + // A GraphStage is a proper Graph, just like what FlowGraph.create would return + val sourceGraph: Graph[SourceShape[Int], Unit] = new NumbersSource + + // Create a Source from the Graph to access the DSL + val mySource: Source[Int, Unit] = Source.fromGraph(new NumbersSource) + + // Returns 55 + val result1: Future[Int] = mySource.take(10).runFold(0)(_ + _) + + // The source is reusable. This returns 5050 + val result2: Future[Int] = mySource.take(100).runFold(0)(_ + _) + //#simple-source-usage + + Await.result(result1, 3.seconds) should ===(55) + Await.result(result2, 3.seconds) should ===(5050) + } + +} diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index 40ac6c43e5..acde374a9b 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -325,6 +325,20 @@ Update procedure See the example in the AsyncStage migration section for an example of this procedure. +StatefulStage has been replaced by GraphStage +============================================= + +The :class:`StatefulStage` class had some flaws and limitations, most notably around completion handling which +caused subtle bugs. The new :class:`GraphStage` (:ref:`graphstage-java`) solves these issues and should be used +instead. + +Update procedure +---------------- + +There is no mechanical update procedure available. Please consult the :class:`GraphStage` documentation +(:ref:`graphstage-java`). + + AsyncStage has been replaced by GraphStage ========================================== diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index 7d3133dad6..1b175755d0 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -9,8 +9,216 @@ is sometimes necessary to define new transformation stages either because some f stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph junctions of various kinds. -Custom linear processing stages -=============================== +.. _graphstage-scala: + +Custom processing with GraphStage +================================= + +The :class:`GraphStage` abstraction can be used to create arbitrary graph processing stages with any number of input +or output ports. It is a counterpart of the ``FlowGraph.create()`` method which creates new stream processing +stages by composing others. Where :class:`GraphStage` differs is that it creates a stage that is itself not divisible into +smaller ones, and allows state to be maintained inside it in a safe way. + +As a first motivating example, we will build a new :class:`Source` that will simply emit numbers from 1 until it is +cancelled. To start, we need to define the "interface" of our stage, which is called *shape* in Akka Streams terminology +(this is explained in more detail in the section :ref:`composition-scala`). This is how this looks like: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#boilerplate-example + +As you see, in itself the :class:`GraphStage` only defines the ports of this stage and a shape that contains the ports. +It also has, a currently unimplemented method called ``createLogic``. If you recall, stages are reusable in multiple +materializations, each resulting in a different executing entity. In the case of :class:`GraphStage` the actual running +logic is modeled as an instance of a :class:`GraphStageLogic` which will be created by the materializer by calling +the ``createLogic`` method. In other words, all we need to do is to create a suitable logic that will emit the +numbers we want. + +In order to emit from a :class:`Source` in a backpressured stream one needs first to have demand from downstream. +To receive the necessary events one needs to register a subclass of :class:`OutHandler` with the output port +(:class:`Outlet`). This handler will receive events related to the lifecycle of the port. In our case we need to +override ``onPull()`` which indicates that we are free to emit a single element. There is another callback, +``onDownstreamFinish()`` which is called if the downstream cancelled. Since the default behavior of that callback is +to stop the stage, we don't need to override it. In the ``onPull`` callback we will simply emit the next number. This +is how it looks like in the end: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#custom-source-example + +Instances of the above :class:`GraphStage` are subclasses of ``Graph[SourceShape[Int],Unit]`` which means +that they are already usable in many situations, but do not provide the DSL methods we usually have for other +:class:`Source` s. In order to convert this :class:`Graph` to a proper :class:`Source` we need to wrap it using +``Source.fromGraph`` (see :ref:`composition-scala` for more details about graphs and DSLs). Now we can use the +source as any other built-in one: + +.. includecode:: code/docs/stream/GraphStageDocSpec.scala#simple-source-usage + +Port states, InHandler and OutHandler +------------------------------------- + +In order to interact with a port (:class:`Inlet` or :class:`Outlet`) of the stage we need to be able to receive events +and generate new events belonging to the port. From the :class:`GraphStageLogic` the following operations are available +on an output port: + +* ``push(out,elem)`` pushes an element to the output port. Only possible after the port has been pulled by downstream. +* ``complete(out)`` closes the output port normally. +* ``fail(out,exception)`` closes the port with a failure signal. + + +The events corresponding to an *output* port can be received in an :class:`OutHandler` instance registered to the +output port using ``setHandler(out,handler)``. This handler has two callbacks: + +* ``onPull()`` is called when the output port is ready to emit the next element, ``push(out, elem)`` is now allowed + to be called on this port. +* ``onDownstreamFinish()`` is called once the downstream has cancelled and no longer allows messages to be pushed to it. + No more ``onPull()`` will arrive after this event. If not overridden this will default to stopping the stage. + +Also, there are two query methods available for output ports: + +* ``isAvailable(out)`` returns true if a data element can be grabbed from the port +* ``isClosed(out)`` returns true if the port is closed. At this point the port can not be pushed and will not be pulled anymore. + +The relationship of the above operations, events and queries are summarized in the state machine below. Green shows +the initial state while orange indicates the end state. If an operation is not listed for a state, then it is invalid +to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen +in that state. + +| + +.. image:: ../images/outport_transitions.png + :align: center + +| + +The following operations are available for *input* ports: + +* ``pull(in)`` requests a new element from an input port. This is only possible after the port has been pushed by upstream. +* ``grab(in)`` acquires the element that has been received during an ``onPush()``. It cannot be called again until the + port is pushed again by the upstream. +* ``cancel(in)`` closes the input port. + +The events corresponding to an *input* port can be received in an :class:`InHandler` instance registered to the +input port using ``setHandler(in, handler)``. This handler has three callbacks: + +* ``onPush()`` is called when the output port has now a new element. Now it is possible to aquire this element using + ``grab()`` and/or call ``pull(in)`` on the port to request the next element. It is not mandatory to grab the + element, but if it is pulled while the element has not been grabbed it will drop the buffered element. +* ``onUpstreamFinish()`` is called once the upstream has completed and no longer can be pulled for new elements. + No more ``onPush()`` will arrive after this event. If not overridden this will default to stopping the stage. +* ``onUpstreamFailure()`` is called if the upstream failed with an exception and no longer can be pulled for new elements. + No more ``onPush()`` will arrive after this event. If not overridden this will default to failing the stage. + +Also, there are three query methods available for input ports: + +* ``isAvailable(out)`` returns true if the port can be grabbed. +* ``hasBeenPulled(out)`` returns true if the port has been already pulled. Calling ``pull(in)`` in this state is illegal. +* ``isClosed(in)`` returns true if the port is closed. At this point the port can not be pulled and will not be pushed anymore. + +The relationship of the above operations, events and queries are summarized in the state machine below. Green shows +the initial state while orange indicates the end state. If an operation is not listed for a state, then it is invalid +to call it while the port is in that state. If an event is not listed for a state, then that event cannot happen +in that state. + +| + +.. image:: ../images/inport_transitions.png + :align: center + +| + +Finally, there are two methods available for convenience to complete the stage and all of its ports: + +* ``completeStage()`` is equivalent to closing all output ports and cancelling all input ports. +* ``failStage(exception)`` is equivalent to failing all output ports and cancelling all input ports. + + +Completion +---------- + +**This section is a stub and will be extended in the next release** + +Stages by default automatically stop once all of their ports (input and output) have been closed externally or internally. +It is possible to opt out from this behavior by overriding ``keepGoingAfterAllPortsClosed`` and returning true in +the :class:`GraphStageLogic` implementation. In this case the stage **must** be explicitly closed by calling ``completeStage()`` +or ``failStage(exception)``. This feature carries the risk of leaking streams and actors, therefore it should be used +with care. + +Using timers +------------ + +**This section is a stub and will be extended in the next release** + +It is possible to use timers in :class:`GraphStages` by using :class:`TimerGraphStageLogic` as the base class for +the returned logic. Timers can be scheduled by calling one of ``scheduleOnce(key,delay)``, ``schedulePeriodically(key,period)`` or +``schedulePeriodicallyWithInitialDelay(key,delay,period)`` and passing an object as a key for that timer (can be any object, for example +a :class:`String`). The ``onTimer(key)`` method needs to be overridden and it will be called once the timer of ``key`` +fires. It is possible to cancel a timer using ``cancelTimer(key)`` and check the status of a timer with +``isTimerActive(key)``. Timers will be automatically cleaned up when the stage completes. + +Timers can not be scheduled from the constructor of the logic, but it is possible to schedule them from the +``preStart()`` lifecycle hook. + +Using asynchronous side-channels +-------------------------------- + +**This section is a stub and will be extended in the next release** + +In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future +or a callback from a 3rd party API) one must acquire a :class:`AsyncCallback` by calling ``getAsyncCallback()`` from the +stage logic. The method ``getAsyncCallback`` takes as a parameter a callback that will be called once the asynchronous +event fires. It is important to **not call the callback directly**, instead, the external API must call the +``invoke(event)`` method on the returned :class:`AsyncCallback`. The execution engine will take care of calling the +provided callback in a thread-safe way. The callback can safely access the state of the :class:`GraphStageLogic` +implementation. + +Sharing the AsyncCallback from the constructor risks race conditions, therefore it is recommended to use the +``preStart()`` lifecycle hook instead. + +Integration with actors +----------------------- + +**This section is a stub and will be extended in the next release** +**This is an experimental feature*** + +It is possible to acquire an ActorRef that can be addressed from the outside of the stage, similarly how +:class:`AsyncCallback` allows injecting asynchronous events into a stage logic. This reference can be obtained +by calling ``getStageActorRef(receive)`` passing in a function that takes a :class:`Pair` of the sender +:class:`ActorRef` and the received message. This reference can be used to watch other actors by calling its ``watch(ref)`` +or ``unwatch(ref)`` methods. The reference can be also watched by external actors. The current limitations of this +:class:`ActorRef` are: + + - they are not location transparent, they cannot be accessed via remoting. + - they cannot be returned as materialized values. + - they cannot be accessed from the constructor of the :class:`GraphStageLogic`, but they can be accessed from the + ``preStart()`` method. + +Custom materialized values +-------------------------- + +**This section is a stub and will be extended in the next release** + +Custom stages can return materialized values instead of ``Unit`` by inheriting from :class:`GraphStageWithMaterializedValue` +instead of the simpler :class:`GraphStage`. The difference is that in this case the method +``createLogicAndMaterializedValue(inheritedAttributes)`` needs to be overridden, overridden, and in addition to the +stage logic the materialized value must be provided + +.. warning:: + There is no built-in synchronization of accessing this value from both of the thread where the logic runs and + the thread that got hold of the materialized value. It is the responsibility of the programmer to add the + necessary (non-blocking) synchronization and visibility guarantees to this shared object. + +Using attributes to affect the behavior of a stage +-------------------------------------------------- + +**This section is a stub and will be extended in the next release** + +Stages can access the :class:`Attributes` object created by the materializer. This contains all the applied (inherited) +attributes applying to the stage, ordered from least specific (outermost) towards the most specific (innermost) +attribute. It is the responsibility of the stage to decide how to reconcile this inheritance chain to a final effective +decision. + +See :ref:`composition-scala` for an explanation on how attributes work. + + +Custom linear processing stages with PushPullStage +================================================== To extend the available transformations on a :class:`Flow` or :class:`Source` one can use the ``transform()`` method which takes a factory function returning a :class:`Stage`. Stages come in different flavors swhich we will introduce in this @@ -190,18 +398,6 @@ extending ``PushStage`` the environment can be sure that ``onPull()`` was not ov ``PushStage``. -Using StatefulStage -------------------- - -On top of ``PushPullStage`` which is the most elementary and low-level abstraction and ``PushStage`` that is a -convenience class that also informs the environment about possible optimizations ``StatefulStage`` is a new tool that -builds on ``PushPullStage`` directly, adding various convenience methods on top of it. It is possible to explicitly -maintain state-machine like states using its ``become()`` method to encapsulates states explicitly. There is also -a handy ``emit()`` method that simplifies emitting multiple values given as an iterator. To demonstrate this feature -we reimplemented ``Duplicator`` in terms of a ``StatefulStage``: - -.. includecode:: code/docs/stream/FlowStagesSpec.scala#doubler-stateful - Using DetachedStage ------------------- @@ -255,12 +451,6 @@ The following code example demonstrates the buffer class corresponding to the me the downstream is held, so the framework invokes onPull() to avoid this situation. This is similar to the termination logic already shown for :class:`PushPullStage`. -Custom graph processing junctions -================================= - -To extend available fan-in and fan-out structures (graph stages) Akka Streams include :class:`GraphStage`. This is an -advanced usage DSL that should only be needed in rare and special cases, documentation will be forthcoming in one of the -next releases. Thread safety of custom processing stages ========================================= diff --git a/akka-docs-dev/rst/scala/stream-testkit.rst b/akka-docs-dev/rst/scala/stream-testkit.rst index 0f642455b6..c249929642 100644 --- a/akka-docs-dev/rst/scala/stream-testkit.rst +++ b/akka-docs-dev/rst/scala/stream-testkit.rst @@ -68,3 +68,22 @@ You can also inject exceptions and test sink behaviour on error conditions. Test source and sink can be used together in combination when testing flows. .. includecode:: code/docs/stream/StreamTestKitDocSpec.scala#test-source-and-sink + + +Fuzzing Mode +============ + +For testing, it is possible to enable a special stream execution mode that exercises concurrent execution paths +more aggressively (at the cost of reduced performance) and therefore helps exposing race conditions in tests. To +enable this setting add the following line to your configuration: + +:: + + akka.stream.materializer.debug.fuzzing-mode = on + + +.. warning:: + + Never use this setting in production or benchmarks. This is a testing tool to provide more coverage of your code + during tests, but it reduces the throughput of streams. A warning message will be logged if you have this setting + enabled. \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 5a3cc187c7..c2c7a49da5 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -411,6 +411,7 @@ private[akka] object StatefulStage { * * Use [[#terminationEmit]] to push final elements from [[#onUpstreamFinish]] or [[#onUpstreamFailure]]. */ +@deprecated("StatefulStage is deprecated, please use GraphStage instead.", "2.0-M2") abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] { import StatefulStage._