Merge pull request #16628 from ktoso/docs-fleximerge-ktoso

+doc FlexiMerge / FlexiRoute docs
This commit is contained in:
Konrad Malawski 2015-01-15 17:43:59 +01:00
commit 865f713a51
4 changed files with 521 additions and 4 deletions

View file

@ -0,0 +1,355 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import scala.collection.immutable.IndexedSeq
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
class FlexiDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
implicit val mat = FlowMaterializer()
"implement zip using readall" in {
//#fleximerge-zip-readall
/**
 * Zip-like fan-in junction: pairs one element from `left` with one element
 * from `right` and emits them downstream as a tuple.
 *
 * Uses the `ReadAll` read condition, so the state function receives a
 * `ReadAllInputs` from which each element is looked up by its input port.
 */
class Zip[A, B] extends FlexiMerge[(A, B)] {
  import FlexiMerge._

  // Typed input ports used to wire this junction into a graph.
  val left = createInputPort[A]()
  val right = createInputPort[B]()

  def createMergeLogic = new MergeLogic[(A, B)] {
    override def inputHandles(inputCount: Int) = {
      require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount")
      Vector(left, right)
    }

    // Single state: read from both inputs, emit the pair, stay in the same state.
    override def initialState: State[_] =
      State[ReadAllInputs](ReadAll(left, right)) { (context, _, pulled) =>
        val fromLeft: A = pulled(left)
        val fromRight: B = pulled(right)
        context.emit((fromLeft, fromRight))
        SameState
      }
  }
}
//#fleximerge-zip-readall
//format: OFF
//#fleximerge-zip-connecting
val head = Sink.head[(Int, String)]
//#fleximerge-zip-connecting

val map =
//#fleximerge-zip-connecting
  FlowGraph { implicit b =>
    import FlowGraphImplicits._

    // Instantiate the custom junction defined above. `new` is required: the
    // class has no companion `apply`, so a bare `Zip[Int, String]` would not
    // create an instance of it.
    val zip = new Zip[Int, String]

    Source.single(1)   ~> zip.left
    Source.single("1") ~> zip.right
    zip.out ~> head
  }
//#fleximerge-zip-connecting
  .run()
//format: ON

// The single zipped pair must arrive at the head sink.
Await.result(map.get(head), 300.millis) should equal((1, "1"))
}
"implement zip using two states" in {
//#fleximerge-zip-states
/**
 * Zip-like fan-in junction implemented with two alternating states:
 * `readA` pulls from `left` and remembers the element, `readB` pulls from
 * `right` and emits the remembered element together with the new one.
 */
class Zip[A, B] extends FlexiMerge[(A, B)] {
  import FlexiMerge._

  val left = createInputPort[A]()
  val right = createInputPort[B]()

  def createMergeLogic = new MergeLogic[(A, B)] {
    // Element most recently read from `left`, waiting for its partner from `right`.
    var lastInA: A = _

    override def inputHandles(inputCount: Int) = {
      require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount")
      Vector(left, right)
    }

    // Read one element from `left`, store it, then switch to reading `right`.
    val readA: State[A] = State[A](Read(left)) { (_, _, fromLeft) =>
      lastInA = fromLeft
      readB
    }

    // Read one element from `right`, emit the pair, then switch back to `left`.
    val readB: State[B] = State[B](Read(right)) { (context, _, fromRight) =>
      context.emit((lastInA, fromRight))
      readA
    }

    override def initialState: State[_] = readA
  }
}
//#fleximerge-zip-states
// Feed two elements into each side of the custom Zip and check the first pair.
val firstPair = Sink.head[(Int, String)]

val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._

  val zip = new Zip[Int, String]
  Source(1 to 2) ~> zip.left
  Source((1 to 2).map(_.toString)) ~> zip.right
  zip.out ~> firstPair
}.run()

