!str #16565 Make Flexi* limitations explicit

* remove isDemandAvailable
* hide emit from CompletionHandler context
* throw if more than one emit in response to an input
* had to remove the OrderedMerge test/sample because emitting
  from CompletionHandler is currently not supported
* FlexiRoute and FlexiMerge might become more capable later, see issue 16753
This commit is contained in:
Patrik Nordwall 2015-01-29 15:58:23 +01:00
parent 8095ebb3cc
commit 2740d67c61
14 changed files with 206 additions and 303 deletions

View file

@ -39,7 +39,7 @@ Akka Streams currently provide these junctions:
- ``ZipWith<A,B,...,Out>`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output, - ``ZipWith<A,B,...,Out>`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
- ``Zip<A,B>`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into a ``Pair<A,B>`` stream, - ``Zip<A,B>`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into a ``Pair<A,B>`` stream,
- ``Concat<A>`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters, - ``Concat<A>`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters,
- ``FlexiMerge<Out>`` (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL. - ``FlexiMerge<Out>`` (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating

View file

@ -312,44 +312,4 @@ class FlexiDocSpec extends AkkaSpec {
}.run() }.run()
} }
"flexi route completion handling emitting element upstream completion" in {
class ElementsAndStatus[A] extends FlexiRoute[A] {
import FlexiRoute._
val out = createOutputPort[A]()
override def createRouteLogic() = new RouteLogic[A] {
override def outputHandles(outputCount: Int) = Vector(out)
// format: OFF
//#flexiroute-completion-upstream-completed-signalling
var buffer: List[A]
//#flexiroute-completion-upstream-completed-signalling
= List[A]()
// format: ON
//#flexiroute-completion-upstream-completed-signalling
def drainBuffer(ctx: RouteLogicContext[Any]): Unit =
while (ctx.isDemandAvailable(out) && buffer.nonEmpty) {
ctx.emit(out, buffer.head)
buffer = buffer.tail
}
val signalStatusOnTermination = CompletionHandling(
onUpstreamFinish = ctx => drainBuffer(ctx),
onUpstreamFailure = (ctx, cause) => drainBuffer(ctx),
onDownstreamFinish = (_, _) => SameState)
//#flexiroute-completion-upstream-completed-signalling
override def initialCompletionHandling = signalStatusOnTermination
override def initialState = State[A](DemandFromAny(out)) {
(ctx, output, element) =>
ctx.emit(output, element)
SameState
}
}
}
}
} }

View file

@ -203,6 +203,9 @@ completion or errors to the merges downstream stage.
The state function must always return the next behaviour to be used when an element should be pulled from its upstreams, The state function must always return the next behaviour to be used when an element should be pulled from its upstreams,
we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed. we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed.
.. note::
As response to an input element it is allowed to emit at most one output element.
Implementing Zip-like merges Implementing Zip-like merges
^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
More complex fan-in junctions may require not only multiple States but also sharing state between those states. More complex fan-in junctions may require not only multiple States but also sharing state between those states.
@ -254,6 +257,9 @@ to this stages downstream effectively shutting down the stream.
In case you want to change back to the default completion handling, it is available as ``MergeLogic#defaultCompletionHandling``. In case you want to change back to the default completion handling, it is available as ``MergeLogic#defaultCompletionHandling``.
It is not possible to emit elements from the completion handling, since completion
handlers may be invoked at any time (without regard to downstream demand being available).
Using FlexiRoute Using FlexiRoute
---------------- ----------------
Similarily to using :class:`FlexiMerge`, implementing custom fan-out stages requires extending the :class:`FlexiRoute` class Similarily to using :class:`FlexiMerge`, implementing custom fan-out stages requires extending the :class:`FlexiRoute` class
@ -284,12 +290,15 @@ of the tuple to the ``right`` stream. Notice that since we are emitting values o
the type parameter of this ``State[_]`` must be set to ``Any``. This type can be utilised more efficiently when a junction the type parameter of this ``State[_]`` must be set to ``Any``. This type can be utilised more efficiently when a junction
is emitting the same type of element to its downstreams e.g. in all *strictly routing* stages. is emitting the same type of element to its downstreams e.g. in all *strictly routing* stages.
The state function must always return the next behaviour to be used when an element should be emited, The state function must always return the next behaviour to be used when an element should be emitted,
we use the special :class:`SameState` object which signals :class:`FlexiRoute` that no state transition is needed. we use the special :class:`SameState` object which signals :class:`FlexiRoute` that no state transition is needed.
.. warning:: .. warning::
While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance
*must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances. *must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances.
.. note::
It is only allowed to `emit` at most one element to each output in response to `onInput`, `IllegalStateException` is thrown.
Completion handling Completion handling
^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^
@ -313,14 +322,6 @@ Notice that State changes are only allowed in reaction to downstream cancellatio
cases. This is because since there is only one upstream, there is nothing else to do than possibly flush buffered elements cases. This is because since there is only one upstream, there is nothing else to do than possibly flush buffered elements
and continue with shutting down the entire stream. and continue with shutting down the entire stream.
Sometimes you may want to emit buffered or additional elements from the completion handler when the stream is shutting down. It is not possible to emit elements from the completion handling, since completion
However calling ``ctx.emit`` is only legal when the stream we emit to *has demand available*. In normal operation, handlers may be invoked at any time (without regard to downstream demand being available).
this is guaranteed by properly using demand conditions, however as completion handlers may be invokead at any time (without
regard to downstream demand being available) we must explicitly check that the downstream has demand available before signalling it.
The completion strategy below assumes that we have implemented some kind of :class:`FlexiRoute` which buffers elements,
yet when its upstream completes it should drain as much as possible to its downstream ``out`` output port. We use the
``ctx.isDemandAvailable(outputHandle)`` method to make sure calling emit with the buffered elements is valid and
complete this flushing once all demand (or the buffer) is drained:
.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion-upstream-completed-signalling

View file

