Merge pull request #16754 from akka/wip-16565-remove-isDemandAvailable-patriknw

!str #16565 Make Flexi* limitations explicit
This commit is contained in:
Patrik Nordwall 2015-02-03 13:44:16 +01:00
commit 39bdf6ff47
14 changed files with 206 additions and 303 deletions

View file

@ -39,7 +39,7 @@ Akka Streams currently provide these junctions:
- ``ZipWith<A,B,...,Out>`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
- ``Zip<A,B>`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into a ``Pair<A,B>`` stream,
- ``Concat<A>`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters,
- ``FlexiMerge<Out>`` (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL.
- ``FlexiMerge<Out>`` (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating

View file

@ -312,44 +312,4 @@ class FlexiDocSpec extends AkkaSpec {
}.run()
}
"flexi route completion handling emitting element upstream completion" in {
class ElementsAndStatus[A] extends FlexiRoute[A] {
import FlexiRoute._
val out = createOutputPort[A]()
override def createRouteLogic() = new RouteLogic[A] {
override def outputHandles(outputCount: Int) = Vector(out)
// format: OFF
//#flexiroute-completion-upstream-completed-signalling
var buffer: List[A]
//#flexiroute-completion-upstream-completed-signalling
= List[A]()
// format: ON
//#flexiroute-completion-upstream-completed-signalling
def drainBuffer(ctx: RouteLogicContext[Any]): Unit =
while (ctx.isDemandAvailable(out) && buffer.nonEmpty) {
ctx.emit(out, buffer.head)
buffer = buffer.tail
}
val signalStatusOnTermination = CompletionHandling(
onUpstreamFinish = ctx => drainBuffer(ctx),
onUpstreamFailure = (ctx, cause) => drainBuffer(ctx),
onDownstreamFinish = (_, _) => SameState)
//#flexiroute-completion-upstream-completed-signalling
override def initialCompletionHandling = signalStatusOnTermination
override def initialState = State[A](DemandFromAny(out)) {
(ctx, output, element) =>
ctx.emit(output, element)
SameState
}
}
}
}
}

View file

@ -203,6 +203,9 @@ completion or errors to the merges downstream stage.
The state function must always return the next behaviour to be used when an element should be pulled from its upstreams,
we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed.
.. note::
As response to an input element it is allowed to emit at most one output element.
Implementing Zip-like merges
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
More complex fan-in junctions may require not only multiple States but also sharing state between those states.
@ -254,6 +257,9 @@ to this stages downstream effectively shutting down the stream.
In case you want to change back to the default completion handling, it is available as ``MergeLogic#defaultCompletionHandling``.
It is not possible to emit elements from the completion handling, since completion
handlers may be invoked at any time (without regard to downstream demand being available).
Using FlexiRoute
----------------
Similarily to using :class:`FlexiMerge`, implementing custom fan-out stages requires extending the :class:`FlexiRoute` class
@ -284,12 +290,15 @@ of the tuple to the ``right`` stream. Notice that since we are emitting values o
the type parameter of this ``State[_]`` must be set to ``Any``. This type can be utilised more efficiently when a junction
is emitting the same type of element to its downstreams e.g. in all *strictly routing* stages.
The state function must always return the next behaviour to be used when an element should be emited,
The state function must always return the next behaviour to be used when an element should be emitted,
we use the special :class:`SameState` object which signals :class:`FlexiRoute` that no state transition is needed.
.. warning::
While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance
*must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances.
.. note::
It is only allowed to `emit` at most one element to each output in response to `onInput`, `IllegalStateException` is thrown.
Completion handling
^^^^^^^^^^^^^^^^^^^
@ -313,14 +322,6 @@ Notice that State changes are only allowed in reaction to downstream cancellatio
cases. This is because since there is only one upstream, there is nothing else to do than possibly flush buffered elements
and continue with shutting down the entire stream.
Sometimes you may want to emit buffered or additional elements from the completion handler when the stream is shutting down.
However calling ``ctx.emit`` is only legal when the stream we emit to *has demand available*. In normal operation,
this is guaranteed by properly using demand conditions, however as completion handlers may be invokead at any time (without
regard to downstream demand being available) we must explicitly check that the downstream has demand available before signalling it.
It is not possible to emit elements from the completion handling, since completion
handlers may be invoked at any time (without regard to downstream demand being available).
The completion strategy below assumes that we have implemented some kind of :class:`FlexiRoute` which buffers elements,
yet when its upstream completes it should drain as much as possible to its downstream ``out`` output port. We use the
``ctx.isDemandAvailable(outputHandle)`` method to make sure calling emit with the buffered elements is valid and
complete this flushing once all demand (or the buffer) is drained:
.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion-upstream-completed-signalling

View file

@ -39,7 +39,7 @@ Akka Streams currently provide these junctions:
- ``ZipWith[A,B,...,Out]`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
- ``Zip[A,B]`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream,
- ``Concat[A]`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters,
- ``FlexiMerge[Out]`` (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL.
- ``FlexiMerge[Out]`` (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating

View file

@ -162,14 +162,21 @@ private[http] object HttpServer {
case (ctx, _, error) { ctx.fail(error); SameState }
})
def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = {
def finishWithError(ctx: MergeLogicContextBase, target: String, status: StatusCode, info: ErrorInfo): State[Any] = {
log.warning("Illegal {}, responding with status '{}': {}", target, status, info.formatPretty)
val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary
ctx.emit(ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true))
// FIXME this is a workaround that is supposed to be solved by issue #16753
ctx match {
case fullCtx: MergeLogicContext
// note that this will throw IllegalArgumentException if no demand available
fullCtx.emit(ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true))
case other throw new IllegalStateException(s"Unexpected MergeLogicContext [${other.getClass.getName}]")
}
//
finish(ctx)
}
def finish(ctx: MergeLogicContext): State[Any] = {
def finish(ctx: MergeLogicContextBase): State[Any] = {
ctx.finish() // shouldn't this return a `State` rather than `Unit`?
SameState // it seems weird to stay in the same state after completion
}

View file

@ -181,13 +181,13 @@ public class FlexiMergeTest {
private final CompletionHandling<T> emitOtherOnClose = new CompletionHandling<T>() {
@Override
public State<T, T> onUpstreamFinish(MergeLogicContext<T> ctx, InputHandle input) {
public State<T, T> onUpstreamFinish(MergeLogicContextBase<T> ctx, InputHandle input) {
ctx.changeCompletionHandling(defaultCompletionHandling());
return readRemaining(other(input));
}
@Override
public State<T, T> onUpstreamFailure(MergeLogicContext<T> ctx, InputHandle inputHandle, Throwable cause) {
public State<T, T> onUpstreamFailure(MergeLogicContextBase<T> ctx, InputHandle inputHandle, Throwable cause) {
ctx.fail(cause);
return sameState();
}

View file

@ -7,8 +7,9 @@ import akka.stream.testkit.StreamTestKit.AutoPublisher
import akka.stream.testkit.StreamTestKit.OnNext
import akka.stream.testkit.StreamTestKit.PublisherProbe
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import scala.util.control.NoStackTrace
import akka.actor.ActorRef
import akka.testkit.TestProbe
object GraphFlexiMergeSpec {
@ -138,72 +139,6 @@ class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends
}
}
class OrderedMerge extends FlexiMerge[Int] {
import FlexiMerge._
val input1 = createInputPort[Int]()
val input2 = createInputPort[Int]()
def createMergeLogic = new MergeLogic[Int] {
private var reference = 0
override def inputHandles(inputCount: Int) = Vector(input1, input2)
val emitOtherOnClose = CompletionHandling(
onUpstreamFinish = { (ctx, input)
ctx.changeCompletionHandling(emitLast)
readRemaining(other(input))
},
onUpstreamFailure = { (ctx, input, cause)
ctx.fail(cause)
SameState
})
def other(input: InputHandle): InputHandle = if (input eq input1) input2 else input1
def getFirstElement = State[Int](ReadAny(input1, input2)) { (ctx, input, element)
reference = element
ctx.changeCompletionHandling(emitOtherOnClose)
readUntilLarger(other(input))
}
def readUntilLarger(input: InputHandle): State[Int] = State[Int](Read(input)) {
(ctx, input, element)
if (element <= reference) {
ctx.emit(element)
SameState
} else {
ctx.emit(reference)
reference = element
readUntilLarger(other(input))
}
}
def readRemaining(input: InputHandle) = State[Int](Read(input)) {
(ctx, input, element)
if (element <= reference)
ctx.emit(element)
else {
ctx.emit(reference)
reference = element
}
SameState
}
val emitLast = CompletionHandling(
onUpstreamFinish = { (ctx, input)
if (ctx.isDemandAvailable)
ctx.emit(reference)
SameState
},
onUpstreamFailure = { (ctx, input, cause)
ctx.fail(cause)
SameState
})
override def initialState = getFirstElement
}
}
class PreferringMerge extends FlexiMerge[Int] {
import FlexiMerge._
val preferred = createInputPort[Int]()
@ -221,7 +156,7 @@ class PreferringMerge extends FlexiMerge[Int] {
}
}
class TestMerge extends FlexiMerge[String] {
class TestMerge(completionProbe: ActorRef) extends FlexiMerge[String] {
import FlexiMerge._
val input1 = createInputPort[String]()
val input2 = createInputPort[String]()
@ -254,8 +189,7 @@ class TestMerge extends FlexiMerge[String] {
onUpstreamFinish = { (ctx, input)
if (throwFromOnComplete)
throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
if (ctx.isDemandAvailable)
ctx.emit("onUpstreamFinish: " + input.portIndex)
completionProbe ! "onUpstreamFinish: " + input.portIndex
SameState
},
onUpstreamFailure = { (ctx, input, cause)
@ -385,54 +319,6 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectComplete()
}
"build simple ordered merge 1" in {
val output = Sink.publisher[Int]
val m = FlowGraph { implicit b
val merge = new OrderedMerge
Source(List(3, 5, 6, 7, 8)) ~> merge.input1
Source(List(1, 2, 4, 9)) ~> merge.input2
merge.out ~> output
}.run()
val s = SubscriberProbe[Int]
val p = m.get(output)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(100)
for (n 1 to 9) {
s.expectNext(n)
}
s.expectComplete()
}
"build simple ordered merge 2" in {
val output = Sink.publisher[Int]
val m = FlowGraph { implicit b
val merge = new OrderedMerge
Source(List(3, 5, 6, 7, 8)) ~> merge.input1
Source(List(3, 5, 6, 7, 8, 10)) ~> merge.input2
merge.out ~> output
}.run()
val s = SubscriberProbe[Int]
val p = m.get(output)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(100)
s.expectNext(3)
s.expectNext(3)
s.expectNext(5)
s.expectNext(5)
s.expectNext(6)
s.expectNext(6)
s.expectNext(7)
s.expectNext(7)
s.expectNext(8)
s.expectNext(8)
s.expectNext(10)
s.expectComplete()
}
"build perferring merge" in {
val output = Sink.publisher[Int]
val m = FlowGraph { implicit b
@ -523,8 +409,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
"support cancel of input" in {
val publisher = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(publisher) ~> merge.input1
Source(List("b", "c", "d")) ~> merge.input2
Source(List("e", "f")) ~> merge.input3
@ -546,9 +433,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectNext("onInput: e")
s.expectNext("onInput: c")
s.expectNext("onInput: f")
s.expectNext("onUpstreamFinish: 2")
completionProbe.expectMsg("onUpstreamFinish: 2")
s.expectNext("onInput: d")
s.expectNext("onUpstreamFinish: 1")
completionProbe.expectMsg("onUpstreamFinish: 1")
autoPublisher.sendNext("x")
@ -559,8 +446,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val publisher1 = PublisherProbe[String]
val publisher2 = PublisherProbe[String]
val publisher3 = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(publisher1) ~> merge.input1
Source(publisher2) ~> merge.input2
Source(publisher3) ~> merge.input3
@ -592,8 +480,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
}
"handle failure" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.input1
Source(List("a", "b")) ~> merge.input2
Source(List("c")) ~> merge.input3
@ -608,16 +497,17 @@ class GraphFlexiMergeSpec extends AkkaSpec {
// IllegalArgumentException is swallowed by the CompletionHandler
s.expectNext("onInput: a")
s.expectNext("onInput: c")
s.expectNext("onUpstreamFinish: 2")
completionProbe.expectMsg("onUpstreamFinish: 2")
s.expectNext("onInput: b")
s.expectNext("onUpstreamFinish: 1")
completionProbe.expectMsg("onUpstreamFinish: 1")
s.expectComplete()
}
"propagate failure" in {
val publisher = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(publisher) ~> merge.input1
Source.failed[String](new IllegalStateException("ERROR") with NoStackTrace) ~> merge.input2
Source.empty[String] ~> merge.input3
@ -631,8 +521,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
}
"emit failure" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(List("a", "err")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3
@ -650,8 +541,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
}
"emit failure for user thrown exception" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(List("a", "exc")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3
@ -669,8 +561,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
}
"emit failure for user thrown exception in onUpstreamFinish" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(List("a", "onUpstreamFinish-exc")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3
@ -689,8 +582,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
"emit failure for user thrown exception in onUpstreamFinish 2" in {
val publisher = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source.empty[String] ~> merge.input1
Source(publisher) ~> merge.input2
Source.empty[String] ~> merge.input3
@ -713,8 +607,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
}
"support finish from onInput" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(List("a", "finish")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3
@ -732,8 +627,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
}
"support unconnected inputs" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b
val merge = new TestMerge
val merge = new TestMerge(completionProbe.ref)
Source(List("a")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2
// input3 not connected
@ -746,10 +642,10 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val sub = s.expectSubscription()
sub.request(10)
s.expectNext("onInput: a")
s.expectNext("onUpstreamFinish: 0")
completionProbe.expectMsg("onUpstreamFinish: 0")
s.expectNext("onInput: b")
s.expectNext("onInput: c")
s.expectNext("onUpstreamFinish: 1")
completionProbe.expectMsg("onUpstreamFinish: 1")
s.expectComplete()
}

View file

@ -10,6 +10,8 @@ import akka.stream.testkit.StreamTestKit.OnNext
import akka.stream.testkit.StreamTestKit.PublisherProbe
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.testkit.TestProbe
object GraphFlexiRouteSpec {
@ -89,7 +91,7 @@ object GraphFlexiRouteSpec {
}
}
class TestRoute extends FlexiRoute[String] {
class TestRoute(completionProbe: ActorRef) extends FlexiRoute[String] {
import FlexiRoute._
val output1 = createOutputPort[String]()
val output2 = createOutputPort[String]()
@ -122,26 +124,17 @@ object GraphFlexiRouteSpec {
onUpstreamFinish = { ctx
if (throwFromOnComplete)
throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
handles.foreach { output
if (ctx.isDemandAvailable(output))
ctx.emit(output, "onUpstreamFinish")
}
completionProbe ! "onUpstreamFinish"
},
onUpstreamFailure = { (ctx, cause)
cause match {
case _: IllegalArgumentException // swallow
case _
handles.foreach { output
if (ctx.isDemandAvailable(output))
ctx.emit(output, "onError")
}
completionProbe ! "onError"
}
},
onDownstreamFinish = { (ctx, cancelledOutput)
handles.foreach { output
if (output != cancelledOutput && ctx.isDemandAvailable(output))
ctx.emit(output, "onDownstreamFinish: " + cancelledOutput.portIndex)
}
completionProbe ! "onDownstreamFinish: " + cancelledOutput.portIndex
SameState
})
}
@ -151,8 +144,9 @@ object GraphFlexiRouteSpec {
val publisher = PublisherProbe[String]
val s1 = SubscriberProbe[String]
val s2 = SubscriberProbe[String]
val completionProbe = TestProbe()
FlowGraph { implicit b
val route = new TestRoute
val route = new TestRoute(completionProbe.ref)
Source(publisher) ~> route.in
route.output1 ~> Sink(s1)
route.output2 ~> Sink(s2)
@ -168,7 +162,6 @@ object GraphFlexiRouteSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class GraphFlexiRouteSpec extends AkkaSpec {
import GraphFlexiRouteSpec._
@ -316,7 +309,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
s1.expectError().getMessage should be("err-1")
autoPublisher.sendComplete()
s2.expectNext("onUpstreamFinish")
completionProbe.expectMsg("onUpstreamFinish")
s2.expectComplete()
}
@ -370,7 +363,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2)
sub1.cancel()
s2.expectNext("onDownstreamFinish: 0")
completionProbe.expectMsg("onDownstreamFinish: 0")
s1.expectNoMsg(200.millis)
autoPublisher.sendNext("c")
@ -393,8 +386,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2)
autoPublisher.sendComplete()
s1.expectNext("onUpstreamFinish")
s2.expectNext("onUpstreamFinish")
completionProbe.expectMsg("onUpstreamFinish")
s1.expectComplete()
s2.expectComplete()
@ -413,8 +405,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2)
autoPublisher.sendError(new RuntimeException("test err") with NoStackTrace)
s1.expectNext("onError")
s2.expectNext("onError")
completionProbe.expectMsg("onError")
s1.expectError().getMessage should be("test err")
s2.expectError().getMessage should be("test err")
@ -433,7 +424,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2)
sub1.cancel()
s2.expectNext("onDownstreamFinish: 0")
completionProbe.expectMsg("onDownstreamFinish: 0")
sub2.cancel()
autoPublisher.subscription.expectCancellation()

View file

@ -41,6 +41,8 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
private var behavior: StateT = _
private var completion: CompletionT = _
// needed to ensure that at most one element is emitted from onInput
private var emitted = false
override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) {
override def onError(input: Int, e: Throwable): Unit = {
@ -56,10 +58,12 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
}
private val ctx: mergeLogic.MergeLogicContext = new mergeLogic.MergeLogicContext {
override def isDemandAvailable: Boolean = primaryOutputs.demandAvailable
override def emit(elem: Any): Unit = {
if (emitted)
throw new IllegalStateException("It is only allowed to `emit` zero or one element in response to `onInput`")
require(primaryOutputs.demandAvailable, "emit not allowed when no demand available")
emitted = true
primaryOutputs.enqueueOutputElement(elem)
}
@ -131,17 +135,17 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
val id = inputBunch.idToDequeue()
val elem = inputBunch.dequeueAndYield(id)
val inputHandle = inputMapping(id)
changeBehavior(behavior.onInput(ctx, inputHandle, elem))
callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle)
case read: ReadPreferred
val id = inputBunch.idToDequeue()
val elem = inputBunch.dequeueAndPrefer(id)
val inputHandle = inputMapping(id)
changeBehavior(behavior.onInput(ctx, inputHandle, elem))
callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle)
case Read(inputHandle)
val elem = inputBunch.dequeue(inputHandle.portIndex)
changeBehavior(behavior.onInput(ctx, inputHandle, elem))
callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle)
case read: ReadAll
val inputHandles = read.inputs
@ -150,7 +154,7 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
case input if include(input.portIndex) input inputBunch.dequeue(input.portIndex)
}
changeBehavior(behavior.onInput(ctx, inputHandles.head, read.mkResult(Map(values: _*))))
callOnInput(inputHandles.head, read.mkResult(Map(values: _*)))
// must be triggered after emitting the accumulated out value
triggerCompletionAfterRead(inputHandles)
@ -158,6 +162,11 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
})
private def callOnInput(input: InputHandle, element: Any): Unit = {
emitted = false
changeBehavior(behavior.onInput(ctx, input, element))
}
private def triggerCompletionAfterRead(inputs: Seq[InputHandle]): Unit = {
var j = 0
while (j < inputs.length) {

View file

@ -42,6 +42,8 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings,
private var behavior: StateT = _
private var completion: CompletionT = _
// needed to ensure that at most one element is emitted from onInput
private val emitted = Array.ofDim[Boolean](outputCount)
override protected val outputBunch = new OutputBunch(outputPorts, self, this) {
override def onCancel(output: Int): Unit =
@ -64,11 +66,14 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings,
}
private val ctx: routeLogic.RouteLogicContext[Any] = new routeLogic.RouteLogicContext[Any] {
override def isDemandAvailable(output: OutputHandle): Boolean =
(output.portIndex < outputCount) && outputBunch.isPending(output.portIndex)
override def emit(output: OutputHandle, elem: Any): Unit = {
require(outputBunch.isPending(output.portIndex), s"emit to [$output] not allowed when no demand available")
require(output.portIndex < outputCount, s"invalid output port index [${output.portIndex}, max index [${outputCount - 1}]")
if (emitted(output.portIndex))
throw new IllegalStateException("It is only allowed to `emit` at most one element to each output in response to `onInput`")
require(outputBunch.isPending(output.portIndex),
s"emit to [$output] not allowed when no demand available")
emitted(output.portIndex) = true
outputBunch.enqueue(output.portIndex, elem)
}
@ -138,18 +143,27 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings,
case any: DemandFromAny
val id = outputBunch.idToEnqueueAndYield()
val outputHandle = outputMapping(id)
changeBehavior(behavior.onInput(ctx, outputHandle, elem))
callOnInput(outputHandle, elem)
case DemandFrom(outputHandle)
changeBehavior(behavior.onInput(ctx, outputHandle, elem))
callOnInput(outputHandle, elem)
case all: DemandFromAll
val id = outputBunch.idToEnqueueAndYield()
val outputHandle = outputMapping(id)
changeBehavior(behavior.onInput(ctx, outputHandle, elem))
callOnInput(outputHandle, elem)
}
})
private def callOnInput(output: OutputHandle, element: Any): Unit = {
var i = 0
while (i < emitted.length) {
emitted(i) = false
i += 1
}
changeBehavior(behavior.onInput(ctx, output, element))
}
}

View file

@ -85,7 +85,7 @@ object FlexiMerge {
* fulfilled when there are elements for *all* of the given upstream
* inputs.
*
* The emited element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge.
* The emitted element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge.
*
* Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`,
* the resulting [[ReadAllInputs]] will then not contain values for this element, which can be
@ -105,25 +105,27 @@ object FlexiMerge {
}
/**
* Context that is passed to the methods of [[State]] and [[CompletionHandling]].
* Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements
* downstream.
*/
trait MergeLogicContext[Out] {
trait MergeLogicContext[Out] extends MergeLogicContextBase[Out] {
/**
* @return `true` if at least one element has been requested by downstream (output).
*/
def isDemandAvailable: Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException`
* Emit one element downstream. It is only allowed to `emit` zero or one
* element in response to `onInput`, otherwise `IllegalStateException`
* is thrown.
*/
def emit(elem: Out): Unit
}
/**
* Context that is passed to the functions of [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait MergeLogicContextBase[Out] {
/**
* Complete this stream succesfully. Upstream subscriptions will be cancelled.
* Complete this stream successfully. Upstream subscriptions will be cancelled.
*/
def finish(): Unit
@ -148,17 +150,20 @@ object FlexiMerge {
*
* The `onUpstreamFinish` method is called when an upstream input was completed sucessfully.
* It returns next behavior or [[MergeLogic#sameState]] to keep current behavior.
* A completion can be propagated downstream with [[MergeLogicContext#finish]],
* A completion can be propagated downstream with [[MergeLogicContextBase#finish]],
* or it can be swallowed to continue with remaining inputs.
*
* The `onUpstreamFailure` method is called when an upstream input was completed with failure.
* It returns next behavior or [[MergeLogic#sameState]] to keep current behavior.
* A failure can be propagated downstream with [[MergeLogicContext#fail]],
* A failure can be propagated downstream with [[MergeLogicContextBase#fail]],
* or it can be swallowed to continue with remaining inputs.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/
abstract class CompletionHandling[Out] {
def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out]
def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out]
def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[_, Out]
def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[_, Out]
}
/**
@ -224,9 +229,9 @@ object FlexiMerge {
*/
def defaultCompletionHandling[A]: CompletionHandling[Out] =
new CompletionHandling[Out] {
override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] =
override def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[A, Out] =
sameState
override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
override def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.fail(cause)
sameState
}
@ -238,11 +243,11 @@ object FlexiMerge {
*/
def eagerClose[A]: CompletionHandling[Out] =
new CompletionHandling[Out] {
override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = {
override def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[A, Out] = {
ctx.finish()
sameState
}
override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
override def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.fail(cause)
sameState
}
@ -283,13 +288,15 @@ object FlexiMerge {
delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling =
CompletionHandling(
onUpstreamFinish = (ctx, inputHandle) {
val widenedCtxt = ctx.asInstanceOf[MergeLogicContext] // we know that it is always a MergeLogicContext
val newDelegateState = delegateCompletionHandling.onUpstreamFinish(
new MergeLogicContextWrapper(ctx), asJava(inputHandle))
new MergeLogicContextWrapper(widenedCtxt), asJava(inputHandle))
wrapState(newDelegateState)
},
onUpstreamFailure = (ctx, inputHandle, cause) {
val widenedCtxt = ctx.asInstanceOf[MergeLogicContext] // we know that it is always a MergeLogicContext
val newDelegateState = delegateCompletionHandling.onUpstreamFailure(
new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause)
new MergeLogicContextWrapper(widenedCtxt), asJava(inputHandle), cause)
wrapState(newDelegateState)
})
@ -297,7 +304,6 @@ object FlexiMerge {
inputHandle.asInstanceOf[InputHandle]
class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] {
override def isDemandAvailable: Boolean = delegate.isDemandAvailable
override def emit(elem: Out): Unit = delegate.emit(elem)
override def finish(): Unit = delegate.finish()
override def fail(cause: Throwable): Unit = delegate.fail(cause)
@ -328,6 +334,8 @@ object FlexiMerge {
*
* The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]]
* that will be used when reading input elements and emitting output elements.
* As response to an input element it is allowed to emit at most one output element.
*
* The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance
* must not hold mutable state, since it may be shared across several materialized ``FlowGraph``
* instances.

View file

@ -75,23 +75,24 @@ object FlexiRoute {
class DemandFromAll(val outputs: JList[OutputHandle]) extends DemandCondition
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements
* downstream.
*/
trait RouteLogicContext[In, Out] {
trait RouteLogicContext[In, Out] extends RouteLogicContextBase[In] {
/**
* @return `true` if at least one element has been requested by the given downstream (output).
*/
def isDemandAvailable(output: OutputHandle): Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true` for the given `output`, otherwise
* `IllegalArgumentException` is thrown.
* Emit one element downstream. It is only allowed to `emit` at most one element to
* each output in response to `onInput`, `IllegalStateException` is thrown.
*/
def emit(output: OutputHandle, elem: Out): Unit
}
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait RouteLogicContextBase[In] {
/**
* Complete the given downstream successfully.
*/
@ -130,19 +131,22 @@ object FlexiRoute {
*
* The `onDownstreamFinish` method is called when a downstream output cancels.
* It returns next behavior or [[#sameState]] to keep current behavior.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/
abstract class CompletionHandling[In] {
def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit
def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit
def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _]
def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit
def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit
def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _]
}
/**
* Definition of which outputs that must have requested elements and how to act
* on the read elements. When an element has been read [[#onInput]] is called and
* then it is ensured that the specified downstream outputs have requested at least
* one element, i.e. it is allowed to emit at least one element downstream with
* [[RouteLogicContext#emit]].
* one element, i.e. it is allowed to emit at most one element to each downstream
* output with [[RouteLogicContext#emit]].
*
* The `onInput` method is called when an `element` was read from upstream.
* The function returns next behavior or [[#sameState]] to keep current behavior.
@ -195,9 +199,9 @@ object FlexiRoute {
*/
def defaultCompletionHandling: CompletionHandling[In] =
new CompletionHandling[In] {
override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = ()
override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = ()
override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] =
override def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit = ()
override def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit = ()
override def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _] =
sameState
}
@ -207,9 +211,9 @@ object FlexiRoute {
*/
def eagerClose[A]: CompletionHandling[In] =
new CompletionHandling[In] {
override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = ()
override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = ()
override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = {
override def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit = ()
override def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit = ()
override def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _] = {
ctx.finish()
sameState
}
@ -250,14 +254,17 @@ object FlexiRoute {
delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling =
CompletionHandling(
onUpstreamFinish = ctx {
delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(ctx))
val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext
delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(widenedCtxt))
},
onUpstreamFailure = (ctx, cause) {
delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(ctx), cause)
val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext
delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(widenedCtxt), cause)
},
onDownstreamFinish = (ctx, outputHandle) {
val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext
val newDelegateState = delegateCompletionHandling.onDownstreamFinish(
new RouteLogicContextWrapper(ctx), asJava(outputHandle))
new RouteLogicContextWrapper(widenedCtxt), asJava(outputHandle))
wrapState(newDelegateState)
})
@ -265,7 +272,6 @@ object FlexiRoute {
outputHandle.asInstanceOf[OutputHandle]
class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] {
override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output)
override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem)
override def finish(): Unit = delegate.finish()
override def finish(output: OutputHandle): Unit = delegate.finish(output)

View file

@ -89,7 +89,7 @@ object FlexiMerge {
* fulfilled when there are elements for *all* of the given upstream
* inputs.
*
* The emited element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge.
* The emitted element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge.
*
* Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`,
* the resulting [[ReadAllInputs]] will then not contain values for this element, which can be
@ -119,23 +119,25 @@ object FlexiMerge {
def initialCompletionHandling: CompletionHandling = defaultCompletionHandling
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements
* downstream.
*/
trait MergeLogicContext {
trait MergeLogicContext extends MergeLogicContextBase {
/**
* @return `true` if at least one element has been requested by downstream (output).
*/
def isDemandAvailable: Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException`
* Emit one element downstream. It is only allowed to `emit` zero or one
* element in response to `onInput`, otherwise `IllegalStateException`
* is thrown.
*/
def emit(elem: Out): Unit
}
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait MergeLogicContextBase {
/**
* Complete this stream successfully. Upstream subscriptions will be cancelled.
*/
@ -161,7 +163,7 @@ object FlexiMerge {
* Definition of which inputs to read from and how to act on the read elements.
* When an element has been read [[#onInput]] is called and then it is ensured
* that downstream has requested at least one element, i.e. it is allowed to
* emit at least one element downstream with [[MergeLogicContext#emit]].
* emit at most one element downstream with [[MergeLogicContext#emit]].
*
* The `onInput` function is called when an `element` was read from the `input`.
* The function returns next behavior or [[#SameState]] to keep current behavior.
@ -188,17 +190,20 @@ object FlexiMerge {
*
* The `onUpstreamFinish` function is called when an upstream input was completed successfully.
* It returns next behavior or [[#SameState]] to keep current behavior.
* A completion can be propagated downstream with [[MergeLogicContext#finish]],
* A completion can be propagated downstream with [[MergeLogicContextBase#finish]],
* or it can be swallowed to continue with remaining inputs.
*
* The `onUpstreamFailure` function is called when an upstream input was completed with failure.
* It returns next behavior or [[#SameState]] to keep current behavior.
* A failure can be propagated downstream with [[MergeLogicContext#fail]],
* A failure can be propagated downstream with [[MergeLogicContextBase#fail]],
* or it can be swallowed to continue with remaining inputs.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/
sealed case class CompletionHandling(
onUpstreamFinish: (MergeLogicContext, InputHandle) State[_],
onUpstreamFailure: (MergeLogicContext, InputHandle, Throwable) State[_])
onUpstreamFinish: (MergeLogicContextBase, InputHandle) State[_],
onUpstreamFailure: (MergeLogicContextBase, InputHandle, Throwable) State[_])
/**
* Will continue to operate until a read becomes unsatisfiable, then it completes.
@ -227,6 +232,8 @@ object FlexiMerge {
*
* The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]]
* that will be used when reading input elements and emitting output elements.
* As response to an input element it is allowed to emit at most one output element.
*
* The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance
* must not hold mutable state, since it may be shared across several materialized ``FlowGraph``
* instances.

View file

@ -85,23 +85,24 @@ object FlexiRoute {
def initialCompletionHandling: CompletionHandling = defaultCompletionHandling
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements
* downstream.
*/
trait RouteLogicContext[Out] {
trait RouteLogicContext[Out] extends RouteLogicContextBase {
/**
* @return `true` if at least one element has been requested by the given downstream (output).
*/
def isDemandAvailable(output: OutputHandle): Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true` for the given `output`, otherwise
* `IllegalArgumentException` is thrown.
* Emit one element downstream. It is only allowed to `emit` at most one element to
* each output in response to `onInput`, `IllegalStateException` is thrown.
*/
def emit(output: OutputHandle, elem: Out): Unit
}
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait RouteLogicContextBase {
/**
* Complete the given downstream successfully.
*/
@ -132,8 +133,8 @@ object FlexiRoute {
* Definition of which outputs that must have requested elements and how to act
* on the read elements. When an element has been read [[#onInput]] is called and
* then it is ensured that the specified downstream outputs have requested at least
* one element, i.e. it is allowed to emit at least one element downstream with
* [[RouteLogicContext#emit]].
* one element, i.e. it is allowed to emit at most one element to each downstream
* output with [[RouteLogicContext#emit]].
*
* The `onInput` function is called when an `element` was read from upstream.
* The function returns next behavior or [[#SameState]] to keep current behavior.
@ -144,7 +145,7 @@ object FlexiRoute {
/**
* Return this from [[State]] `onInput` to use same state for next element.
*/
def SameState[In]: State[In] = sameStateInstance.asInstanceOf[State[In]]
def SameState[T]: State[T] = sameStateInstance.asInstanceOf[State[T]]
private val sameStateInstance = new State[Any](DemandFromAny(Nil))((_, _, _)
throw new UnsupportedOperationException("SameState.onInput should not be called")) {
@ -165,11 +166,14 @@ object FlexiRoute {
*
* The `onDownstreamFinish` function is called when a downstream output cancels.
* It returns next behavior or [[#SameState]] to keep current behavior.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/
sealed case class CompletionHandling(
onUpstreamFinish: RouteLogicContext[Any] Unit,
onUpstreamFailure: (RouteLogicContext[Any], Throwable) Unit,
onDownstreamFinish: (RouteLogicContext[Any], OutputHandle) State[_])
onUpstreamFinish: RouteLogicContextBase Unit,
onUpstreamFailure: (RouteLogicContextBase, Throwable) Unit,
onDownstreamFinish: (RouteLogicContextBase, OutputHandle) State[_])
/**
* When an output cancels it continues with remaining outputs.