val zipped = Await.result(materialized.get(firstPair), 300.millis)
zipped should equal((1, "1"))
}
"fleximerge completion handling" in {
//#fleximerge-completion
// Fan-in junction with one `important` input and two replica inputs.
// Elements from any input are forwarded downstream unchanged; the custom
// completion handling below treats the `important` input differently from
// the replicas.
class ImportantWithBackups[A] extends FlexiMerge[A] {
import FlexiMerge._
val important = createInputPort[A]()
val replica1 = createInputPort[A]()
val replica2 = createInputPort[A]()
def createMergeLogic = new MergeLogic[A] {
// All three ports, shared by `inputHandles` and the `ReadAny` condition.
val inputs = Vector(important, replica1, replica2)
override def inputHandles(inputCount: Int) = {
require(inputCount == 3, s"Must connect 3 inputs, connected only $inputCount")
inputs
}
override def initialCompletionHandling =
CompletionHandling(
// Invoked when an upstream input completes.
onComplete = (ctx, input) => input match {
// Backticks match against the existing `important` port instead of binding a new name.
case `important` =>
log.info("Important input completed, shutting down.")
ctx.complete()
SameState
// Any other input is one of the replicas: keep running, but switch to
// eagerClose so the next completion or error is handled eagerly.
case replica =>
log.info("Replica {} completed, " +
"no more replicas available, " +
"applying eagerClose completion handling.", replica)
ctx.changeCompletionHandling(eagerClose)
SameState
},
// Invoked when an upstream input fails with `cause`.
onError = (ctx, input, cause) => input match {
// A failure of the important input is signalled through `ctx.error`.
case `important` =>
ctx.error(cause)
SameState
// A replica failure is only logged (not passed to `ctx.error`); the
// handling is switched to eagerClose, mirroring the completion case.
case replica =>
log.error(cause, "Replica {} failed, " +
"no more replicas available, " +
"applying eagerClose completion handling.", replica)
ctx.changeCompletionHandling(eagerClose)
SameState
})
// Single state: read from whichever input has an element and emit it as-is.
override def initialState = State[A](ReadAny(inputs)) {
(ctx, input, element) =>
ctx.emit(element)
SameState
}
}
}
//#fleximerge-completion
// Run the junction with two single-element inputs and one replica that fails
// immediately, draining the merged output into an ignoring sink.
FlowGraph { implicit builder =>
  import FlowGraphImplicits._

  val merge = new ImportantWithBackups[Int]
  val failingReplica = Source.failed[Int](new Exception("Boom!") with NoStackTrace)

  Source.single(1) ~> merge.important
  Source.single(2) ~> merge.replica1
  failingReplica ~> merge.replica2
  merge.out ~> Sink.ignore
}.run()
}
"flexi preferring merge" in {
//#flexi-preferring-merge
/**
 * Fan-in junction that prefers one input over the others: elements are taken
 * from `preferred` whenever it has any available, otherwise from one of the
 * secondary inputs.
 */
class PreferringMerge extends FlexiMerge[Int] {
  import akka.stream.scaladsl.FlexiMerge._

  // Typed input ports; the names indicate the role of each port.
  val preferred = createInputPort[Int]()
  val secondary1 = createInputPort[Int]()
  val secondary2 = createInputPort[Int]()

  def createMergeLogic = new MergeLogic[Int] {
    override def inputHandles(inputCount: Int) = {
      // Three handles are returned and all three are read from below, so all
      // three ports are expected to be connected.
      require(
        inputCount == 3,
        s"PreferringMerge must have three connected inputs, was $inputCount")
      Vector(preferred, secondary1, secondary2)
    }

    // Single state: emit whatever was read and keep the same read condition.
    override def initialState =
      State[Int](ReadPreferred(preferred)(secondary1, secondary2)) {
        (ctx, input, element) =>
          ctx.emit(element)
          SameState
      }
  }
}
//#flexi-preferring-merge
}
"flexi read conditions" in {
// Compile-only holder for the `read-conditions` documentation snippet; no
// instance of this class is created in this test.
// NOTE(review): unlike the other examples in this file, the input ports here
// are created inside the MergeLogic rather than on the FlexiMerge itself, and
// `inputHandles` returns an empty Vector - fine for a snippet, not a pattern to copy.
class X extends FlexiMerge[Int] {
import FlexiMerge._
override def createMergeLogic(): MergeLogic[Int] = new MergeLogic[Int] {
//#read-conditions
val first = createInputPort[Int]()
val second = createInputPort[Int]()
val third = createInputPort[Int]()
//#read-conditions
//#read-conditions
val onlyFirst = Read(first)
val firstOrThird = ReadAny(first, third)
val firstAndSecond = ReadAll(first, second)
val firstAndThird = ReadAll(first, third)
val mostlyFirst = ReadPreferred(first)(second, third)
//#read-conditions
override def inputHandles(inputCount: Int): IndexedSeq[InputHandle] = Vector()
// Uses the ReadAll condition so that the typed lookup on `ReadAllInputs` is compiled.
override def initialState: State[_] = State[ReadAllInputs](firstAndSecond) {
(ctx, input, inputs) =>
// Looked up only to show the typed access; the value is not used.
val in1: Int = inputs(first)
SameState
}
}
}
}
"flexi route" in {
//#flexiroute-unzip
/**
 * Fan-out junction that splits a stream of pairs: the first component of each
 * tuple goes to `outA`, the second to `outB`.
 */
class Unzip[A, B] extends FlexiRoute[(A, B)] {
  import FlexiRoute._

  val outA = createOutputPort[A]()
  val outB = createOutputPort[B]()

  override def createRouteLogic() = new RouteLogic[(A, B)] {
    override def outputHandles(outputCount: Int) = {
      require(outputCount == 2, s"Unzip must have two connected outputs, was $outputCount")
      Vector(outA, outB)
    }

    override def initialCompletionHandling = eagerClose

    // Both outputs receive an element per incoming pair, hence DemandFromAll.
    // The state is typed as Any because the two outputs carry different types.
    override def initialState = State[Any](DemandFromAll(outA, outB)) {
      (context, _, pair) =>
        val (first, second) = pair
        context.emit(outA, first)
        context.emit(outB, second)
        SameState
    }
  }
}
//#flexiroute-unzip
}
"flexi route completion handling" in {
//#flexiroute-completion
/**
 * Fan-out junction with one `important` output and two additional outputs.
 * Each element is emitted to whichever output has demand; cancellation of
 * the `important` output completes the whole junction, cancellation of any
 * other output is ignored.
 */
class ImportantRoute[A] extends FlexiRoute[A] {
  import FlexiRoute._

  val important = createOutputPort[A]()
  val additional1 = createOutputPort[A]()
  val additional2 = createOutputPort[A]()

  override def createRouteLogic() = new RouteLogic[A] {
    val outputs = Vector(important, additional1, additional2)

    override def outputHandles(outputCount: Int) = {
      require(outputCount == 3, s"Must have three connected outputs, was $outputCount")
      outputs
    }

    override def initialCompletionHandling =
      CompletionHandling(
        // upstream:
        onComplete = (ctx) => (),
        onError = (ctx, thr) => (),
        // downstream:
        onCancel = (ctx, cancelled) => {
          // complete all downstreams, and cancel the upstream,
          // but only when the important output is the one that cancelled
          if (important == cancelled) ctx.complete()
          SameState
        })

    override def initialState = State[A](DemandFromAny(outputs)) {
      (ctx, target, elem) =>
        ctx.emit(target, elem)
        SameState
    }
  }
}
//#flexiroute-completion
// Run the route with a single-element source and ignoring sinks on all outputs.
FlowGraph { implicit builder =>
  import FlowGraphImplicits._

  val importantRoute = new ImportantRoute[Int]

  Source.single(1) ~> importantRoute.in
  importantRoute.important ~> Sink.ignore
  importantRoute.additional1 ~> Sink.ignore
  importantRoute.additional2 ~> Sink.ignore
}.run()
}
"flexi route completion handling emitting element upstream completion" in {
// Compile-only holder for the `flexiroute-completion-upstream-completed-signalling`
// documentation snippet: a single-output route whose completion handling
// drains a buffer to `out` when the upstream completes or fails.
// NOTE(review): nothing visible here ever adds to `buffer`, and no instance of
// this class is created in this test.
class ElementsAndStatus[A] extends FlexiRoute[A] {
import FlexiRoute._
val out = createOutputPort[A]()
override def createRouteLogic() = new RouteLogic[A] {
override def outputHandles(outputCount: Int) = Vector(out)
// The declaration below is split across lines so that the doc markers include
// the `var buffer: List[A]` line but not its initializer; formatting is
// switched off to keep that line break in place.
// format: OFF
//#flexiroute-completion-upstream-completed-signalling
var buffer: List[A]
//#flexiroute-completion-upstream-completed-signalling
= List[A]()
// format: ON
//#flexiroute-completion-upstream-completed-signalling
// Emits buffered elements to `out`, checking before every emit that demand
// is available; stops when either the demand or the buffer runs out.
def drainBuffer(ctx: RouteLogicContext[Any]): Unit =
while (ctx.isDemandAvailable(out) && buffer.nonEmpty) {
ctx.emit(out, buffer.head)
buffer = buffer.tail
}
// Drain on upstream completion and on upstream error; a downstream
// cancellation leaves the current state unchanged.
val signalStatusOnTermination = CompletionHandling(
onComplete = ctx => drainBuffer(ctx),
onError = (ctx, cause) => drainBuffer(ctx),
onCancel = (_, _) => SameState)
//#flexiroute-completion-upstream-completed-signalling
override def initialCompletionHandling = signalStatusOnTermination
// Normal operation: pass each element straight through to the output that has demand.
override def initialState = State[A](DemandFromAny(out)) {
(ctx, output, element) =>
ctx.emit(output, element)
SameState
}
}
}
}
}