@ -39,7 +39,7 @@ Akka Streams currently provide these junctions:
- ``ZipWith[A,B,...,Out]`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output, - ``ZipWith[A,B,...,Out]`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
- ``Zip[A,B]`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream, - ``Zip[A,B]`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream,
- ``Concat[A]`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters, - ``Concat[A]`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters,
- ``FlexiMerge[Out]`` (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL. - ``FlexiMerge[Out]`` (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating

View file

@ -162,14 +162,21 @@ private[http] object HttpServer {
case (ctx, _, error) { ctx.fail(error); SameState } case (ctx, _, error) { ctx.fail(error); SameState }
}) })
def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = { def finishWithError(ctx: MergeLogicContextBase, target: String, status: StatusCode, info: ErrorInfo): State[Any] = {
log.warning("Illegal {}, responding with status '{}': {}", target, status, info.formatPretty) log.warning("Illegal {}, responding with status '{}': {}", target, status, info.formatPretty)
val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary
ctx.emit(ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true)) // FIXME this is a workaround that is supposed to be solved by issue #16753
ctx match {
case fullCtx: MergeLogicContext
// note that this will throw IllegalArgumentException if no demand available
fullCtx.emit(ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true))
case other throw new IllegalStateException(s"Unexpected MergeLogicContext [${other.getClass.getName}]")
}
//
finish(ctx) finish(ctx)
} }
def finish(ctx: MergeLogicContext): State[Any] = { def finish(ctx: MergeLogicContextBase): State[Any] = {
ctx.finish() // shouldn't this return a `State` rather than `Unit`? ctx.finish() // shouldn't this return a `State` rather than `Unit`?
SameState // it seems weird to stay in the same state after completion SameState // it seems weird to stay in the same state after completion
} }

View file

