From 2740d67c61b5a7554d0eedca4a84fc244008e757 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 29 Jan 2015 15:58:23 +0100 Subject: [PATCH] !str #16565 Make Flexi* limitations explicit * remove isDemandAvailable * hide emit from CompletionHandler context * throw if more than one emit in response to an input * had to remove the OrderedMerge test/sample because emitting from CompletionHandler is currently not supported * FlexiRoute and FlexiMerge might become more capable later, see issue 16753 --- akka-docs-dev/rst/java/stream-graphs.rst | 2 +- .../scala/code/docs/stream/FlexiDocSpec.scala | 40 ----- akka-docs-dev/rst/scala/stream-customize.rst | 23 +-- akka-docs-dev/rst/scala/stream-graphs.rst | 2 +- .../akka/http/engine/server/HttpServer.scala | 13 +- .../akka/stream/javadsl/FlexiMergeTest.java | 4 +- .../stream/scaladsl/GraphFlexiMergeSpec.scala | 164 ++++-------------- .../stream/scaladsl/GraphFlexiRouteSpec.scala | 35 ++-- .../akka/stream/impl/FlexiMergeImpl.scala | 19 +- .../akka/stream/impl/FlexiRouteImpl.scala | 26 ++- .../akka/stream/javadsl/FlexiMerge.scala | 52 +++--- .../akka/stream/javadsl/FlexiRoute.scala | 56 +++--- .../akka/stream/scaladsl/FlexiMerge.scala | 37 ++-- .../akka/stream/scaladsl/FlexiRoute.scala | 36 ++-- 14 files changed, 206 insertions(+), 303 deletions(-) diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst index a2ab83a929..5a84168b59 100644 --- a/akka-docs-dev/rst/java/stream-graphs.rst +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -39,7 +39,7 @@ Akka Streams currently provide these junctions: - ``ZipWith`` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output, - ``Zip`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into a ``Pair`` stream, - ``Concat`` – (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters, - - ``FlexiMerge`` – (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL. + - ``FlexiMerge`` – (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL. One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala index be009c603b..546d8268b4 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala @@ -312,44 +312,4 @@ class FlexiDocSpec extends AkkaSpec { }.run() } - "flexi route completion handling emitting element upstream completion" in { - class ElementsAndStatus[A] extends FlexiRoute[A] { - import FlexiRoute._ - val out = createOutputPort[A]() - - override def createRouteLogic() = new RouteLogic[A] { - override def outputHandles(outputCount: Int) = Vector(out) - - // format: OFF - //#flexiroute-completion-upstream-completed-signalling - var buffer: List[A] - //#flexiroute-completion-upstream-completed-signalling - = List[A]() - // format: ON - - //#flexiroute-completion-upstream-completed-signalling - - def drainBuffer(ctx: RouteLogicContext[Any]): Unit = - while (ctx.isDemandAvailable(out) && buffer.nonEmpty) { - ctx.emit(out, buffer.head) - buffer = buffer.tail - } - - val signalStatusOnTermination = CompletionHandling( - onUpstreamFinish = ctx => drainBuffer(ctx), - onUpstreamFailure = (ctx, cause) => drainBuffer(ctx), - onDownstreamFinish = (_, _) => SameState) - //#flexiroute-completion-upstream-completed-signalling - - override def initialCompletionHandling = signalStatusOnTermination - - override def initialState = State[A](DemandFromAny(out)) { - (ctx, output, element) => - ctx.emit(output, element) - SameState - } - } - } - } - } diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index 82ff1571f7..08b6916d2a 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -203,6 +203,9 @@ completion or errors to the merges downstream stage. The state function must always return the next behaviour to be used when an element should be pulled from its upstreams, we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed. +.. note:: + As response to an input element it is allowed to emit at most one output element. + Implementing Zip-like merges ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ More complex fan-in junctions may require not only multiple States but also sharing state between those states. @@ -254,6 +257,9 @@ to this stages downstream effectively shutting down the stream. In case you want to change back to the default completion handling, it is available as ``MergeLogic#defaultCompletionHandling``. +It is not possible to emit elements from the completion handling, since completion +handlers may be invoked at any time (without regard to downstream demand being available). + Using FlexiRoute ---------------- Similarily to using :class:`FlexiMerge`, implementing custom fan-out stages requires extending the :class:`FlexiRoute` class @@ -284,12 +290,15 @@ of the tuple to the ``right`` stream. Notice that since we are emitting values o the type parameter of this ``State[_]`` must be set to ``Any``. This type can be utilised more efficiently when a junction is emitting the same type of element to its downstreams e.g. in all *strictly routing* stages. -The state function must always return the next behaviour to be used when an element should be emited, +The state function must always return the next behaviour to be used when an element should be emitted, we use the special :class:`SameState` object which signals :class:`FlexiRoute` that no state transition is needed. .. warning:: While a :class:`RouteLogic` instance *may* be stateful, the :class:`FlexiRoute` instance *must not* hold any mutable state, since it may be shared across several materialized ``FlowGraph`` instances. + +.. note:: + It is only allowed to `emit` at most one element to each output in response to `onInput`, `IllegalStateException` is thrown. Completion handling ^^^^^^^^^^^^^^^^^^^ @@ -313,14 +322,6 @@ Notice that State changes are only allowed in reaction to downstream cancellatio cases. This is because since there is only one upstream, there is nothing else to do than possibly flush buffered elements and continue with shutting down the entire stream. -Sometimes you may want to emit buffered or additional elements from the completion handler when the stream is shutting down. -However calling ``ctx.emit`` is only legal when the stream we emit to *has demand available*. In normal operation, -this is guaranteed by properly using demand conditions, however as completion handlers may be invokead at any time (without -regard to downstream demand being available) we must explicitly check that the downstream has demand available before signalling it. +It is not possible to emit elements from the completion handling, since completion +handlers may be invoked at any time (without regard to downstream demand being available). -The completion strategy below assumes that we have implemented some kind of :class:`FlexiRoute` which buffers elements, -yet when its upstream completes it should drain as much as possible to its downstream ``out`` output port. We use the -``ctx.isDemandAvailable(outputHandle)`` method to make sure calling emit with the buffered elements is valid and -complete this flushing once all demand (or the buffer) is drained: - -.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion-upstream-completed-signalling diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index bb203b86ec..eafefb41cf 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -39,7 +39,7 @@ Akka Streams currently provide these junctions: - ``ZipWith[A,B,...,Out]`` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output, - ``Zip[A,B]`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream, - ``Concat[A]`` – (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters, - - ``FlexiMerge[Out]`` – (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL. + - ``FlexiMerge[Out]`` – (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL. One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 4e6f46d1e4..08a888c5dc 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -162,14 +162,21 @@ private[http] object HttpServer { case (ctx, _, error) ⇒ { ctx.fail(error); SameState } }) - def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = { + def finishWithError(ctx: MergeLogicContextBase, target: String, status: StatusCode, info: ErrorInfo): State[Any] = { log.warning("Illegal {}, responding with status '{}': {}", target, status, info.formatPretty) val msg = if (settings.verboseErrorMessages) info.formatPretty else info.summary - ctx.emit(ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true)) + // FIXME this is a workaround that is supposed to be solved by issue #16753 + ctx match { + case fullCtx: MergeLogicContext ⇒ + // note that this will throw IllegalArgumentException if no demand available + fullCtx.emit(ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true)) + case other ⇒ throw new IllegalStateException(s"Unexpected MergeLogicContext [${other.getClass.getName}]") + } + // finish(ctx) } - def finish(ctx: MergeLogicContext): State[Any] = { + def finish(ctx: MergeLogicContextBase): State[Any] = { ctx.finish() // shouldn't this return a `State` rather than `Unit`? SameState // it seems weird to stay in the same state after completion } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java index 65ab18d15b..cc7b8f1603 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java @@ -181,13 +181,13 @@ public class FlexiMergeTest { private final CompletionHandling emitOtherOnClose = new CompletionHandling() { @Override - public State onUpstreamFinish(MergeLogicContext ctx, InputHandle input) { + public State onUpstreamFinish(MergeLogicContextBase ctx, InputHandle input) { ctx.changeCompletionHandling(defaultCompletionHandling()); return readRemaining(other(input)); } @Override - public State onUpstreamFailure(MergeLogicContext ctx, InputHandle inputHandle, Throwable cause) { + public State onUpstreamFailure(MergeLogicContextBase ctx, InputHandle inputHandle, Throwable cause) { ctx.fail(cause); return sameState(); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index 1a4cf1f3db..2d2721b017 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -7,8 +7,9 @@ import akka.stream.testkit.StreamTestKit.AutoPublisher import akka.stream.testkit.StreamTestKit.OnNext import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe - import scala.util.control.NoStackTrace +import akka.actor.ActorRef +import akka.testkit.TestProbe object GraphFlexiMergeSpec { @@ -138,72 +139,6 @@ class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends } } -class OrderedMerge extends FlexiMerge[Int] { - import FlexiMerge._ - val input1 = createInputPort[Int]() - val input2 = createInputPort[Int]() - - def createMergeLogic = new MergeLogic[Int] { - private var reference = 0 - - override def inputHandles(inputCount: Int) = Vector(input1, input2) - - val emitOtherOnClose = CompletionHandling( - onUpstreamFinish = { (ctx, input) ⇒ - ctx.changeCompletionHandling(emitLast) - readRemaining(other(input)) - }, - onUpstreamFailure = { (ctx, input, cause) ⇒ - ctx.fail(cause) - SameState - }) - - def other(input: InputHandle): InputHandle = if (input eq input1) input2 else input1 - - def getFirstElement = State[Int](ReadAny(input1, input2)) { (ctx, input, element) ⇒ - reference = element - ctx.changeCompletionHandling(emitOtherOnClose) - readUntilLarger(other(input)) - } - - def readUntilLarger(input: InputHandle): State[Int] = State[Int](Read(input)) { - (ctx, input, element) ⇒ - if (element <= reference) { - ctx.emit(element) - SameState - } else { - ctx.emit(reference) - reference = element - readUntilLarger(other(input)) - } - } - - def readRemaining(input: InputHandle) = State[Int](Read(input)) { - (ctx, input, element) ⇒ - if (element <= reference) - ctx.emit(element) - else { - ctx.emit(reference) - reference = element - } - SameState - } - - val emitLast = CompletionHandling( - onUpstreamFinish = { (ctx, input) ⇒ - if (ctx.isDemandAvailable) - ctx.emit(reference) - SameState - }, - onUpstreamFailure = { (ctx, input, cause) ⇒ - ctx.fail(cause) - SameState - }) - - override def initialState = getFirstElement - } -} - class PreferringMerge extends FlexiMerge[Int] { import FlexiMerge._ val preferred = createInputPort[Int]() @@ -221,7 +156,7 @@ class PreferringMerge extends FlexiMerge[Int] { } } -class TestMerge extends FlexiMerge[String] { +class TestMerge(completionProbe: ActorRef) extends FlexiMerge[String] { import FlexiMerge._ val input1 = createInputPort[String]() val input2 = createInputPort[String]() @@ -254,8 +189,7 @@ class TestMerge extends FlexiMerge[String] { onUpstreamFinish = { (ctx, input) ⇒ if (throwFromOnComplete) throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace - if (ctx.isDemandAvailable) - ctx.emit("onUpstreamFinish: " + input.portIndex) + completionProbe ! "onUpstreamFinish: " + input.portIndex SameState }, onUpstreamFailure = { (ctx, input, cause) ⇒ @@ -385,54 +319,6 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } - "build simple ordered merge 1" in { - val output = Sink.publisher[Int] - val m = FlowGraph { implicit b ⇒ - val merge = new OrderedMerge - Source(List(3, 5, 6, 7, 8)) ~> merge.input1 - Source(List(1, 2, 4, 9)) ~> merge.input2 - merge.out ~> output - }.run() - - val s = SubscriberProbe[Int] - val p = m.get(output) - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(100) - for (n ← 1 to 9) { - s.expectNext(n) - } - s.expectComplete() - } - - "build simple ordered merge 2" in { - val output = Sink.publisher[Int] - val m = FlowGraph { implicit b ⇒ - val merge = new OrderedMerge - Source(List(3, 5, 6, 7, 8)) ~> merge.input1 - Source(List(3, 5, 6, 7, 8, 10)) ~> merge.input2 - merge.out ~> output - }.run() - - val s = SubscriberProbe[Int] - val p = m.get(output) - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(100) - s.expectNext(3) - s.expectNext(3) - s.expectNext(5) - s.expectNext(5) - s.expectNext(6) - s.expectNext(6) - s.expectNext(7) - s.expectNext(7) - s.expectNext(8) - s.expectNext(8) - s.expectNext(10) - s.expectComplete() - } - "build perferring merge" in { val output = Sink.publisher[Int] val m = FlowGraph { implicit b ⇒ @@ -523,8 +409,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { "support cancel of input" in { val publisher = PublisherProbe[String] + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(publisher) ~> merge.input1 Source(List("b", "c", "d")) ~> merge.input2 Source(List("e", "f")) ~> merge.input3 @@ -546,9 +433,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectNext("onInput: e") s.expectNext("onInput: c") s.expectNext("onInput: f") - s.expectNext("onUpstreamFinish: 2") + completionProbe.expectMsg("onUpstreamFinish: 2") s.expectNext("onInput: d") - s.expectNext("onUpstreamFinish: 1") + completionProbe.expectMsg("onUpstreamFinish: 1") autoPublisher.sendNext("x") @@ -559,8 +446,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { val publisher1 = PublisherProbe[String] val publisher2 = PublisherProbe[String] val publisher3 = PublisherProbe[String] + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(publisher1) ~> merge.input1 Source(publisher2) ~> merge.input2 Source(publisher3) ~> merge.input3 @@ -592,8 +480,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { } "handle failure" in { + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.input1 Source(List("a", "b")) ~> merge.input2 Source(List("c")) ~> merge.input3 @@ -608,16 +497,17 @@ class GraphFlexiMergeSpec extends AkkaSpec { // IllegalArgumentException is swallowed by the CompletionHandler s.expectNext("onInput: a") s.expectNext("onInput: c") - s.expectNext("onUpstreamFinish: 2") + completionProbe.expectMsg("onUpstreamFinish: 2") s.expectNext("onInput: b") - s.expectNext("onUpstreamFinish: 1") + completionProbe.expectMsg("onUpstreamFinish: 1") s.expectComplete() } "propagate failure" in { val publisher = PublisherProbe[String] + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(publisher) ~> merge.input1 Source.failed[String](new IllegalStateException("ERROR") with NoStackTrace) ~> merge.input2 Source.empty[String] ~> merge.input3 @@ -631,8 +521,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { } "emit failure" in { + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(List("a", "err")) ~> merge.input1 Source(List("b", "c")) ~> merge.input2 Source.empty[String] ~> merge.input3 @@ -650,8 +541,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { } "emit failure for user thrown exception" in { + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(List("a", "exc")) ~> merge.input1 Source(List("b", "c")) ~> merge.input2 Source.empty[String] ~> merge.input3 @@ -669,8 +561,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { } "emit failure for user thrown exception in onUpstreamFinish" in { + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(List("a", "onUpstreamFinish-exc")) ~> merge.input1 Source(List("b", "c")) ~> merge.input2 Source.empty[String] ~> merge.input3 @@ -689,8 +582,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { "emit failure for user thrown exception in onUpstreamFinish 2" in { val publisher = PublisherProbe[String] + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source.empty[String] ~> merge.input1 Source(publisher) ~> merge.input2 Source.empty[String] ~> merge.input3 @@ -713,8 +607,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { } "support finish from onInput" in { + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(List("a", "finish")) ~> merge.input1 Source(List("b", "c")) ~> merge.input2 Source.empty[String] ~> merge.input3 @@ -732,8 +627,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { } "support unconnected inputs" in { + val completionProbe = TestProbe() val m = FlowGraph { implicit b ⇒ - val merge = new TestMerge + val merge = new TestMerge(completionProbe.ref) Source(List("a")) ~> merge.input1 Source(List("b", "c")) ~> merge.input2 // input3 not connected @@ -746,10 +642,10 @@ class GraphFlexiMergeSpec extends AkkaSpec { val sub = s.expectSubscription() sub.request(10) s.expectNext("onInput: a") - s.expectNext("onUpstreamFinish: 0") + completionProbe.expectMsg("onUpstreamFinish: 0") s.expectNext("onInput: b") s.expectNext("onInput: c") - s.expectNext("onUpstreamFinish: 1") + completionProbe.expectMsg("onUpstreamFinish: 1") s.expectComplete() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala index 55296431d1..e9015a9bde 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala @@ -10,6 +10,8 @@ import akka.stream.testkit.StreamTestKit.OnNext import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.actor.ActorSystem +import akka.actor.ActorRef +import akka.testkit.TestProbe object GraphFlexiRouteSpec { @@ -89,7 +91,7 @@ object GraphFlexiRouteSpec { } } - class TestRoute extends FlexiRoute[String] { + class TestRoute(completionProbe: ActorRef) extends FlexiRoute[String] { import FlexiRoute._ val output1 = createOutputPort[String]() val output2 = createOutputPort[String]() @@ -122,26 +124,17 @@ object GraphFlexiRouteSpec { onUpstreamFinish = { ctx ⇒ if (throwFromOnComplete) throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace - handles.foreach { output ⇒ - if (ctx.isDemandAvailable(output)) - ctx.emit(output, "onUpstreamFinish") - } + completionProbe ! "onUpstreamFinish" }, onUpstreamFailure = { (ctx, cause) ⇒ cause match { case _: IllegalArgumentException ⇒ // swallow case _ ⇒ - handles.foreach { output ⇒ - if (ctx.isDemandAvailable(output)) - ctx.emit(output, "onError") - } + completionProbe ! "onError" } }, onDownstreamFinish = { (ctx, cancelledOutput) ⇒ - handles.foreach { output ⇒ - if (output != cancelledOutput && ctx.isDemandAvailable(output)) - ctx.emit(output, "onDownstreamFinish: " + cancelledOutput.portIndex) - } + completionProbe ! "onDownstreamFinish: " + cancelledOutput.portIndex SameState }) } @@ -151,8 +144,9 @@ object GraphFlexiRouteSpec { val publisher = PublisherProbe[String] val s1 = SubscriberProbe[String] val s2 = SubscriberProbe[String] + val completionProbe = TestProbe() FlowGraph { implicit b ⇒ - val route = new TestRoute + val route = new TestRoute(completionProbe.ref) Source(publisher) ~> route.in route.output1 ~> Sink(s1) route.output2 ~> Sink(s2) @@ -168,7 +162,6 @@ object GraphFlexiRouteSpec { } -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class GraphFlexiRouteSpec extends AkkaSpec { import GraphFlexiRouteSpec._ @@ -316,7 +309,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s1.expectError().getMessage should be("err-1") autoPublisher.sendComplete() - s2.expectNext("onUpstreamFinish") + completionProbe.expectMsg("onUpstreamFinish") s2.expectComplete() } @@ -370,7 +363,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub2.request(2) sub1.cancel() - s2.expectNext("onDownstreamFinish: 0") + completionProbe.expectMsg("onDownstreamFinish: 0") s1.expectNoMsg(200.millis) autoPublisher.sendNext("c") @@ -393,8 +386,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub2.request(2) autoPublisher.sendComplete() - s1.expectNext("onUpstreamFinish") - s2.expectNext("onUpstreamFinish") + completionProbe.expectMsg("onUpstreamFinish") s1.expectComplete() s2.expectComplete() @@ -413,8 +405,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub2.request(2) autoPublisher.sendError(new RuntimeException("test err") with NoStackTrace) - s1.expectNext("onError") - s2.expectNext("onError") + completionProbe.expectMsg("onError") s1.expectError().getMessage should be("test err") s2.expectError().getMessage should be("test err") @@ -433,7 +424,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub2.request(2) sub1.cancel() - s2.expectNext("onDownstreamFinish: 0") + completionProbe.expectMsg("onDownstreamFinish: 0") sub2.cancel() autoPublisher.subscription.expectCancellation() diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index e1eb500f8b..a3d99bcd23 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -41,6 +41,8 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings, private var behavior: StateT = _ private var completion: CompletionT = _ + // needed to ensure that at most one element is emitted from onInput + private var emitted = false override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) { override def onError(input: Int, e: Throwable): Unit = { @@ -56,10 +58,12 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings, } private val ctx: mergeLogic.MergeLogicContext = new mergeLogic.MergeLogicContext { - override def isDemandAvailable: Boolean = primaryOutputs.demandAvailable override def emit(elem: Any): Unit = { + if (emitted) + throw new IllegalStateException("It is only allowed to `emit` zero or one element in response to `onInput`") require(primaryOutputs.demandAvailable, "emit not allowed when no demand available") + emitted = true primaryOutputs.enqueueOutputElement(elem) } @@ -131,17 +135,17 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings, val id = inputBunch.idToDequeue() val elem = inputBunch.dequeueAndYield(id) val inputHandle = inputMapping(id) - changeBehavior(behavior.onInput(ctx, inputHandle, elem)) + callOnInput(inputHandle, elem) triggerCompletionAfterRead(inputHandle) case read: ReadPreferred ⇒ val id = inputBunch.idToDequeue() val elem = inputBunch.dequeueAndPrefer(id) val inputHandle = inputMapping(id) - changeBehavior(behavior.onInput(ctx, inputHandle, elem)) + callOnInput(inputHandle, elem) triggerCompletionAfterRead(inputHandle) case Read(inputHandle) ⇒ val elem = inputBunch.dequeue(inputHandle.portIndex) - changeBehavior(behavior.onInput(ctx, inputHandle, elem)) + callOnInput(inputHandle, elem) triggerCompletionAfterRead(inputHandle) case read: ReadAll ⇒ val inputHandles = read.inputs @@ -150,7 +154,7 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings, case input if include(input.portIndex) ⇒ input → inputBunch.dequeue(input.portIndex) } - changeBehavior(behavior.onInput(ctx, inputHandles.head, read.mkResult(Map(values: _*)))) + callOnInput(inputHandles.head, read.mkResult(Map(values: _*))) // must be triggered after emitting the accumulated out value triggerCompletionAfterRead(inputHandles) @@ -158,6 +162,11 @@ private[akka] class FlexiMergeImpl(_settings: ActorFlowMaterializerSettings, }) + private def callOnInput(input: InputHandle, element: Any): Unit = { + emitted = false + changeBehavior(behavior.onInput(ctx, input, element)) + } + private def triggerCompletionAfterRead(inputs: Seq[InputHandle]): Unit = { var j = 0 while (j < inputs.length) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala index c7b8e7c0a7..4a66be9173 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala @@ -42,6 +42,8 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings, private var behavior: StateT = _ private var completion: CompletionT = _ + // needed to ensure that at most one element is emitted from onInput + private val emitted = Array.ofDim[Boolean](outputCount) override protected val outputBunch = new OutputBunch(outputPorts, self, this) { override def onCancel(output: Int): Unit = @@ -64,11 +66,14 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings, } private val ctx: routeLogic.RouteLogicContext[Any] = new routeLogic.RouteLogicContext[Any] { - override def isDemandAvailable(output: OutputHandle): Boolean = - (output.portIndex < outputCount) && outputBunch.isPending(output.portIndex) override def emit(output: OutputHandle, elem: Any): Unit = { - require(outputBunch.isPending(output.portIndex), s"emit to [$output] not allowed when no demand available") + require(output.portIndex < outputCount, s"invalid output port index [${output.portIndex}, max index [${outputCount - 1}]") + if (emitted(output.portIndex)) + throw new IllegalStateException("It is only allowed to `emit` at most one element to each output in response to `onInput`") + require(outputBunch.isPending(output.portIndex), + s"emit to [$output] not allowed when no demand available") + emitted(output.portIndex) = true outputBunch.enqueue(output.portIndex, elem) } @@ -138,18 +143,27 @@ private[akka] class FlexiRouteImpl(_settings: ActorFlowMaterializerSettings, case any: DemandFromAny ⇒ val id = outputBunch.idToEnqueueAndYield() val outputHandle = outputMapping(id) - changeBehavior(behavior.onInput(ctx, outputHandle, elem)) + callOnInput(outputHandle, elem) case DemandFrom(outputHandle) ⇒ - changeBehavior(behavior.onInput(ctx, outputHandle, elem)) + callOnInput(outputHandle, elem) case all: DemandFromAll ⇒ val id = outputBunch.idToEnqueueAndYield() val outputHandle = outputMapping(id) - changeBehavior(behavior.onInput(ctx, outputHandle, elem)) + callOnInput(outputHandle, elem) } }) + private def callOnInput(output: OutputHandle, element: Any): Unit = { + var i = 0 + while (i < emitted.length) { + emitted(i) = false + i += 1 + } + changeBehavior(behavior.onInput(ctx, output, element)) + } + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala index 6b24496dee..c07fefa1ec 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala @@ -85,7 +85,7 @@ object FlexiMerge { * fulfilled when there are elements for *all* of the given upstream * inputs. * - * The emited element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge. + * The emitted element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge. * * Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`, * the resulting [[ReadAllInputs]] will then not contain values for this element, which can be @@ -105,25 +105,27 @@ object FlexiMerge { } /** - * Context that is passed to the methods of [[State]] and [[CompletionHandling]]. + * Context that is passed to the `onInput` function of [[State]]. * The context provides means for performing side effects, such as emitting elements * downstream. */ - trait MergeLogicContext[Out] { + trait MergeLogicContext[Out] extends MergeLogicContextBase[Out] { /** - * @return `true` if at least one element has been requested by downstream (output). - */ - def isDemandAvailable: Boolean - - /** - * Emit one element downstream. It is only allowed to `emit` when - * [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException` + * Emit one element downstream. It is only allowed to `emit` zero or one + * element in response to `onInput`, otherwise `IllegalStateException` * is thrown. */ def emit(elem: Out): Unit + } + /** + * Context that is passed to the functions of [[CompletionHandling]]. + * The context provides means for performing side effects, such as completing + * the stream successfully or with failure. + */ + trait MergeLogicContextBase[Out] { /** - * Complete this stream succesfully. Upstream subscriptions will be cancelled. + * Complete this stream successfully. Upstream subscriptions will be cancelled. */ def finish(): Unit @@ -148,17 +150,20 @@ object FlexiMerge { * * The `onUpstreamFinish` method is called when an upstream input was completed sucessfully. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. - * A completion can be propagated downstream with [[MergeLogicContext#finish]], + * A completion can be propagated downstream with [[MergeLogicContextBase#finish]], * or it can be swallowed to continue with remaining inputs. * * The `onUpstreamFailure` method is called when an upstream input was completed with failure. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. - * A failure can be propagated downstream with [[MergeLogicContext#fail]], + * A failure can be propagated downstream with [[MergeLogicContextBase#fail]], * or it can be swallowed to continue with remaining inputs. + * + * It is not possible to emit elements from the completion handling, since completion + * handlers may be invoked at any time (without regard to downstream demand being available). */ abstract class CompletionHandling[Out] { - def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out] - def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out] + def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[_, Out] + def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[_, Out] } /** @@ -224,9 +229,9 @@ object FlexiMerge { */ def defaultCompletionHandling[A]: CompletionHandling[Out] = new CompletionHandling[Out] { - override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = + override def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[A, Out] = sameState - override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { + override def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[A, Out] = { ctx.fail(cause) sameState } @@ -238,11 +243,11 @@ object FlexiMerge { */ def eagerClose[A]: CompletionHandling[Out] = new CompletionHandling[Out] { - override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = { + override def onUpstreamFinish(ctx: MergeLogicContextBase[Out], input: InputHandle): State[A, Out] = { ctx.finish() sameState } - override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { + override def onUpstreamFailure(ctx: MergeLogicContextBase[Out], input: InputHandle, cause: Throwable): State[A, Out] = { ctx.fail(cause) sameState } @@ -283,13 +288,15 @@ object FlexiMerge { delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling = CompletionHandling( onUpstreamFinish = (ctx, inputHandle) ⇒ { + val widenedCtxt = ctx.asInstanceOf[MergeLogicContext] // we know that it is always a MergeLogicContext val newDelegateState = delegateCompletionHandling.onUpstreamFinish( - new MergeLogicContextWrapper(ctx), asJava(inputHandle)) + new MergeLogicContextWrapper(widenedCtxt), asJava(inputHandle)) wrapState(newDelegateState) }, onUpstreamFailure = (ctx, inputHandle, cause) ⇒ { + val widenedCtxt = ctx.asInstanceOf[MergeLogicContext] // we know that it is always a MergeLogicContext val newDelegateState = delegateCompletionHandling.onUpstreamFailure( - new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause) + new MergeLogicContextWrapper(widenedCtxt), asJava(inputHandle), cause) wrapState(newDelegateState) }) @@ -297,7 +304,6 @@ object FlexiMerge { inputHandle.asInstanceOf[InputHandle] class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] { - override def isDemandAvailable: Boolean = delegate.isDemandAvailable override def emit(elem: Out): Unit = delegate.emit(elem) override def finish(): Unit = delegate.finish() override def fail(cause: Throwable): Unit = delegate.fail(cause) @@ -328,6 +334,8 @@ object FlexiMerge { * * The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]] * that will be used when reading input elements and emitting output elements. + * As response to an input element it is allowed to emit at most one output element. + * * The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance * must not hold mutable state, since it may be shared across several materialized ``FlowGraph`` * instances. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala index f294a9c29e..a050c43bcb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala @@ -75,23 +75,24 @@ object FlexiRoute { class DemandFromAll(val outputs: JList[OutputHandle]) extends DemandCondition /** - * Context that is passed to the functions of [[State]] and [[CompletionHandling]]. + * Context that is passed to the `onInput` function of [[State]]. * The context provides means for performing side effects, such as emitting elements * downstream. */ - trait RouteLogicContext[In, Out] { + trait RouteLogicContext[In, Out] extends RouteLogicContextBase[In] { /** - * @return `true` if at least one element has been requested by the given downstream (output). - */ - def isDemandAvailable(output: OutputHandle): Boolean - - /** - * Emit one element downstream. It is only allowed to `emit` when - * [[#isDemandAvailable]] is `true` for the given `output`, otherwise - * `IllegalArgumentException` is thrown. + * Emit one element downstream. It is only allowed to `emit` at most one element to + * each output in response to `onInput`, `IllegalStateException` is thrown. */ def emit(output: OutputHandle, elem: Out): Unit + } + /** + * Context that is passed to the functions of [[State]] and [[CompletionHandling]]. + * The context provides means for performing side effects, such as completing + * the stream successfully or with failure. + */ + trait RouteLogicContextBase[In] { /** * Complete the given downstream successfully. */ @@ -130,19 +131,22 @@ object FlexiRoute { * * The `onDownstreamFinish` method is called when a downstream output cancels. * It returns next behavior or [[#sameState]] to keep current behavior. + * + * It is not possible to emit elements from the completion handling, since completion + * handlers may be invoked at any time (without regard to downstream demand being available). */ abstract class CompletionHandling[In] { - def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit - def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit - def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] + def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit + def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit + def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _] } /** * Definition of which outputs that must have requested elements and how to act * on the read elements. When an element has been read [[#onInput]] is called and * then it is ensured that the specified downstream outputs have requested at least - * one element, i.e. it is allowed to emit at least one element downstream with - * [[RouteLogicContext#emit]]. + * one element, i.e. it is allowed to emit at most one element to each downstream + * output with [[RouteLogicContext#emit]]. * * The `onInput` method is called when an `element` was read from upstream. * The function returns next behavior or [[#sameState]] to keep current behavior. @@ -195,9 +199,9 @@ object FlexiRoute { */ def defaultCompletionHandling: CompletionHandling[In] = new CompletionHandling[In] { - override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = () - override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () - override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = + override def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit = () + override def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit = () + override def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _] = sameState } @@ -207,9 +211,9 @@ object FlexiRoute { */ def eagerClose[A]: CompletionHandling[In] = new CompletionHandling[In] { - override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = () - override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () - override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = { + override def onUpstreamFinish(ctx: RouteLogicContextBase[In]): Unit = () + override def onUpstreamFailure(ctx: RouteLogicContextBase[In], cause: Throwable): Unit = () + override def onDownstreamFinish(ctx: RouteLogicContextBase[In], output: OutputHandle): State[In, _] = { ctx.finish() sameState } @@ -250,14 +254,17 @@ object FlexiRoute { delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling = CompletionHandling( onUpstreamFinish = ctx ⇒ { - delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(ctx)) + val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext + delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(widenedCtxt)) }, onUpstreamFailure = (ctx, cause) ⇒ { - delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(ctx), cause) + val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext + delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(widenedCtxt), cause) }, onDownstreamFinish = (ctx, outputHandle) ⇒ { + val widenedCtxt = ctx.asInstanceOf[RouteLogicContext[Any]] // we know that it is always a RouteLogicContext val newDelegateState = delegateCompletionHandling.onDownstreamFinish( - new RouteLogicContextWrapper(ctx), asJava(outputHandle)) + new RouteLogicContextWrapper(widenedCtxt), asJava(outputHandle)) wrapState(newDelegateState) }) @@ -265,7 +272,6 @@ object FlexiRoute { outputHandle.asInstanceOf[OutputHandle] class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] { - override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output) override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem) override def finish(): Unit = delegate.finish() override def finish(output: OutputHandle): Unit = delegate.finish(output) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index c60ef898f8..e54a8943da 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -89,7 +89,7 @@ object FlexiMerge { * fulfilled when there are elements for *all* of the given upstream * inputs. * - * The emited element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge. + * The emitted element the will be a [[ReadAllInputs]] object, which contains values for all non-cancelled inputs of this FlexiMerge. * * Cancelled inputs are not used, i.e. it is allowed to specify them in the list of `inputs`, * the resulting [[ReadAllInputs]] will then not contain values for this element, which can be @@ -119,23 +119,25 @@ object FlexiMerge { def initialCompletionHandling: CompletionHandling = defaultCompletionHandling /** - * Context that is passed to the functions of [[State]] and [[CompletionHandling]]. + * Context that is passed to the `onInput` function of [[State]]. * The context provides means for performing side effects, such as emitting elements * downstream. */ - trait MergeLogicContext { + trait MergeLogicContext extends MergeLogicContextBase { /** - * @return `true` if at least one element has been requested by downstream (output). - */ - def isDemandAvailable: Boolean - - /** - * Emit one element downstream. It is only allowed to `emit` when - * [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException` + * Emit one element downstream. It is only allowed to `emit` zero or one + * element in response to `onInput`, otherwise `IllegalStateException` * is thrown. */ def emit(elem: Out): Unit + } + /** + * Context that is passed to the functions of [[State]] and [[CompletionHandling]]. + * The context provides means for performing side effects, such as completing + * the stream successfully or with failure. + */ + trait MergeLogicContextBase { /** * Complete this stream successfully. Upstream subscriptions will be cancelled. */ @@ -161,7 +163,7 @@ object FlexiMerge { * Definition of which inputs to read from and how to act on the read elements. * When an element has been read [[#onInput]] is called and then it is ensured * that downstream has requested at least one element, i.e. it is allowed to - * emit at least one element downstream with [[MergeLogicContext#emit]]. + * emit at most one element downstream with [[MergeLogicContext#emit]]. * * The `onInput` function is called when an `element` was read from the `input`. * The function returns next behavior or [[#SameState]] to keep current behavior. @@ -188,17 +190,20 @@ object FlexiMerge { * * The `onUpstreamFinish` function is called when an upstream input was completed successfully. * It returns next behavior or [[#SameState]] to keep current behavior. - * A completion can be propagated downstream with [[MergeLogicContext#finish]], + * A completion can be propagated downstream with [[MergeLogicContextBase#finish]], * or it can be swallowed to continue with remaining inputs. * * The `onUpstreamFailure` function is called when an upstream input was completed with failure. * It returns next behavior or [[#SameState]] to keep current behavior. - * A failure can be propagated downstream with [[MergeLogicContext#fail]], + * A failure can be propagated downstream with [[MergeLogicContextBase#fail]], * or it can be swallowed to continue with remaining inputs. + * + * It is not possible to emit elements from the completion handling, since completion + * handlers may be invoked at any time (without regard to downstream demand being available). */ sealed case class CompletionHandling( - onUpstreamFinish: (MergeLogicContext, InputHandle) ⇒ State[_], - onUpstreamFailure: (MergeLogicContext, InputHandle, Throwable) ⇒ State[_]) + onUpstreamFinish: (MergeLogicContextBase, InputHandle) ⇒ State[_], + onUpstreamFailure: (MergeLogicContextBase, InputHandle, Throwable) ⇒ State[_]) /** * Will continue to operate until a read becomes unsatisfiable, then it completes. @@ -227,6 +232,8 @@ object FlexiMerge { * * The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]] * that will be used when reading input elements and emitting output elements. + * As response to an input element it is allowed to emit at most one output element. + * * The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance * must not hold mutable state, since it may be shared across several materialized ``FlowGraph`` * instances. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index aa796db99b..e50b5a5161 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -85,23 +85,24 @@ object FlexiRoute { def initialCompletionHandling: CompletionHandling = defaultCompletionHandling /** - * Context that is passed to the functions of [[State]] and [[CompletionHandling]]. + * Context that is passed to the `onInput` function of [[State]]. * The context provides means for performing side effects, such as emitting elements * downstream. */ - trait RouteLogicContext[Out] { + trait RouteLogicContext[Out] extends RouteLogicContextBase { /** - * @return `true` if at least one element has been requested by the given downstream (output). - */ - def isDemandAvailable(output: OutputHandle): Boolean - - /** - * Emit one element downstream. It is only allowed to `emit` when - * [[#isDemandAvailable]] is `true` for the given `output`, otherwise - * `IllegalArgumentException` is thrown. + * Emit one element downstream. It is only allowed to `emit` at most one element to + * each output in response to `onInput`, `IllegalStateException` is thrown. */ def emit(output: OutputHandle, elem: Out): Unit + } + /** + * Context that is passed to the functions of [[State]] and [[CompletionHandling]]. + * The context provides means for performing side effects, such as completing + * the stream successfully or with failure. + */ + trait RouteLogicContextBase { /** * Complete the given downstream successfully. */ @@ -132,8 +133,8 @@ object FlexiRoute { * Definition of which outputs that must have requested elements and how to act * on the read elements. When an element has been read [[#onInput]] is called and * then it is ensured that the specified downstream outputs have requested at least - * one element, i.e. it is allowed to emit at least one element downstream with - * [[RouteLogicContext#emit]]. + * one element, i.e. it is allowed to emit at most one element to each downstream + * output with [[RouteLogicContext#emit]]. * * The `onInput` function is called when an `element` was read from upstream. * The function returns next behavior or [[#SameState]] to keep current behavior. @@ -144,7 +145,7 @@ object FlexiRoute { /** * Return this from [[State]] `onInput` to use same state for next element. */ - def SameState[In]: State[In] = sameStateInstance.asInstanceOf[State[In]] + def SameState[T]: State[T] = sameStateInstance.asInstanceOf[State[T]] private val sameStateInstance = new State[Any](DemandFromAny(Nil))((_, _, _) ⇒ throw new UnsupportedOperationException("SameState.onInput should not be called")) { @@ -165,11 +166,14 @@ object FlexiRoute { * * The `onDownstreamFinish` function is called when a downstream output cancels. * It returns next behavior or [[#SameState]] to keep current behavior. + * + * It is not possible to emit elements from the completion handling, since completion + * handlers may be invoked at any time (without regard to downstream demand being available). */ sealed case class CompletionHandling( - onUpstreamFinish: RouteLogicContext[Any] ⇒ Unit, - onUpstreamFailure: (RouteLogicContext[Any], Throwable) ⇒ Unit, - onDownstreamFinish: (RouteLogicContext[Any], OutputHandle) ⇒ State[_]) + onUpstreamFinish: RouteLogicContextBase ⇒ Unit, + onUpstreamFailure: (RouteLogicContextBase, Throwable) ⇒ Unit, + onDownstreamFinish: (RouteLogicContextBase, OutputHandle) ⇒ State[_]) /** * When an output cancels it continues with remaining outputs.