diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala
new file mode 100644
index 0000000000..df20a65c2b
--- /dev/null
+++ b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala
@@ -0,0 +1,367 @@
+/**
+ * Copyright (C) 2014 Typesafe Inc.
+ */
+package docs.stream
+
+import akka.stream.FlowMaterializer
+import akka.stream.scaladsl._
+import akka.stream.testkit.AkkaSpec
+
+import scala.collection.immutable.IndexedSeq
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
+
+/**
+ * Executable examples for the FlexiMerge and FlexiRoute sections of `stream-customize.rst`.
+ * Code between a pair of identical `//#tag` comments can be referenced from that
+ * document through an `includecode` directive.
+ */
+class FlexiDocSpec extends AkkaSpec {
+
+  implicit val ec = system.dispatcher
+  implicit val mat = FlowMaterializer()
+
+  "implement zip using readall" in {
+    //#fleximerge-zip-readall
+    class Zip[A, B] extends FlexiMerge[(A, B)] {
+      import FlexiMerge._
+      val left = createInputPort[A]()
+      val right = createInputPort[B]()
+
+      def createMergeLogic = new MergeLogic[(A, B)] {
+        override def inputHandles(inputCount: Int) = {
+          require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount")
+          Vector(left, right)
+        }
+
+        override def initialState: State[_] =
+          State[ReadAllInputs](ReadAll(left, right)) { (ctx, _, inputs) =>
+            val a: A = inputs(left)
+            val b: B = inputs(right)
+            ctx.emit((a, b))
+            SameState
+          }
+      }
+    }
+    //#fleximerge-zip-readall
+
+    //format: OFF
+    //#fleximerge-zip-connecting
+    val head = Sink.head[(Int, String)]
+    //#fleximerge-zip-connecting
+
+    val map =
+      //#fleximerge-zip-connecting
+      FlowGraph { implicit b =>
+        import FlowGraphImplicits._
+
+        // `new` instantiates the custom Zip declared above; the class has no companion `apply`
+        val zip = new Zip[Int, String]
+
+        Source.single(1) ~> zip.left
+        Source.single("1") ~> zip.right
+        zip.out ~> head
+      }
+      //#fleximerge-zip-connecting
+      .run()
+    //format: ON
+
+    Await.result(map.get(head), 300.millis) should equal((1, "1"))
+  }
+
+  "implement zip using two states" in {
+    //#fleximerge-zip-states
+    class Zip[A, B] extends FlexiMerge[(A, B)] {
+      import FlexiMerge._
+      val left = createInputPort[A]()
+      val right = createInputPort[B]()
+
+      def createMergeLogic = new MergeLogic[(A, B)] {
+        var lastInA: A = _
+
+        override def inputHandles(inputCount: Int) = {
+          require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount")
+          Vector(left, right)
+        }
+
+        val readA: State[A] = State[A](Read(left)) { (ctx, input, element) =>
+          lastInA = element
+          readB
+        }
+
+        val readB: State[B] = State[B](Read(right)) { (ctx, input, element) =>
+          ctx.emit((lastInA, element))
+          readA
+        }
+
+        override def initialState: State[_] = readA
+      }
+    }
+    //#fleximerge-zip-states
+
+    val head = Sink.head[(Int, String)]
+    val map = FlowGraph { implicit b =>
+      import akka.stream.scaladsl.FlowGraphImplicits._
+
+      val zip = new Zip[Int, String]
+
+      Source(1 to 2) ~> zip.left
+      Source((1 to 2).map(_.toString)) ~> zip.right
+      zip.out ~> head
+    }.run()
+
+    // two pairs are fed in, but only the one delivered to the `head` sink is asserted
+    Await.result(map.get(head), 300.millis) should equal((1, "1"))
+  }
+
+  "fleximerge completion handling" in {
+    //#fleximerge-completion
+    class ImportantWithBackups[A] extends FlexiMerge[A] {
+      import FlexiMerge._
+
+      val important = createInputPort[A]()
+      val replica1 = createInputPort[A]()
+      val replica2 = createInputPort[A]()
+
+      def createMergeLogic = new MergeLogic[A] {
+        val inputs = Vector(important, replica1, replica2)
+
+        override def inputHandles(inputCount: Int) = {
+          require(inputCount == 3, s"Must connect 3 inputs, connected only $inputCount")
+          inputs
+        }
+
+        override def initialCompletionHandling =
+          CompletionHandling(
+            onComplete = (ctx, input) => input match {
+              case `important` =>
+                log.info("Important input completed, shutting down.")
+                ctx.complete()
+                SameState
+
+              case replica =>
+                log.info("Replica {} completed, " +
+                  "no more replicas available, " +
+                  "applying eagerClose completion handling.", replica)
+
+                ctx.changeCompletionHandling(eagerClose)
+                SameState
+            },
+            onError = (ctx, input, cause) => input match {
+              case `important` =>
+                ctx.error(cause)
+                SameState
+
+              case replica =>
+                log.error(cause, "Replica {} failed, " +
+                  "no more replicas available, " +
+                  "applying eagerClose completion handling.", replica)
+
+                ctx.changeCompletionHandling(eagerClose)
+                SameState
+            })
+
+        override def initialState = State[A](ReadAny(inputs)) {
+          (ctx, input, element) =>
+            ctx.emit(element)
+            SameState
+        }
+      }
+    }
+    //#fleximerge-completion
+
+    FlowGraph { implicit b =>
+      import FlowGraphImplicits._
+      val importantWithBackups = new ImportantWithBackups[Int]
+      Source.single(1) ~> importantWithBackups.important
+      Source.single(2) ~> importantWithBackups.replica1
+      // a failed source on replica2 exercises the replica branch of onError
+      Source.failed[Int](new Exception("Boom!") with NoStackTrace) ~> importantWithBackups.replica2
+      importantWithBackups.out ~> Sink.ignore
+    }.run()
+  }
+
+  "flexi preferring merge" in {
+    //#flexi-preferring-merge
+    class PreferringMerge extends FlexiMerge[Int] {
+      import akka.stream.scaladsl.FlexiMerge._
+
+      val preferred = createInputPort[Int]()
+      val secondary1 = createInputPort[Int]()
+      val secondary2 = createInputPort[Int]()
+
+      def createMergeLogic = new MergeLogic[Int] {
+        override def inputHandles(inputCount: Int) = {
+          // not every declared port has to be wired up, but at least one input must be connected
+          require(inputCount >= 1, s"PreferringMerge must have at least one connected input, was $inputCount")
+          Vector(preferred, secondary1, secondary2)
+        }
+
+        override def initialState =
+          State[Int](ReadPreferred(preferred)(secondary1, secondary2)) {
+            (ctx, input, element) =>
+              ctx.emit(element)
+              SameState
+          }
+      }
+    }
+    //#flexi-preferring-merge
+  }
+
+  "flexi read conditions" in {
+    // X is declared but never instantiated in this test; it hosts the read-condition snippet
+    class X extends FlexiMerge[Int] {
+      import FlexiMerge._
+
+      override def createMergeLogic(): MergeLogic[Int] = new MergeLogic[Int] {
+        //#read-conditions
+        val first = createInputPort[Int]()
+        val second = createInputPort[Int]()
+        val third = createInputPort[Int]()
+        //#read-conditions
+
+        //#read-conditions
+        val onlyFirst = Read(first)
+
+        val firstOrThird = ReadAny(first, third)
+
+        val firstAndSecond = ReadAll(first, second)
+        val firstAndThird = ReadAll(first, third)
+
+        val mostlyFirst = ReadPreferred(first)(second, third)
+
+        //#read-conditions
+
+        override def inputHandles(inputCount: Int): IndexedSeq[InputHandle] = Vector()
+
+        override def initialState: State[_] = State[ReadAllInputs](firstAndSecond) {
+          (ctx, input, inputs) =>
+            val in1: Int = inputs(first)
+            SameState
+        }
+      }
+    }
+  }
+
+  "flexi route" in {
+    //#flexiroute-unzip
+    class Unzip[A, B] extends FlexiRoute[(A, B)] {
+      import FlexiRoute._
+      val outA = createOutputPort[A]()
+      val outB = createOutputPort[B]()
+
+      override def createRouteLogic() = new RouteLogic[(A, B)] {
+
+        override def outputHandles(outputCount: Int) = {
+          require(outputCount == 2, s"Unzip must have two connected outputs, was $outputCount")
+          Vector(outA, outB)
+        }
+
+        override def initialState = State[Any](DemandFromAll(outA, outB)) {
+          (ctx, _, element) =>
+            val (a, b) = element
+            ctx.emit(outA, a)
+            ctx.emit(outB, b)
+            SameState
+        }
+
+        override def initialCompletionHandling = eagerClose
+      }
+    }
+    //#flexiroute-unzip
+  }
+
+  "flexi route completion handling" in {
+    //#flexiroute-completion
+    class ImportantRoute[A] extends FlexiRoute[A] {
+      import FlexiRoute._
+      val important = createOutputPort[A]()
+      val additional1 = createOutputPort[A]()
+      val additional2 = createOutputPort[A]()
+
+      override def createRouteLogic() = new RouteLogic[A] {
+        val outputs = Vector(important, additional1, additional2)
+
+        override def outputHandles(outputCount: Int) = {
+          require(outputCount == 3, s"Must have three connected outputs, was $outputCount")
+          outputs
+        }
+
+        override def initialCompletionHandling =
+          CompletionHandling(
+            // upstream:
+            onComplete = (ctx) => (),
+            onError = (ctx, thr) => (),
+            // downstream:
+            onCancel = (ctx, output) => output match {
+              case `important` =>
+                // complete all downstreams, and cancel the upstream
+                ctx.complete()
+                SameState
+              case _ =>
+                SameState
+            })
+
+        override def initialState = State[A](DemandFromAny(outputs)) {
+          (ctx, output, element) =>
+            ctx.emit(output, element)
+            SameState
+        }
+      }
+    }
+    //#flexiroute-completion
+
+    FlowGraph { implicit b =>
+      import FlowGraphImplicits._
+      val route = new ImportantRoute[Int]
+      Source.single(1) ~> route.in
+      route.important ~> Sink.ignore
+      route.additional1 ~> Sink.ignore
+      route.additional2 ~> Sink.ignore
+    }.run()
+  }
+
+  "flexi route completion handling emitting element upstream completion" in {
+    class ElementsAndStatus[A] extends FlexiRoute[A] {
+      import FlexiRoute._
+      val out = createOutputPort[A]()
+
+      override def createRouteLogic() = new RouteLogic[A] {
+        override def outputHandles(outputCount: Int) = Vector(out)
+
+        // The declaration below is deliberately split over two lines so that only
+        // `var buffer: List[A]` (without its initializer) sits between the snippet markers.
+        // format: OFF
+        //#flexiroute-completion-upstream-completed-signalling
+        var buffer: List[A]
+        //#flexiroute-completion-upstream-completed-signalling
+          = List[A]()
+        // format: ON
+
+        //#flexiroute-completion-upstream-completed-signalling
+
+        def drainBuffer(ctx: RouteLogicContext[Any]): Unit =
+          while (ctx.isDemandAvailable(out) && buffer.nonEmpty) {
+            ctx.emit(out, buffer.head)
+            buffer = buffer.tail
+          }
+
+        val signalStatusOnTermination = CompletionHandling(
+          onComplete = ctx => drainBuffer(ctx),
+          onError = (ctx, cause) => drainBuffer(ctx),
+          onCancel = (_, _) => SameState)
+        //#flexiroute-completion-upstream-completed-signalling
+
+        override def initialCompletionHandling = signalStatusOnTermination
+
+        override def initialState = State[A](DemandFromAny(out)) {
+          (ctx, output, element) =>
+            ctx.emit(output, element)
+            SameState
+        }
+      }
+    }
+  }
+
+}
diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index 2c7eee3304..15c719819e 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -149,14 +149,176 @@ Using DetachedStage Custom graph processing junctions ================================= +To extend available fan-in and fan-out structures (graph stages) Akka Streams include :class:`FlexiMerge` and +:class:`FlexiRoute` which provide an intuitive DSL which allows to describe
which upstream or downstream stream +elements should be pulled from or emitted to. + Using FlexiMerge ---------------- +:class:`FlexiMerge` can be used to describe a fan-in element which contains some logic about which upstream stage the +merge should consume elements from. It is recommended to create your custom fan-in stage as a separate class, name it +appropriately to the behavior it is exposing and reuse it this way – similarly to how you would use built-in fan-in stages. -*TODO* +The first flexi merge example we are going to implement is a so-called "preferring merge", in which one +of the input ports is *preferred*, i.e. if the merge could pull from the preferred or another secondary input port, +it will pull from the preferred port, only pulling from the secondary ports once the preferred one does not have elements +available. + +Implementing a custom merge stage is done by extending the :class:`FlexiMerge` trait, exposing its input ports and finally +defining the logic which will decide how this merge should behave. First we need to create the input ports which are used +to wire up the fan-in element in a :class:`FlowGraph`. These input ports *must* be properly typed and their names should +indicate what kind of port each one is: + +.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexi-preferring-merge + +Next we implement the ``createMergeLogic`` method, which will be used as a factory for the merge's :class:`MergeLogic`. +A new :class:`MergeLogic` object will be created for each materialized stream, so it is allowed to be stateful. + +The :class:`MergeLogic` defines the behaviour of our merge stage, and may be *stateful* (for example to buffer some elements +internally). The first method we must implement in a merge logic is ``inputHandles`` in which we have the opportunity to +validate the number of connected input ports, e.g. in our preferring merge we only require that at least one input is connected. + +..
warning:: + While a :class:`MergeLogic` instance *may* be stateful, the :class:`FlexiMerge` instance + *must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances. + +Next we implement the ``initialState`` method, which returns the behaviour of the merge stage. A ``MergeLogic#State`` +defines the behaviour of the merge by signalling which input ports it is interested in consuming, and how to handle +the element once it has been pulled from its upstream. Signalling which input port we are interested in pulling data +from is done by using an appropriate *read condition*. Available *read conditions* include: + +- ``Read(input)`` – reads from only the given input, +- ``ReadAny(inputs)`` – reads from any of the given inputs, +- ``ReadPreferred(preferred)(secondaries)`` – reads from the preferred input if elements are available, otherwise from one of the secondaries, +- ``ReadAll(inputs)`` – reads from *all* given inputs (like ``Zip``), and offers a :class:`ReadAllInputs` as the ``element`` passed into the state function, which allows obtaining the pulled element values in a type-safe way. + +In our case we use the :class:`ReadPreferred` read condition which has the exact semantics which we need to implement +our preferring merge – it pulls elements from the preferred input port if there are any available, otherwise reverting +to pulling from the secondary inputs. The context object passed into the state function allows us to interact with the +connected streams, for example by emitting an ``element``, which was just pulled from the given ``input``, or signalling +completion or errors to the merge's downstream stage. + +The state function must always return the next behaviour to be used when an element should be pulled from its upstreams; +we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed.
+ +Implementing Zip-like merges +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +More complex fan-in junctions may require not only multiple States but also sharing state between those states. +As :class:`MergeLogic` is allowed to be stateful, it can be easily used to hold the state of the merge junction. + +We now implement the equivalent of the built-in ``Zip`` junction by using the property that the :class:`MergeLogic` can be stateful +and that each read is followed by a state transition (much like in Akka FSM or ``Actor#become``). + +.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-zip-states + +The above style of implementing complex flexi merges is useful when we need fine-grained control over consuming from certain +input ports. Sometimes however it is simpler to strictly consume all of a given set of inputs. In the ``Zip`` rewrite below +we use the :class:`ReadAll` read condition, which behaves slightly differently than the other read conditions, as the element +it is emitting is of the type :class:`ReadAllInputs` instead of directly handing over the pulled elements: + +.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-zip-readall + +Thanks to being handed a :class:`ReadAllInputs` instance instead of the elements directly it is possible to pick elements +in a type-safe way based on their input port. + +Connecting your custom junction is as simple as creating an instance and connecting Sources and Sinks to its ports +(notice that the merged output port is named ``out``): + +.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-zip-connecting + +.. _flexi-merge-completion-handling-scala: + +Completion handling +^^^^^^^^^^^^^^^^^^^ +Completion handling in :class:`FlexiMerge` is defined by a :class:`CompletionHandling` object which can react to +completion and error signals from its upstream input ports.
The default strategy is to remain running while at least one +of the upstream input ports which are declared to be consumed in the current state is still running (i.e. has not signalled +completion or error). + +Customising completion can be done via overriding the ``MergeLogic#initialCompletionHandling`` method, or from within +a :class:`State` by calling ``ctx.changeCompletionHandling(handling)``. Other than the default completion handling (as +late as possible) :class:`FlexiMerge` also provides an ``eagerClose`` completion handling which completes (or errors) its +downstream as soon as at least one of its upstream inputs completes (or errors). + +In the example below we implement an ``ImportantWithBackups`` fan-in stage which can only keep operating while +the ``important`` and at least one of the ``replica`` inputs are active. Therefore in our custom completion strategy we +have to investigate which input has completed or errored and act accordingly. If the important input completed or errored +we propagate this downstream completing the stream, on the other hand if the first replicated input fails, we log the +exception and instead of erroring the downstream swallow this exception (as one failed replica is still acceptable). +Then we change the completion strategy to ``eagerClose`` which will propagate any future completion or error event right +to this stage's downstream, effectively shutting down the stream. + +.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-completion + +In case you want to change back to the default completion handling, it is available as ``MergeLogic#defaultCompletionHandling``. Using FlexiRoute ---------------- +Similarly to using :class:`FlexiMerge`, implementing custom fan-out stages requires extending the :class:`FlexiRoute` class +and providing a :class:`RouteLogic` object which determines how the route should behave.
-*TODO* +The first flexi route stage that we are going to implement is ``Unzip``, which consumes a stream of pairs and splits +it into two streams of the first and second elements of each tuple. +A :class:`FlexiRoute` has exactly one input port (in our example, type parameterized as ``(A,B)``), and may have multiple +output ports, all of which must be created beforehand (they cannot be added dynamically), however not all output ports +must be connected. You can validate the number of connected output ports in the ``RouteLogic#outputHandles`` method, +which receives the number of connected output ports for a given instance of the flexi route in a given materialization. The :class:`Vector` returned from ``outputHandles`` must include all output ports which are to be used by this junction: + +.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-unzip + +Next we implement ``RouteLogic#initialState`` by providing a State that uses the :class:`DemandFromAll` *demand condition* +to signal to flexi route that elements can only be emitted from this stage when demand is available from all given downstream +output ports. Available demand conditions are: + +- ``DemandFrom(output)`` - triggers when the given output port has pending demand, +- ``DemandFromAny(outputs)`` - triggers when any of the given output ports has pending demand, +- ``DemandFromAll(outputs)`` - triggers when *all* of the given output ports have pending demand. + +Since the ``Unzip`` junction we're implementing signals both downstream stages at the same time, we use ``DemandFromAll``, +unpack the incoming tuple in the state function and signal its first element to the ``outA`` stream, and the second element +of the tuple to the ``outB`` stream. Notice that since we are emitting values of different types (``A`` and ``B``), +the type parameter of this ``State[_]`` must be set to ``Any``.
This type can be utilised more efficiently when a junction +is emitting the same type of element to its downstreams e.g. in all *strictly routing* stages. + +The state function must always return the next behaviour to be used when an element should be emitted; +we use the special :class:`SameState` object which signals :class:`FlexiRoute` that no state transition is needed. + +.. warning:: + While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance + *must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances. + +Completion handling +^^^^^^^^^^^^^^^^^^^ +Completion handling in :class:`FlexiRoute` is handled similarly to :class:`FlexiMerge` (which is explained in depth in +:ref:`flexi-merge-completion-handling-scala`), however in addition to reacting to its upstream's *completion* or *error* +it can also react to its downstream stages *cancelling* their subscriptions. The default completion handling for +:class:`FlexiRoute` (defined in ``RouteLogic#defaultCompletionHandling``) is to continue running until all of its +downstreams have cancelled their subscriptions, or the upstream has completed / errored. + +In order to customise completion handling we can override the ``RouteLogic#initialCompletionHandling`` method, +or call ``ctx.changeCompletionHandling(handling)`` from within a :class:`State`. Other than the default completion handling +(as late as possible) :class:`FlexiRoute` also provides an ``eagerClose`` completion handling which completes all its +downstream streams as well as cancels its upstream as soon as *any* of its downstream stages cancels its subscription. + +In the example below we implement a custom completion handler which completes the entire stream eagerly if the ``important`` +downstream cancels, otherwise (if any other downstream cancels its subscription) the :class:`ImportantRoute` keeps running. + +..
includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion + +Notice that State changes are only allowed in reaction to downstream cancellations, and not in the upstream completion/error +cases. This is because there is only one upstream, so there is nothing else to do than possibly flush buffered elements +and continue with shutting down the entire stream. + +Sometimes you may want to emit buffered or additional elements from the completion handler when the stream is shutting down. +However calling ``ctx.emit`` is only legal when the stream we emit to *has demand available*. In normal operation, +this is guaranteed by properly using demand conditions, however as completion handlers may be invoked at any time (without +regard to downstream demand being available) we must explicitly check that the downstream has demand available before signalling it. + +The completion strategy below assumes that we have implemented some kind of :class:`FlexiRoute` which buffers elements, +yet when its upstream completes it should drain as much as possible to its downstream ``out`` output port. We use the +``ctx.isDemandAvailable(outputHandle)`` method to make sure calling emit with the buffered elements is valid and +complete this flushing once all demand (or the buffer) is drained: + +..
includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion-upstream-completed-signalling diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala index 9ab34ace3a..f9cb782416 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala @@ -3,6 +3,8 @@ */ package akka.stream.scaladsl +import java.io.{File, FileInputStream} + import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import akka.stream.MaterializerSettings diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index f57d9a9557..260296d555 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -160,10 +160,8 @@ object FlexiRoute { * handle cancel from downstream output. * * The `onComplete` function is called the upstream input was completed successfully. - * It returns next behavior or [[#SameState]] to keep current behavior. * * The `onError` function is called when the upstream input was completed with failure. - * It returns next behavior or [[#SameState]] to keep current behavior. * * The `onCancel` function is called when a downstream output cancels. * It returns next behavior or [[#SameState]] to keep current behavior.