diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index 4c3dcfac48..b36f8718a6 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ b/akka-docs-dev/rst/java/stream-customize.rst @@ -159,7 +159,7 @@ Using FlexiMerge ---------------- :class:`FlexiMerge` can be used to describe a fan-in element which contains some logic about which upstream stage the merge should consume elements. It is recommended to create your custom fan-in stage as a separate class, name it -appropriately to the behavior it is exposing and reuse it this way – similarily as you would use built-in fan-in stages. +appropriately to the behavior it is exposing and reuse it this way – similarly as you would use built-in fan-in stages. The first flexi merge example we are going to implement is a so-called "preferring merge", in which one of the input ports is *preferred*, e.g. if the merge could pull from the preferred or another secondary input port, @@ -167,9 +167,9 @@ it will pull from the preferred port, only pulling from the secondary ports once available. Implementing a custom merge stage is done by extending the :class:`FlexiMerge` trait, exposing its input ports and finally -defining the logic which will decide how this merge should behave. First we need to create the input ports which are used +defining the logic which will decide how this merge should behave. First we need to create the ports which are used to wire up the fan-in element in a :class:`FlowGraph`. These input ports *must* be properly typed and their names should -indicate what kind of port it is: +indicate what kind of port it is. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlexiMergeDocTest.java#flexi-preferring-merge @@ -177,8 +177,7 @@ Next we implement the ``createMergeLogic`` method, which will be used as factory A new :class:`MergeLogic` object will be created for each materialized stream, so it is allowed to be stateful. The :class:`MergeLogic` defines the behaviour of our merge stage, and may be *stateful* (for example to buffer some elements -internally). The first method we must implement in a merge logic is ``inputHandles`` in which we have the opportunity to -validate the number of connected input ports, e.g. in our preferring merge we only require that at least one input is connected. +internally). .. warning:: While a :class:`MergeLogic` instance *may* be stateful, the :class:`FlexiMerge` instance @@ -269,10 +268,8 @@ The first flexi route stage that we are going to implement is ``Unzip``, which c it into two streams of the first and second elements of each pair. A :class:`FlexiRoute` has exactly-one input port (in our example, type parameterized as ``Pair``), and may have multiple -output ports, all of which must be created before hand (they can not be added dynamically), however not all output ports -must be connected. You can validate the number of connected output ports in the ``RouteLogic#outputHandles`` method, -which receives the number of connected output ports for a given instance of the flexi route in a given materialization. -The :class:`List` returned from ``outputHandles`` must include all output ports which are to be used by this junction: +output ports, all of which must be created beforehand (they can not be added dynamically). First we need to create the +ports which are used to wire up the fan-in element in a :class:`FlowGraph`. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlexiRouteDocTest.java#flexiroute-unzip diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala index 35355d5b91..230fce172b 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala @@ -178,7 +178,7 @@ class FlexiDocSpec extends AkkaSpec { "flexi preferring merge" in { import FanInShape._ - //#flexi-preferring-merge + //#flexi-preferring-merge-ports class PreferringMergeShape[A](_init: Init[A] = Name("PreferringMerge")) extends FanInShape[A](_init) { val preferred = newInlet[A]("preferred") @@ -186,6 +186,10 @@ class FlexiDocSpec extends AkkaSpec { val secondary2 = newInlet[A]("secondary2") protected override def construct(i: Init[A]) = new PreferringMergeShape(i) } + //#flexi-preferring-merge-ports + + //#flexi-preferring-merge + class PreferringMerge extends FlexiMerge[Int, PreferringMergeShape[Int]]( new PreferringMergeShape, OperationAttributes.name("ImportantWithBackups")) { import akka.stream.scaladsl.FlexiMerge._ diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index e72ff1d787..a014734a38 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -159,7 +159,7 @@ Using FlexiMerge ---------------- :class:`FlexiMerge` can be used to describe a fan-in element which contains some logic about which upstream stage the merge should consume elements. It is recommended to create your custom fan-in stage as a separate class, name it -appropriately to the behavior it is exposing and reuse it this way – similarily as you would use built-in fan-in stages. +appropriately to the behavior it is exposing and reuse it this way – similarly as you would use built-in fan-in stages. The first flexi merge example we are going to implement is a so-called "preferring merge", in which one of the input ports is *preferred*, e.g. if the merge could pull from the preferred or another secondary input port, @@ -167,18 +167,19 @@ it will pull from the preferred port, only pulling from the secondary ports once available. Implementing a custom merge stage is done by extending the :class:`FlexiMerge` trait, exposing its input ports and finally -defining the logic which will decide how this merge should behave. First we need to create the input ports which are used +defining the logic which will decide how this merge should behave. First we need to create the ports which are used to wire up the fan-in element in a :class:`FlowGraph`. These input ports *must* be properly typed and their names should -indicate what kind of port it is: +indicate what kind of port it is. -.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexi-preferring-merge +.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexi-preferring-merge-ports Next we implement the ``createMergeLogic`` method, which will be used as factory of merges :class:`MergeLogic`. A new :class:`MergeLogic` object will be created for each materialized stream, so it is allowed to be stateful. +.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexi-preferring-merge + The :class:`MergeLogic` defines the behaviour of our merge stage, and may be *stateful* (for example to buffer some elements -internally). The first method we must implement in a merge logic is ``inputHandles`` in which we have the opportunity to -validate the number of connected input ports, e.g. in our preferring merge we only require that at least one input is connected. +internally). .. warning:: While a :class:`MergeLogic` instance *may* be stateful, the :class:`FlexiMerge` instance @@ -269,10 +270,8 @@ The first flexi route stage that we are going to implement is ``Unzip``, which c it into two streams of the first and second elements of each tuple. A :class:`FlexiRoute` has exactly-one input port (in our example, type parameterized as ``(A,B)``), and may have multiple -output ports, all of which must be created before hand (they can not be added dynamically), however not all output ports -must be connected. You can validate the number of connected output ports in the ``RouteLogic#outputHandles`` method, -which receives the number of connected output ports for a given instance of the flexi route in a given materialization. -The :class:`Vector` returned from ``outputHandles`` must include all output ports which are to be used by this junction: +output ports, all of which must be created beforehand (they can not be added dynamically). First we need to create the +ports which are used to wire up the fan-in element in a :class:`FlowGraph`. .. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-unzip @@ -302,7 +301,7 @@ we use the special :class:`SameState` object which signals :class:`FlexiRoute` t Completion handling ^^^^^^^^^^^^^^^^^^^ -Completion handling in :class:`FlexiRoute` is handled similarily to :class:`FlexiMerge` (which is explained in depth in +Completion handling in :class:`FlexiRoute` is handled similarly to :class:`FlexiMerge` (which is explained in depth in :ref:`flexi-merge-completion-handling-scala`), however in addition to reacting to its upstreams *completion* or *failure* it can also react to its downstream stages *cancelling* their subscriptions. The default completion handling for :class:`FlexiRoute` (defined in ``RouteLogic#defaultCompletionHandling``) is to continue running until all of its