@ -181,13 +181,13 @@ public class FlexiMergeTest {
private final CompletionHandling<T> emitOtherOnClose = new CompletionHandling<T>() { private final CompletionHandling<T> emitOtherOnClose = new CompletionHandling<T>() {
@Override @Override
public State<T, T> onUpstreamFinish(MergeLogicContext<T> ctx, InputHandle input) { public State<T, T> onUpstreamFinish(MergeLogicContextBase<T> ctx, InputHandle input) {
ctx.changeCompletionHandling(defaultCompletionHandling()); ctx.changeCompletionHandling(defaultCompletionHandling());
return readRemaining(other(input)); return readRemaining(other(input));
} }
@Override @Override
public State<T, T> onUpstreamFailure(MergeLogicContext<T> ctx, InputHandle inputHandle, Throwable cause) { public State<T, T> onUpstreamFailure(MergeLogicContextBase<T> ctx, InputHandle inputHandle, Throwable cause) {
ctx.fail(cause); ctx.fail(cause);
return sameState(); return sameState();
} }

View file

@ -7,8 +7,9 @@ import akka.stream.testkit.StreamTestKit.AutoPublisher
import akka.stream.testkit.StreamTestKit.OnNext import akka.stream.testkit.StreamTestKit.OnNext
import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.PublisherProbe
import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.actor.ActorRef
import akka.testkit.TestProbe
object GraphFlexiMergeSpec { object GraphFlexiMergeSpec {
@ -138,72 +139,6 @@ class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends
} }
} }
class OrderedMerge extends FlexiMerge[Int] {
import FlexiMerge._
val input1 = createInputPort[Int]()
val input2 = createInputPort[Int]()
def createMergeLogic = new MergeLogic[Int] {
private var reference = 0
override def inputHandles(inputCount: Int) = Vector(input1, input2)
val emitOtherOnClose = CompletionHandling(
onUpstreamFinish = { (ctx, input)
ctx.changeCompletionHandling(emitLast)
readRemaining(other(input))
},
onUpstreamFailure = { (ctx, input, cause)
ctx.fail(cause)
SameState
})
def other(input: InputHandle): InputHandle = if (input eq input1) input2 else input1
def getFirstElement = State[Int](ReadAny(input1, input2)) { (ctx, input, element)
reference = element
ctx.changeCompletionHandling(emitOtherOnClose)
readUntilLarger(other(input))
}
def readUntilLarger(input: InputHandle): State[Int] = State[Int](Read(input)) {
(ctx, input, element)
if (element <= reference) {
ctx.emit(element)
SameState
} else {
ctx.emit(reference)
reference = element
readUntilLarger(other(input))
}
}
def readRemaining(input: InputHandle) = State[Int](Read(input)) {
(ctx, input, element)
if (element <= reference)
ctx.emit(element)
else {
ctx.emit(reference)
reference = element
}
SameState
}
val emitLast = CompletionHandling(
onUpstreamFinish = { (ctx, input)
if (ctx.isDemandAvailable)
ctx.emit(reference)
SameState
},
onUpstreamFailure = { (ctx, input, cause)
ctx.fail(cause)
SameState
})
override def initialState = getFirstElement
}
}
class PreferringMerge extends FlexiMerge[Int] { class PreferringMerge extends FlexiMerge[Int] {
import FlexiMerge._ import FlexiMerge._
val preferred = createInputPort[Int]() val preferred = createInputPort[Int]()
@ -221,7 +156,7 @@ class PreferringMerge extends FlexiMerge[Int] {
} }
} }
class TestMerge extends FlexiMerge[String] { class TestMerge(completionProbe: ActorRef) extends FlexiMerge[String] {
import FlexiMerge._ import FlexiMerge._
val input1 = createInputPort[String]() val input1 = createInputPort[String]()
val input2 = createInputPort[String]() val input2 = createInputPort[String]()
@ -254,8 +189,7 @@ class TestMerge extends FlexiMerge[String] {
onUpstreamFinish = { (ctx, input) onUpstreamFinish = { (ctx, input)
if (throwFromOnComplete) if (throwFromOnComplete)
throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
if (ctx.isDemandAvailable) completionProbe ! "onUpstreamFinish: " + input.portIndex
ctx.emit("onUpstreamFinish: " + input.portIndex)
SameState SameState
}, },
onUpstreamFailure = { (ctx, input, cause) onUpstreamFailure = { (ctx, input, cause)
@ -385,54 +319,6 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectComplete() s.expectComplete()
} }
"build simple ordered merge 1" in {
val output = Sink.publisher[Int]
val m = FlowGraph { implicit b
val merge = new OrderedMerge
Source(List(3, 5, 6, 7, 8)) ~> merge.input1
Source(List(1, 2, 4, 9)) ~> merge.input2
merge.out ~> output
}.run()
val s = SubscriberProbe[Int]
val p = m.get(output)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(100)
for (n 1 to 9) {
s.expectNext(n)
}
s.expectComplete()
}
"build simple ordered merge 2" in {
val output = Sink.publisher[Int]
val m = FlowGraph { implicit b
val merge = new OrderedMerge
Source(List(3, 5, 6, 7, 8)) ~> merge.input1
Source(List(3, 5, 6, 7, 8, 10)) ~> merge.input2
merge.out ~> output
}.run()
val s = SubscriberProbe[Int]
val p = m.get(output)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(100)
s.expectNext(3)
s.expectNext(3)
s.expectNext(5)
s.expectNext(5)
s.expectNext(6)
s.expectNext(6)
s.expectNext(7)
s.expectNext(7)
s.expectNext(8)
s.expectNext(8)
s.expectNext(10)
s.expectComplete()
}
"build perferring merge" in { "build perferring merge" in {
val output = Sink.publisher[Int] val output = Sink.publisher[Int]
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
@ -523,8 +409,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
"support cancel of input" in { "support cancel of input" in {
val publisher = PublisherProbe[String] val publisher = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(publisher) ~> merge.input1 Source(publisher) ~> merge.input1
Source(List("b", "c", "d")) ~> merge.input2 Source(List("b", "c", "d")) ~> merge.input2
Source(List("e", "f")) ~> merge.input3 Source(List("e", "f")) ~> merge.input3
@ -546,9 +433,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectNext("onInput: e") s.expectNext("onInput: e")
s.expectNext("onInput: c") s.expectNext("onInput: c")
s.expectNext("onInput: f") s.expectNext("onInput: f")
s.expectNext("onUpstreamFinish: 2") completionProbe.expectMsg("onUpstreamFinish: 2")
s.expectNext("onInput: d") s.expectNext("onInput: d")
s.expectNext("onUpstreamFinish: 1") completionProbe.expectMsg("onUpstreamFinish: 1")
autoPublisher.sendNext("x") autoPublisher.sendNext("x")
@ -559,8 +446,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val publisher1 = PublisherProbe[String] val publisher1 = PublisherProbe[String]
val publisher2 = PublisherProbe[String] val publisher2 = PublisherProbe[String]
val publisher3 = PublisherProbe[String] val publisher3 = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(publisher1) ~> merge.input1 Source(publisher1) ~> merge.input1
Source(publisher2) ~> merge.input2 Source(publisher2) ~> merge.input2
Source(publisher3) ~> merge.input3 Source(publisher3) ~> merge.input3
@ -592,8 +480,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
} }
"handle failure" in { "handle failure" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.input1 Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.input1
Source(List("a", "b")) ~> merge.input2 Source(List("a", "b")) ~> merge.input2
Source(List("c")) ~> merge.input3 Source(List("c")) ~> merge.input3
@ -608,16 +497,17 @@ class GraphFlexiMergeSpec extends AkkaSpec {
// IllegalArgumentException is swallowed by the CompletionHandler // IllegalArgumentException is swallowed by the CompletionHandler
s.expectNext("onInput: a") s.expectNext("onInput: a")
s.expectNext("onInput: c") s.expectNext("onInput: c")
s.expectNext("onUpstreamFinish: 2") completionProbe.expectMsg("onUpstreamFinish: 2")
s.expectNext("onInput: b") s.expectNext("onInput: b")
s.expectNext("onUpstreamFinish: 1") completionProbe.expectMsg("onUpstreamFinish: 1")
s.expectComplete() s.expectComplete()
} }
"propagate failure" in { "propagate failure" in {
val publisher = PublisherProbe[String] val publisher = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(publisher) ~> merge.input1 Source(publisher) ~> merge.input1
Source.failed[String](new IllegalStateException("ERROR") with NoStackTrace) ~> merge.input2 Source.failed[String](new IllegalStateException("ERROR") with NoStackTrace) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
@ -631,8 +521,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
} }
"emit failure" in { "emit failure" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(List("a", "err")) ~> merge.input1 Source(List("a", "err")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2 Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
@ -650,8 +541,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
} }
"emit failure for user thrown exception" in { "emit failure for user thrown exception" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(List("a", "exc")) ~> merge.input1 Source(List("a", "exc")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2 Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
@ -669,8 +561,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
} }
"emit failure for user thrown exception in onUpstreamFinish" in { "emit failure for user thrown exception in onUpstreamFinish" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(List("a", "onUpstreamFinish-exc")) ~> merge.input1 Source(List("a", "onUpstreamFinish-exc")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2 Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
@ -689,8 +582,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
"emit failure for user thrown exception in onUpstreamFinish 2" in { "emit failure for user thrown exception in onUpstreamFinish 2" in {
val publisher = PublisherProbe[String] val publisher = PublisherProbe[String]
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source.empty[String] ~> merge.input1 Source.empty[String] ~> merge.input1
Source(publisher) ~> merge.input2 Source(publisher) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
@ -713,8 +607,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
} }
"support finish from onInput" in { "support finish from onInput" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(List("a", "finish")) ~> merge.input1 Source(List("a", "finish")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2 Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
@ -732,8 +627,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
} }
"support unconnected inputs" in { "support unconnected inputs" in {
val completionProbe = TestProbe()
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge(completionProbe.ref)
Source(List("a")) ~> merge.input1 Source(List("a")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2 Source(List("b", "c")) ~> merge.input2
// input3 not connected // input3 not connected
@ -746,10 +642,10 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(10) sub.request(10)
s.expectNext("onInput: a") s.expectNext("onInput: a")
s.expectNext("onUpstreamFinish: 0") completionProbe.expectMsg("onUpstreamFinish: 0")
s.expectNext("onInput: b") s.expectNext("onInput: b")
s.expectNext("onInput: c") s.expectNext("onInput: c")
s.expectNext("onUpstreamFinish: 1") completionProbe.expectMsg("onUpstreamFinish: 1")
s.expectComplete() s.expectComplete()
} }

View file

@ -10,6 +10,8 @@ import akka.stream.testkit.StreamTestKit.OnNext
import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.PublisherProbe
import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.testkit.TestProbe
object GraphFlexiRouteSpec { object GraphFlexiRouteSpec {
@ -89,7 +91,7 @@ object GraphFlexiRouteSpec {
} }
} }
class TestRoute extends FlexiRoute[String] { class TestRoute(completionProbe: ActorRef) extends FlexiRoute[String] {
import FlexiRoute._ import FlexiRoute._
val output1 = createOutputPort[String]() val output1 = createOutputPort[String]()
val output2 = createOutputPort[String]() val output2 = createOutputPort[String]()
@ -122,26 +124,17 @@ object GraphFlexiRouteSpec {
onUpstreamFinish = { ctx onUpstreamFinish = { ctx
if (throwFromOnComplete) if (throwFromOnComplete)
throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
handles.foreach { output completionProbe ! "onUpstreamFinish"
if (ctx.isDemandAvailable(output))
ctx.emit(output, "onUpstreamFinish")
}
}, },
onUpstreamFailure = { (ctx, cause) onUpstreamFailure = { (ctx, cause)
cause match { cause match {
case _: IllegalArgumentException // swallow case _: IllegalArgumentException // swallow
case _ case _
handles.foreach { output completionProbe ! "onError"
if (ctx.isDemandAvailable(output))
ctx.emit(output, "onError")
}
} }
}, },
onDownstreamFinish = { (ctx, cancelledOutput) onDownstreamFinish = { (ctx, cancelledOutput)
handles.foreach { output completionProbe ! "onDownstreamFinish: " + cancelledOutput.portIndex
if (output != cancelledOutput && ctx.isDemandAvailable(output))
ctx.emit(output, "onDownstreamFinish: " + cancelledOutput.portIndex)
}
SameState SameState
}) })
} }
@ -151,8 +144,9 @@ object GraphFlexiRouteSpec {
val publisher = PublisherProbe[String] val publisher = PublisherProbe[String]
val s1 = SubscriberProbe[String] val s1 = SubscriberProbe[String]
val s2 = SubscriberProbe[String] val s2 = SubscriberProbe[String]
val completionProbe = TestProbe()
FlowGraph { implicit b FlowGraph { implicit b
val route = new TestRoute val route = new TestRoute(completionProbe.ref)
Source(publisher) ~> route.in Source(publisher) ~> route.in
route.output1 ~> Sink(s1) route.output1 ~> Sink(s1)
route.output2 ~> Sink(s2) route.output2 ~> Sink(s2)
@ -168,7 +162,6 @@ object GraphFlexiRouteSpec {
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class GraphFlexiRouteSpec extends AkkaSpec { class GraphFlexiRouteSpec extends AkkaSpec {
import GraphFlexiRouteSpec._ import GraphFlexiRouteSpec._
@ -316,7 +309,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
s1.expectError().getMessage should be("err-1") s1.expectError().getMessage should be("err-1")
autoPublisher.sendComplete() autoPublisher.sendComplete()
s2.expectNext("onUpstreamFinish") completionProbe.expectMsg("onUpstreamFinish")
s2.expectComplete() s2.expectComplete()
} }
@ -370,7 +363,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2) sub2.request(2)
sub1.cancel() sub1.cancel()
s2.expectNext("onDownstreamFinish: 0") completionProbe.expectMsg("onDownstreamFinish: 0")
s1.expectNoMsg(200.millis) s1.expectNoMsg(200.millis)
autoPublisher.sendNext("c") autoPublisher.sendNext("c")
@ -393,8 +386,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2) sub2.request(2)
autoPublisher.sendComplete() autoPublisher.sendComplete()
s1.expectNext("onUpstreamFinish") completionProbe.expectMsg("onUpstreamFinish")
s2.expectNext("onUpstreamFinish")
s1.expectComplete() s1.expectComplete()
s2.expectComplete() s2.expectComplete()
@ -413,8 +405,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2) sub2.request(2)
autoPublisher.sendError(new RuntimeException("test err") with NoStackTrace) autoPublisher.sendError(new RuntimeException("test err") with NoStackTrace)
s1.expectNext("onError") completionProbe.expectMsg("onError")
s2.expectNext("onError")
s1.expectError().getMessage should be("test err") s1.expectError().getMessage should be("test err")
s2.expectError().getMessage should be("test err") s2.expectError().getMessage should be("test err")
@ -433,7 +424,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2) sub2.request(2)
sub1.cancel() sub1.cancel()
s2.expectNext("onDownstreamFinish: 0") completionProbe.expectMsg("onDownstreamFinish: 0")
sub2.cancel() sub2.cancel()
autoPublisher.subscription.expectCancellation() autoPublisher.subscription.expectCancellation()

View file

@ -41,6 +41,8 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
private var behavior: StateT = _ private var behavior: StateT = _
private var completion: CompletionT = _ private var completion: CompletionT = _
// needed to ensure that at most one element is emitted from onInput
private var emitted = false
override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) { override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) {
override def onError(input: Int, e: Throwable): Unit = { override def onError(input: Int, e: Throwable): Unit = {
@ -56,10 +58,12 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
} }
private val ctx: mergeLogic.MergeLogicContext = new mergeLogic.MergeLogicContext { private val ctx: mergeLogic.MergeLogicContext = new mergeLogic.MergeLogicContext {
override def isDemandAvailable: Boolean = primaryOutputs.demandAvailable
override def emit(elem: Any): Unit = { override def emit(elem: Any): Unit = {
if (emitted)
throw new IllegalStateException("It is only allowed to `emit` zero or one element in response to `onInput`")
require(primaryOutputs.demandAvailable, "emit not allowed when no demand available") require(primaryOutputs.demandAvailable, "emit not allowed when no demand available")
emitted = true
primaryOutputs.enqueueOutputElement(elem) primaryOutputs.enqueueOutputElement(elem)
} }
@ -131,17 +135,17 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
val id = inputBunch.idToDequeue() val id = inputBunch.idToDequeue()
val elem = inputBunch.dequeueAndYield(id) val elem = inputBunch.dequeueAndYield(id)
val inputHandle = inputMapping(id) val inputHandle = inputMapping(id)
changeBehavior(behavior.onInput(ctx, inputHandle, elem)) callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle) triggerCompletionAfterRead(inputHandle)
case read: ReadPreferred case read: ReadPreferred
val id = inputBunch.idToDequeue() val id = inputBunch.idToDequeue()
val elem = inputBunch.dequeueAndPrefer(id) val elem = inputBunch.dequeueAndPrefer(id)
val inputHandle = inputMapping(id) val inputHandle = inputMapping(id)
changeBehavior(behavior.onInput(ctx, inputHandle, elem)) callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle) triggerCompletionAfterRead(inputHandle)
case Read(inputHandle) case Read(inputHandle)
val elem = inputBunch.dequeue(inputHandle.portIndex) val elem = inputBunch.dequeue(inputHandle.portIndex)
changeBehavior(behavior.onInput(ctx, inputHandle, elem)) callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle) triggerCompletionAfterRead(inputHandle)
case read: ReadAll case read: ReadAll
val inputHandles = read.inputs val inputHandles = read.inputs
@ -150,7 +154,7 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
case input if include(input.portIndex) input inputBunch.dequeue(input.portIndex) case input if include(input.portIndex) input inputBunch.dequeue(input.portIndex)
} }
changeBehavior(behavior.onInput(ctx, inputHandles.head, read.mkResult(Map(values: _*)))) callOnInput(inputHandles.head, read.mkResult(Map(values: _*)))
// must be triggered after emitting the accumulated out value // must be triggered after emitting the accumulated out value
triggerCompletionAfterRead(inputHandles) triggerCompletionAfterRead(inputHandles)
@ -158,6 +162,11 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings,
}) })
private def callOnInput(input: InputHandle, element: Any): Unit = {
emitted = false
changeBehavior(behavior.onInput(ctx, input, element))
}
private def triggerCompletionAfterRead(inputs: Seq[InputHandle]): Unit = { private def triggerCompletionAfterRead(inputs: Seq[InputHandle]): Unit = {
var j = 0 var j = 0
while (j < inputs.length) { while (j < inputs.length) {

View file

@ -42,6 +42,8 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings,
private var behavior: StateT = _ private var behavior: StateT = _
private var completion: CompletionT = _ private var completion: CompletionT = _
// needed to ensure that at most one element is emitted from onInput
private val emitted = Array.ofDim[Boolean](outputCount)
override protected val outputBunch = new OutputBunch(outputPorts, self, this) { override protected val outputBunch = new OutputBunch(outputPorts, self, this) {
override def onCancel(output: Int): Unit = override def onCancel(output: Int): Unit =
@ -64,11 +66,14 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings,
} }
private val ctx: routeLogic.RouteLogicContext[Any] = new routeLogic.RouteLogicContext[Any] { private val ctx: routeLogic.RouteLogicContext[Any] = new routeLogic.RouteLogicContext[Any] {
override def isDemandAvailable(output: OutputHandle): Boolean =
(output.portIndex < outputCount) && outputBunch.isPending(output.portIndex)
override def emit(output: OutputHandle, elem: Any): Unit = { override def emit(output: OutputHandle, elem: Any): Unit = {
require(outputBunch.isPending(output.portIndex), s"emit to [$output] not allowed when no demand available") require(output.portIndex < outputCount, s"invalid output port index [${output.portIndex}, max index [${outputCount - 1}]")
if (emitted(output.portIndex))
throw new IllegalStateException("It is only allowed to `emit` at most one element to each output in response to `onInput`")
require(outputBunch.isPending(output.portIndex),
s"emit to [$output] not allowed when no demand available")
emitted(output.portIndex) = true
outputBunch.enqueue(output.portIndex, elem) outputBunch.enqueue(output.portIndex, elem)
} }
@ -138,18 +143,27 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings,
case any: DemandFromAny case any: DemandFromAny
val id = outputBunch.idToEnqueueAndYield() val id = outputBunch.idToEnqueueAndYield()
val outputHandle = outputMapping(id) val outputHandle = outputMapping(id)
changeBehavior(behavior.onInput(ctx, outputHandle, elem)) callOnInput(outputHandle, elem)
case DemandFrom(outputHandle) case DemandFrom(outputHandle)
changeBehavior(behavior.onInput(ctx, outputHandle, elem)) callOnInput(outputHandle, elem)
case all: DemandFromAll case all: DemandFromAll
val id = outputBunch.idToEnqueueAndYield() val id = outputBunch.idToEnqueueAndYield()
val outputHandle = outputMapping(id) val outputHandle = outputMapping(id)
changeBehavior(behavior.onInput(ctx, outputHandle, elem)) callOnInput(outputHandle, elem)
} }
}) })
private def callOnInput(output: OutputHandle, element: Any): Unit = {
var i = 0
while (i < emitted.length) {
emitted(i) = false
i += 1
}
changeBehavior(behavior.onInput(ctx, output, element))
}
} }