View file

@ -149,14 +149,176 @@ Using DetachedStage
Custom graph processing junctions
=================================
To extend the available fan-in and fan-out structures (graph stages), Akka Streams includes :class:`FlexiMerge` and
:class:`FlexiRoute`, which provide an intuitive DSL for describing which upstream elements should be pulled from,
or which downstream elements should be emitted to.
Using FlexiMerge
----------------
:class:`FlexiMerge` can be used to describe a fan-in element which contains some logic about which upstream stage the
merge should consume elements from. It is recommended to create your custom fan-in stage as a separate class, name it
appropriately for the behavior it is exposing, and reuse it in the same way as you would use the built-in fan-in stages.
*TODO*
The first flexi merge example we are going to implement is a so-called "preferring merge", in which one
of the input ports is *preferred*, e.g. if the merge could pull from the preferred or another secondary input port,
it will pull from the preferred port, only pulling from the secondary ports once the preferred one does not have elements
available.
Implementing a custom merge stage is done by extending the :class:`FlexiMerge` trait, exposing its input ports and finally
defining the logic which will decide how this merge should behave. First we need to create the input ports which are used
to wire up the fan-in element in a :class:`FlowGraph`. These input ports *must* be properly typed and their names should
indicate what kind of port it is:
.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexi-preferring-merge
Next we implement the ``createMergeLogic`` method, which will be used as the factory of the merge's :class:`MergeLogic`.
A new :class:`MergeLogic` object will be created for each materialized stream, so it is allowed to be stateful.
The :class:`MergeLogic` defines the behaviour of our merge stage, and may be *stateful* (for example to buffer some elements
internally). The first method we must implement in a merge logic is ``inputHandles``, in which we have the opportunity to
validate the number of connected input ports (our preferring merge does so with a ``require`` check) and from which we return the input ports used by this merge.
.. warning::
While a :class:`MergeLogic` instance *may* be stateful, the :class:`FlexiMerge` instance
*must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances.
Next we implement the ``initialState`` method, which returns the behaviour of the merge stage. A ``MergeLogic#State``
defines the behaviour of the merge by signaling which input ports it is interested in consuming, and how to handle
the element once it has been pulled from its upstream. Signalling which input port we are interested in pulling data
from is done by using an appropriate *read condition*. Available *read conditions* include:
- ``Read(input)`` - reads from only the given input,
- ``ReadAny(inputs)`` - reads from any of the given inputs,
- ``ReadPreferred(preferred)(secondaries)`` - reads from the preferred input if it has elements available, otherwise from one of the secondaries,
- ``ReadAll(inputs)`` - reads from *all* given inputs (like ``Zip``), and offers a :class:`ReadAllInputs` as the ``element`` passed into the state function, which allows obtaining the pulled element values in a type-safe way.
In our case we use the :class:`ReadPreferred` read condition, which has exactly the semantics we need to implement
our preferring merge: it pulls elements from the preferred input port if there are any available, otherwise reverting
to pulling from the secondary inputs. The context object passed into the state function allows us to interact with the
connected streams, for example by emitting an ``element``, which was just pulled from the given ``input``, or signalling
completion or errors to the merge's downstream stage.
The state function must always return the next behaviour to be used when an element should be pulled from its upstreams,
we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed.
Implementing Zip-like merges
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
More complex fan-in junctions may require not only multiple States but also sharing state between those states.
As :class:`MergeLogic` is allowed to be stateful, it can be easily used to hold the state of the merge junction.
We now implement the equivalent of the built-in ``Zip`` junction by using the property that the :class:`MergeLogic` can be stateful
and that each read is followed by a state transition (much like in Akka FSM or ``Actor#become``).
.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-zip-states
The above style of implementing complex flexi merges is useful when we need fine grained control over consuming from certain
input ports. Sometimes however it is simpler to strictly consume all of a given set of inputs. In the ``Zip`` rewrite below
we use the :class:`ReadAll` read condition, which behaves slightly differently than the other read conditions, as the element
it is emitting is of the type :class:`ReadAllInputs` instead of directly handing over the pulled elements:
.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-zip-readall
Thanks to being handed a :class:`ReadAllInputs` instance instead of the elements directly it is possible to pick elements
in a type-safe way based on their input port.
Connecting your custom junction is as simple as creating an instance and connecting Sources and Sinks to its ports
(notice that the merged output port is named ``out``):
.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-zip-connecting
.. _flexi-merge-completion-handling-scala:
Completion handling
^^^^^^^^^^^^^^^^^^^
Completion handling in :class:`FlexiMerge` is defined by a :class:`CompletionHandling` object which can react to
completion and error signals from its upstream input ports. The default strategy is to remain running while at least one
of the upstream input ports which are declared to be consumed in the current state is still running (i.e. has not signalled
completion or error).
Customising completion can be done via overriding the ``MergeLogic#initialCompletionHandling`` method, or from within
a :class:`State` by calling ``ctx.changeCompletionHandling(handling)``. Other than the default completion handling (as
late as possible) :class:`FlexiMerge` also provides an ``eagerClose`` completion handling which completes (or errors) its
downstream as soon as at least one of its upstream inputs completes (or errors).
In the example below we implement an ``ImportantWithBackups`` fan-in stage which can only keep operating while
the ``important`` input and at least one of the ``replica`` inputs are active. Therefore in our custom completion strategy we
have to investigate which input has completed or errored and act accordingly. If the important input completed or errored
we propagate this downstream, completing the stream. If, on the other hand, the first replicated input fails, we log the
exception and, instead of erroring the downstream, swallow this exception (as one failed replica is still acceptable).
Then we change the completion strategy to ``eagerClose``, which will propagate any future completion or error event right
to this stage's downstream, effectively shutting down the stream.
.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-completion
In case you want to change back to the default completion handling, it is available as ``MergeLogic#defaultCompletionHandling``.
Using FlexiRoute
----------------
Similarly to using :class:`FlexiMerge`, implementing custom fan-out stages requires extending the :class:`FlexiRoute` class
and providing a :class:`RouteLogic` object which determines how the route should behave.
*TODO*
The first flexi route stage that we are going to implement is ``Unzip``, which consumes a stream of pairs and splits
it into two streams of the first and second elements of each tuple.
A :class:`FlexiRoute` has exactly one input port (in our example, type parameterized as ``(A,B)``), and may have multiple
output ports, all of which must be created beforehand (they cannot be added dynamically); however, not all output ports
must be connected. You can validate the number of connected output ports in the ``RouteLogic#outputHandles`` method,
which receives the number of connected output ports for a given instance of the flexi route in a given materialization.
The :class:`Vector` returned from ``outputHandles`` must include all output ports which are to be used by this junction:
.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-unzip
Next we implement ``RouteLogic#initialState`` by providing a State that uses the :class:`DemandFromAll` *demand condition*
to signal to flexi route that elements can only be emitted from this stage when demand is available from all given downstream
output ports. Other available demand conditions are:
- ``DemandFrom(output)`` - triggers when the given output port has pending demand,
- ``DemandFromAny(outputs)`` - triggers when any of the given output ports has pending demand,
- ``DemandFromAll(outputs)`` - triggers when *all* of the given output ports have pending demand.
Since the ``Unzip`` junction we're implementing signals both downstream stages at the same time, we use ``DemandFromAll``,
unpack the incoming tuple in the state function and signal its first element to the ``outA`` output, and the second element
of the tuple to the ``outB`` output. Notice that since we are emitting values of different types (``A`` and ``B``),
the type parameter of this ``State[_]`` must be set to ``Any``. This type can be utilised more efficiently when a junction
is emitting the same type of element to its downstreams, e.g. in all *strictly routing* stages.
The state function must always return the next behaviour to be used when an element should be emitted,
we use the special :class:`SameState` object which signals :class:`FlexiRoute` that no state transition is needed.
.. warning::
While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance
*must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances.
Completion handling
^^^^^^^^^^^^^^^^^^^
Completion handling in :class:`FlexiRoute` is handled similarly to :class:`FlexiMerge` (which is explained in depth in
:ref:`flexi-merge-completion-handling-scala`), however in addition to reacting to its upstream's *completion* or *error*
it can also react to its downstream stages *cancelling* their subscriptions. The default completion handling for
:class:`FlexiRoute` (defined in ``RouteLogic#defaultCompletionHandling``) is to continue running until all of its
downstreams have cancelled their subscriptions, or the upstream has completed / errored.
In order to customise completion handling we can override the ``RouteLogic#initialCompletionHandling`` method,
or call ``ctx.changeCompletionHandling(handling)`` from within a :class:`State`. Other than the default completion handling
(as late as possible) :class:`FlexiRoute` also provides an ``eagerClose`` completion handling which completes all its
downstream streams as well as cancels its upstream as soon as *any* of its downstream stages cancels its subscription.
In the example below we implement a custom completion handler which completes the entire stream eagerly if the ``important``
downstream cancels, otherwise (if any other downstream cancels their subscription) the :class:`ImportantRoute` keeps running.
.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion
Notice that State changes are only allowed in reaction to downstream cancellations, and not in the upstream completion/error
cases. This is because since there is only one upstream, there is nothing else to do than possibly flush buffered elements
and continue with shutting down the entire stream.
Sometimes you may want to emit buffered or additional elements from the completion handler when the stream is shutting down.
However calling ``ctx.emit`` is only legal when the stream we emit to *has demand available*. In normal operation,
this is guaranteed by properly using demand conditions, however as completion handlers may be invoked at any time (without
regard to downstream demand being available) we must explicitly check that the downstream has demand available before signalling it.
The completion strategy below assumes that we have implemented some kind of :class:`FlexiRoute` which buffers elements,
yet when its upstream completes it should drain as much as possible to its downstream ``out`` output port. We use the
``ctx.isDemandAvailable(outputHandle)`` method to make sure calling emit with the buffered elements is valid and
complete this flushing once all demand (or the buffer) is drained:
.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion-upstream-completed-signalling