From 1b73e09e2e06a4ec91797e21bd67cd3239205ed0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 23 Jan 2015 10:54:05 +0100 Subject: [PATCH] =str #16577 emit user exception from Flexi* completion handler --- .../stream/scaladsl/GraphFlexiMergeSpec.scala | 70 +++++++++++++++++++ .../stream/scaladsl/GraphFlexiRouteSpec.scala | 44 ++++++++++++ .../akka/stream/impl/FlexiMergeImpl.scala | 18 +++-- .../akka/stream/impl/FlexiRouteImpl.scala | 11 +-- 4 files changed, 133 insertions(+), 10 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index f8788c40b3..4856e44e9e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -230,6 +230,7 @@ class TestMerge extends FlexiMerge[String] { def createMergeLogic: MergeLogic[String] = new MergeLogic[String] { val handles = Vector(input1, input2, input3) override def inputHandles(inputCount: Int) = handles + var throwFromOnComplete = false override def initialState = State[String](ReadAny(handles)) { (ctx, input, element) ⇒ @@ -237,8 +238,12 @@ class TestMerge extends FlexiMerge[String] { ctx.cancel(input) else if (element == "err") ctx.error(new RuntimeException("err") with NoStackTrace) + else if (element == "exc") + throw new RuntimeException("exc") with NoStackTrace else if (element == "complete") ctx.complete() + else if (element == "onComplete-exc") + throwFromOnComplete = true else ctx.emit("onInput: " + element) @@ -247,6 +252,8 @@ class TestMerge extends FlexiMerge[String] { override def initialCompletionHandling = CompletionHandling( onComplete = { (ctx, input) ⇒ + if (throwFromOnComplete) + throw new RuntimeException("onComplete-exc") with NoStackTrace if (ctx.isDemandAvailable) ctx.emit("onComplete: " + input.portIndex) SameState @@ -642,6 +649,69 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectError().getMessage should be("err") } + "emit error for user thrown exception" in { + val m = FlowGraph { implicit b ⇒ + val merge = new TestMerge + Source(List("a", "exc")) ~> merge.input1 + Source(List("b", "c")) ~> merge.input2 + Source.empty[String] ~> merge.input3 + merge.out ~> out1 + }.run() + + val s = SubscriberProbe[String] + val p = m.get(out1) + p.subscribe(s) + val sub = s.expectSubscription() + sub.request(10) + s.expectNext("onInput: a") + s.expectNext("onInput: b") + s.expectError().getMessage should be("exc") + } + + "emit error for user thrown exception in onComplete" in { + val m = FlowGraph { implicit b ⇒ + val merge = new TestMerge + Source(List("a", "onComplete-exc")) ~> merge.input1 + Source(List("b", "c")) ~> merge.input2 + Source.empty[String] ~> merge.input3 + merge.out ~> out1 + }.run() + + val s = SubscriberProbe[String] + val p = m.get(out1) + p.subscribe(s) + val sub = s.expectSubscription() + sub.request(10) + s.expectNext("onInput: a") + s.expectNext("onInput: b") + s.expectError().getMessage should be("onComplete-exc") + } + + "emit error for user thrown exception in onComplete 2" in { + val publisher = PublisherProbe[String] + val m = FlowGraph { implicit b ⇒ + val merge = new TestMerge + Source.empty[String] ~> merge.input1 + Source(publisher) ~> merge.input2 + Source.empty[String] ~> merge.input3 + merge.out ~> out1 + }.run() + + val autoPublisher = new AutoPublisher(publisher) + autoPublisher.sendNext("onComplete-exc") + autoPublisher.sendNext("a") + + val s = SubscriberProbe[String] + val p = m.get(out1) + p.subscribe(s) + val sub = s.expectSubscription() + sub.request(1) + s.expectNext("onInput: a") + + autoPublisher.sendComplete() + s.expectError().getMessage should be("onComplete-exc") + } + "support complete from onInput" in { val m = FlowGraph { implicit b ⇒ val merge = new TestMerge diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala index c37e03b8cd..f64b71d131 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala @@ -94,6 +94,7 @@ object GraphFlexiRouteSpec { val output1 = createOutputPort[String]() val output2 = createOutputPort[String]() val output3 = createOutputPort[String]() + var throwFromOnComplete = false def createRouteLogic: RouteLogic[String] = new RouteLogic[String] { val handles = Vector(output1, output2, output3) @@ -105,6 +106,10 @@ object GraphFlexiRouteSpec { ctx.error(new RuntimeException("err") with NoStackTrace) else if (element == "err-output1") ctx.error(output1, new RuntimeException("err-1") with NoStackTrace) + else if (element == "exc") + throw new RuntimeException("exc") with NoStackTrace + else if (element == "onComplete-exc") + throwFromOnComplete = true else if (element == "complete") ctx.complete() else @@ -115,6 +120,8 @@ object GraphFlexiRouteSpec { override def initialCompletionHandling = CompletionHandling( onComplete = { ctx ⇒ + if (throwFromOnComplete) + throw new RuntimeException("onComplete-exc") with NoStackTrace handles.foreach { output ⇒ if (ctx.isDemandAvailable(output)) ctx.emit(output, "onComplete") @@ -313,6 +320,43 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } + "emit error for user thrown exception" in { + val fixture = new TestFixture + import fixture._ + + sub1.request(1) + s1.expectNext("onInput: a") + sub2.request(1) + s2.expectNext("onInput: b") + + sub1.request(5) + sub2.request(5) + autoPublisher.sendNext("exc") + + s1.expectError().getMessage should be("exc") + s2.expectError().getMessage should be("exc") + + autoPublisher.subscription.expectCancellation() + } + + "emit error for user thrown exception in onComplete" in { + val fixture = new TestFixture + import fixture._ + + sub1.request(1) + s1.expectNext("onInput: a") + sub2.request(1) + s2.expectNext("onInput: b") + + sub1.request(5) + sub2.request(5) + autoPublisher.sendNext("onComplete-exc") + autoPublisher.sendComplete() + + s1.expectError().getMessage should be("onComplete-exc") + s2.expectError().getMessage should be("onComplete-exc") + } + "handle cancel from output" in { val fixture = new TestFixture import fixture._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index 5b224344f2..c84fe9f789 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -7,8 +7,8 @@ import akka.actor.Props import akka.stream.MaterializerSettings import akka.stream.scaladsl.OperationAttributes import akka.stream.scaladsl.FlexiMerge - import scala.collection.breakOut +import scala.util.control.NonFatal /** * INTERNAL API @@ -44,7 +44,10 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) { override def onError(input: Int, e: Throwable): Unit = { - changeBehavior(completion.onError(ctx, inputMapping(input), e)) + changeBehavior(try completion.onError(ctx, inputMapping(input), e) + catch { + case NonFatal(e) ⇒ fail(e); mergeLogic.SameState + }) cancel(input) } @@ -149,7 +152,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, changeBehavior(behavior.onInput(ctx, inputHandles.head, read.mkResult(Map(values: _*)))) - // must be triggered after emiting the accumulated out value + // must be triggered after emitting the accumulated out value triggerCompletionAfterRead(inputHandles) } @@ -167,7 +170,10 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, if (inputBunch.isDepleted(inputHandle.portIndex)) triggerCompletion(inputHandle) - private def triggerCompletion(inputHandle: InputHandle) { - changeBehavior(completion.onComplete(ctx, inputHandle)) - } + private def triggerCompletion(inputHandle: InputHandle): Unit = + changeBehavior(try completion.onComplete(ctx, inputHandle) + catch { + case NonFatal(e) ⇒ fail(e); mergeLogic.SameState + }) + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala index 6567c94573..02773cffd3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala @@ -4,12 +4,12 @@ package akka.stream.impl import akka.stream.scaladsl.OperationAttributes - import scala.collection.breakOut import akka.actor.Props import akka.stream.scaladsl.FlexiRoute import akka.stream.MaterializerSettings import akka.stream.impl.FanOut.OutputBunch +import scala.util.control.NonFatal /** * INTERNAL API @@ -45,17 +45,20 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings, override protected val outputBunch = new OutputBunch(outputPorts, self, this) { override def onCancel(output: Int): Unit = - changeBehavior(completion.onCancel(ctx, outputMapping(output))) + changeBehavior(try completion.onCancel(ctx, outputMapping(output)) + catch { + case NonFatal(e) ⇒ fail(e); routeLogic.SameState + }) } override protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) { override def onError(e: Throwable): Unit = { - completion.onError(ctx, e) + try completion.onError(ctx, e) catch { case NonFatal(e) ⇒ fail(e) } fail(e) } override def onComplete(): Unit = { - completion.onComplete(ctx) + try completion.onComplete(ctx) catch { case NonFatal(e) ⇒ fail(e) } super.onComplete() } }