View file

@ -85,7 +85,7 @@ object FlexiMerge {
* fulfilled when there are elements for *all* of the given upstream * fulfilled when there are elements for *all* of the given upstream
* inputs. * inputs.
* *
* The emited element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge. * The emitted element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge.
* *
* Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`, * Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`,
* the resulting [[ReadAllInputs]] will then not contain values for this element, which can be * the resulting [[ReadAllInputs]] will then not contain values for this element, which can be
@ -105,25 +105,27 @@ object FlexiMerge {
} }
/** /**
* Context that is passed to the methods of [[State]] and [[CompletionHandling]]. * Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements * The context provides means for performing side effects, such as emitting elements
* downstream. * downstream.
*/ */
trait MergeLogicContext[Out] { trait MergeLogicContext[Out] extends MergeLogicContextBase[Out] {
/** /**
* @return `true` if at least one element has been requested by downstream (output). * Emit one element downstream. It is only allowed to `emit` zero or one
*/ * element in response to `onInput`, otherwise `IllegalStateException`
def isDemandAvailable: Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException`
* is thrown. * is thrown.
*/ */
def emit(elem: Out): Unit def emit(elem: Out): Unit
}
/**
* Context that is passed to the functions of [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait MergeLogicContextBase[Out] {
/** /**
* Complete this stream succesfully. Upstream subscriptions will be cancelled. * Complete this stream successfully. Upstream subscriptions will be cancelled.
*/ */
def finish(): Unit def finish(): Unit
@ -148,17 +150,20 @@ object FlexiMerge {
* *
* The `onUpstreamFinish` method is called when an upstream input was completed sucessfully. * The `onUpstreamFinish` method is called when an upstream input was completed sucessfully.
* It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior.
* A completion can be propagated downstream with [[MergeLogicContext#finish]], * A completion can be propagated downstream with [[MergeLogicContextBase#finish]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
* *
* The `onUpstreamFailure` method is called when an upstream input was completed with failure. * The `onUpstreamFailure` method is called when an upstream input was completed with failure.
* It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior.
* A failure can be propagated downstream with [[MergeLogicContext#fail]], * A failure can be propagated downstream with [[MergeLogicContextBase#fail]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/ */
abstract class CompletionHandling[Out] { abstract class CompletionHandling[Out] {
def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out] def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[_, Out]
def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out] def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[_, Out]
} }
/** /**
@ -224,9 +229,9 @@ object FlexiMerge {
*/ */
def defaultCompletionHandling[A]: CompletionHandling[Out] = def defaultCompletionHandling[A]: CompletionHandling[Out] =
new CompletionHandling[Out] { new CompletionHandling[Out] {
override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = override def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[A, Out] =
sameState sameState
override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { override def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.fail(cause) ctx.fail(cause)
sameState sameState
} }
@ -238,11 +243,11 @@ object FlexiMerge {
*/ */
def eagerClose[A]: CompletionHandling[Out] = def eagerClose[A]: CompletionHandling[Out] =
new CompletionHandling[Out] { new CompletionHandling[Out] {
override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = { override def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[A, Out] = {
ctx.finish() ctx.finish()
sameState sameState
} }
override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { override def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.fail(cause) ctx.fail(cause)
sameState sameState
} }
@ -283,13 +288,15 @@ object FlexiMerge {
delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling = delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling =
CompletionHandling( CompletionHandling(
onUpstreamFinish = (ctx, inputHandle) { onUpstreamFinish = (ctx, inputHandle) {
val widenedCtxt = ctx.asInstanceOf[MergeLogicContext] // we know that it is always a MergeLogicContext
val newDelegateState = delegateCompletionHandling.onUpstreamFinish( val newDelegateState = delegateCompletionHandling.onUpstreamFinish(
new MergeLogicContextWrapper(ctx), asJava(inputHandle)) new MergeLogicContextWrapper(widenedCtxt), asJava(inputHandle))
wrapState(newDelegateState) wrapState(newDelegateState)
}, },
onUpstreamFailure = (ctx, inputHandle, cause) { onUpstreamFailure = (ctx, inputHandle, cause) {
val widenedCtxt = ctx.asInstanceOf[MergeLogicContext] // we know that it is always a MergeLogicContext
val newDelegateState = delegateCompletionHandling.onUpstreamFailure( val newDelegateState = delegateCompletionHandling.onUpstreamFailure(
new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause) new MergeLogicContextWrapper(widenedCtxt), asJava(inputHandle), cause)
wrapState(newDelegateState) wrapState(newDelegateState)
}) })
@ -297,7 +304,6 @@ object FlexiMerge {
inputHandle.asInstanceOf[InputHandle] inputHandle.asInstanceOf[InputHandle]
class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] { class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] {
override def isDemandAvailable: Boolean = delegate.isDemandAvailable
override def emit(elem: Out): Unit = delegate.emit(elem) override def emit(elem: Out): Unit = delegate.emit(elem)
override def finish(): Unit = delegate.finish() override def finish(): Unit = delegate.finish()
override def fail(cause: Throwable): Unit = delegate.fail(cause) override def fail(cause: Throwable): Unit = delegate.fail(cause)
@ -328,6 +334,8 @@ object FlexiMerge {
* *
* The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]] * The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]]
* that will be used when reading input elements and emitting output elements. * that will be used when reading input elements and emitting output elements.
* As response to an input element it is allowed to emit at most one output element.
*
* The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance * The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance
* must not hold mutable state, since it may be shared across several materialized ``FlowGraph`` * must not hold mutable state, since it may be shared across several materialized ``FlowGraph``
* instances. * instances.

View file

@ -75,23 +75,24 @@ object FlexiRoute {
class DemandFromAll(val outputs: JList[OutputHandle]) extends DemandCondition class DemandFromAll(val outputs: JList[OutputHandle]) extends DemandCondition
/** /**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]]. * Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements * The context provides means for performing side effects, such as emitting elements
* downstream. * downstream.
*/ */
trait RouteLogicContext[In, Out] { trait RouteLogicContext[In, Out] extends RouteLogicContextBase[In] {
/** /**
* @return `true` if at least one element has been requested by the given downstream (output). * Emit one element downstream. It is only allowed to `emit` at most one element to
*/ * each output in response to `onInput`, `IllegalStateException` is thrown.
def isDemandAvailable(output: OutputHandle): Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true` for the given `output`, otherwise
* `IllegalArgumentException` is thrown.
*/ */
def emit(output: OutputHandle, elem: Out): Unit def emit(output: OutputHandle, elem: Out): Unit
}
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait RouteLogicContextBase[In] {
/** /**
* Complete the given downstream successfully. * Complete the given downstream successfully.
*/ */
@ -130,19 +131,22 @@ object FlexiRoute {
* *
* The `onDownstreamFinish` method is called when a downstream output cancels. * The `onDownstreamFinish` method is called when a downstream output cancels.
* It returns next behavior or [[#sameState]] to keep current behavior. * It returns next behavior or [[#sameState]] to keep current behavior.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/ */
abstract class CompletionHandling[In] { abstract class CompletionHandling[In] {
def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit
def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit
def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _]
} }
/** /**
* Definition of which outputs that must have requested elements and how to act * Definition of which outputs that must have requested elements and how to act
* on the read elements. When an element has been read [[#onInput]] is called and * on the read elements. When an element has been read [[#onInput]] is called and
* then it is ensured that the specified downstream outputs have requested at least * then it is ensured that the specified downstream outputs have requested at least
* one element, i.e. it is allowed to emit at least one element downstream with * one element, i.e. it is allowed to emit at most one element to each downstream
* [[RouteLogicContext#emit]]. * output with [[RouteLogicContext#emit]].
* *
* The `onInput` method is called when an `element` was read from upstream. * The `onInput` method is called when an `element` was read from upstream.
* The function returns next behavior or [[#sameState]] to keep current behavior. * The function returns next behavior or [[#sameState]] to keep current behavior.
@ -195,9 +199,9 @@ object FlexiRoute {
*/ */
def defaultCompletionHandling: CompletionHandling[In] = def defaultCompletionHandling: CompletionHandling[In] =
new CompletionHandling[In] { new CompletionHandling[In] {
override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = () override def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit = ()
override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () override def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit = ()
override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = override def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _] =
sameState sameState
} }
@ -207,9 +211,9 @@ object FlexiRoute {
*/ */
def eagerClose[A]: CompletionHandling[In] = def eagerClose[A]: CompletionHandling[In] =
new CompletionHandling[In] { new CompletionHandling[In] {
override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = () override def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit = ()
override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () override def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit = ()
override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = { override def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _] = {
ctx.finish() ctx.finish()
sameState sameState
} }
@ -250,14 +254,17 @@ object FlexiRoute {
delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling = delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling =
CompletionHandling( CompletionHandling(
onUpstreamFinish = ctx { onUpstreamFinish = ctx {
delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(ctx)) val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext
delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(widenedCtxt))
}, },
onUpstreamFailure = (ctx, cause) { onUpstreamFailure = (ctx, cause) {
delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(ctx), cause) val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext
delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(widenedCtxt), cause)
}, },
onDownstreamFinish = (ctx, outputHandle) { onDownstreamFinish = (ctx, outputHandle) {
val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext
val newDelegateState = delegateCompletionHandling.onDownstreamFinish( val newDelegateState = delegateCompletionHandling.onDownstreamFinish(
new RouteLogicContextWrapper(ctx), asJava(outputHandle)) new RouteLogicContextWrapper(widenedCtxt), asJava(outputHandle))
wrapState(newDelegateState) wrapState(newDelegateState)
}) })
@ -265,7 +272,6 @@ object FlexiRoute {
outputHandle.asInstanceOf[OutputHandle] outputHandle.asInstanceOf[OutputHandle]
class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] { class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] {
override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output)
override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem) override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem)
override def finish(): Unit = delegate.finish() override def finish(): Unit = delegate.finish()
override def finish(output: OutputHandle): Unit = delegate.finish(output) override def finish(output: OutputHandle): Unit = delegate.finish(output)

View file

@ -89,7 +89,7 @@ object FlexiMerge {
* fulfilled when there are elements for *all* of the given upstream * fulfilled when there are elements for *all* of the given upstream
* inputs. * inputs.
* *
* The emited element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge. * The emitted element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge.
* *
* Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`, * Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`,
* the resulting [[ReadAllInputs]] will then not contain values for this element, which can be * the resulting [[ReadAllInputs]] will then not contain values for this element, which can be
@ -119,23 +119,25 @@ object FlexiMerge {
def initialCompletionHandling: CompletionHandling = defaultCompletionHandling def initialCompletionHandling: CompletionHandling = defaultCompletionHandling
/** /**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]]. * Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements * The context provides means for performing side effects, such as emitting elements
* downstream. * downstream.
*/ */
trait MergeLogicContext { trait MergeLogicContext extends MergeLogicContextBase {
/** /**
* @return `true` if at least one element has been requested by downstream (output). * Emit one element downstream. It is only allowed to `emit` zero or one
*/ * element in response to `onInput`, otherwise `IllegalStateException`
def isDemandAvailable: Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException`
* is thrown. * is thrown.
*/ */
def emit(elem: Out): Unit def emit(elem: Out): Unit
}
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait MergeLogicContextBase {
/** /**
* Complete this stream successfully. Upstream subscriptions will be cancelled. * Complete this stream successfully. Upstream subscriptions will be cancelled.
*/ */
@ -161,7 +163,7 @@ object FlexiMerge {
* Definition of which inputs to read from and how to act on the read elements. * Definition of which inputs to read from and how to act on the read elements.
* When an element has been read [[#onInput]] is called and then it is ensured * When an element has been read [[#onInput]] is called and then it is ensured
* that downstream has requested at least one element, i.e. it is allowed to * that downstream has requested at least one element, i.e. it is allowed to
* emit at least one element downstream with [[MergeLogicContext#emit]]. * emit at most one element downstream with [[MergeLogicContext#emit]].
* *
* The `onInput` function is called when an `element` was read from the `input`. * The `onInput` function is called when an `element` was read from the `input`.
* The function returns next behavior or [[#SameState]] to keep current behavior. * The function returns next behavior or [[#SameState]] to keep current behavior.
@ -188,17 +190,20 @@ object FlexiMerge {
* *
* The `onUpstreamFinish` function is called when an upstream input was completed successfully. * The `onUpstreamFinish` function is called when an upstream input was completed successfully.
* It returns next behavior or [[#SameState]] to keep current behavior. * It returns next behavior or [[#SameState]] to keep current behavior.
* A completion can be propagated downstream with [[MergeLogicContext#finish]], * A completion can be propagated downstream with [[MergeLogicContextBase#finish]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
* *
* The `onUpstreamFailure` function is called when an upstream input was completed with failure. * The `onUpstreamFailure` function is called when an upstream input was completed with failure.
* It returns next behavior or [[#SameState]] to keep current behavior. * It returns next behavior or [[#SameState]] to keep current behavior.
* A failure can be propagated downstream with [[MergeLogicContext#fail]], * A failure can be propagated downstream with [[MergeLogicContextBase#fail]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/ */
sealed case class CompletionHandling( sealed case class CompletionHandling(
onUpstreamFinish: (MergeLogicContext, InputHandle) State[_], onUpstreamFinish: (MergeLogicContextBase, InputHandle) State[_],
onUpstreamFailure: (MergeLogicContext, InputHandle, Throwable) State[_]) onUpstreamFailure: (MergeLogicContextBase, InputHandle, Throwable) State[_])
/** /**
* Will continue to operate until a read becomes unsatisfiable, then it completes. * Will continue to operate until a read becomes unsatisfiable, then it completes.
@ -227,6 +232,8 @@ object FlexiMerge {
* *
* The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]] * The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]]
* that will be used when reading input elements and emitting output elements. * that will be used when reading input elements and emitting output elements.
* As response to an input element it is allowed to emit at most one output element.
*
* The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance * The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance
* must not hold mutable state, since it may be shared across several materialized ``FlowGraph`` * must not hold mutable state, since it may be shared across several materialized ``FlowGraph``
* instances. * instances.

View file

@ -85,23 +85,24 @@ object FlexiRoute {
def initialCompletionHandling: CompletionHandling = defaultCompletionHandling def initialCompletionHandling: CompletionHandling = defaultCompletionHandling
/** /**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]]. * Context that is passed to the `onInput` function of [[State]].
* The context provides means for performing side effects, such as emitting elements * The context provides means for performing side effects, such as emitting elements
* downstream. * downstream.
*/ */
trait RouteLogicContext[Out] { trait RouteLogicContext[Out] extends RouteLogicContextBase {
/** /**
* @return `true` if at least one element has been requested by the given downstream (output). * Emit one element downstream. It is only allowed to `emit` at most one element to
*/ * each output in response to `onInput`, `IllegalStateException` is thrown.
def isDemandAvailable(output: OutputHandle): Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true` for the given `output`, otherwise
* `IllegalArgumentException` is thrown.
*/ */
def emit(output: OutputHandle, elem: Out): Unit def emit(output: OutputHandle, elem: Out): Unit
}
/**
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as completing
* the stream successfully or with failure.
*/
trait RouteLogicContextBase {
/** /**
* Complete the given downstream successfully. * Complete the given downstream successfully.
*/ */
@ -132,8 +133,8 @@ object FlexiRoute {
* Definition of which outputs that must have requested elements and how to act * Definition of which outputs that must have requested elements and how to act
* on the read elements. When an element has been read [[#onInput]] is called and * on the read elements. When an element has been read [[#onInput]] is called and
* then it is ensured that the specified downstream outputs have requested at least * then it is ensured that the specified downstream outputs have requested at least
* one element, i.e. it is allowed to emit at least one element downstream with * one element, i.e. it is allowed to emit at most one element to each downstream
* [[RouteLogicContext#emit]]. * output with [[RouteLogicContext#emit]].
* *
* The `onInput` function is called when an `element` was read from upstream. * The `onInput` function is called when an `element` was read from upstream.
* The function returns next behavior or [[#SameState]] to keep current behavior. * The function returns next behavior or [[#SameState]] to keep current behavior.
@ -144,7 +145,7 @@ object FlexiRoute {
/** /**
* Return this from [[State]] `onInput` to use same state for next element. * Return this from [[State]] `onInput` to use same state for next element.
*/ */
def SameState[In]: State[In] = sameStateInstance.asInstanceOf[State[In]] def SameState[T]: State[T] = sameStateInstance.asInstanceOf[State[T]]
private val sameStateInstance = new State[Any](DemandFromAny(Nil))((_, _, _) private val sameStateInstance = new State[Any](DemandFromAny(Nil))((_, _, _)
throw new UnsupportedOperationException("SameState.onInput should not be called")) { throw new UnsupportedOperationException("SameState.onInput should not be called")) {
@ -165,11 +166,14 @@ object FlexiRoute {
* *
* The `onDownstreamFinish` function is called when a downstream output cancels. * The `onDownstreamFinish` function is called when a downstream output cancels.
* It returns next behavior or [[#SameState]] to keep current behavior. * It returns next behavior or [[#SameState]] to keep current behavior.
*
* It is not possible to emit elements from the completion handling, since completion
* handlers may be invoked at any time (without regard to downstream demand being available).
*/ */
sealed case class CompletionHandling( sealed case class CompletionHandling(
onUpstreamFinish: RouteLogicContext[Any] Unit, onUpstreamFinish: RouteLogicContextBase Unit,
onUpstreamFailure: (RouteLogicContext[Any], Throwable) Unit, onUpstreamFailure: (RouteLogicContextBase, Throwable) Unit,
onDownstreamFinish: (RouteLogicContext[Any], OutputHandle) State[_]) onDownstreamFinish: (RouteLogicContextBase, OutputHandle) State[_])
/** /**
* When an output cancels it continues with remaining outputs. * When an output cancels it continues with remaining outputs.