=str #16577 emit user exception from Flexi* completion handler

This commit is contained in:
Patrik Nordwall 2015-01-23 10:54:05 +01:00
parent 84a5505ca9
commit 1b73e09e2e
4 changed files with 133 additions and 10 deletions

View file

@ -230,6 +230,7 @@ class TestMerge extends FlexiMerge[String] {
def createMergeLogic: MergeLogic[String] = new MergeLogic[String] {
val handles = Vector(input1, input2, input3)
override def inputHandles(inputCount: Int) = handles
var throwFromOnComplete = false
override def initialState = State[String](ReadAny(handles)) {
(ctx, input, element)
@ -237,8 +238,12 @@ class TestMerge extends FlexiMerge[String] {
ctx.cancel(input)
else if (element == "err")
ctx.error(new RuntimeException("err") with NoStackTrace)
else if (element == "exc")
throw new RuntimeException("exc") with NoStackTrace
else if (element == "complete")
ctx.complete()
else if (element == "onComplete-exc")
throwFromOnComplete = true
else
ctx.emit("onInput: " + element)
@ -247,6 +252,8 @@ class TestMerge extends FlexiMerge[String] {
override def initialCompletionHandling = CompletionHandling(
onComplete = { (ctx, input)
if (throwFromOnComplete)
throw new RuntimeException("onComplete-exc") with NoStackTrace
if (ctx.isDemandAvailable)
ctx.emit("onComplete: " + input.portIndex)
SameState
@ -642,6 +649,69 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectError().getMessage should be("err")
}
"emit error for user thrown exception" in {
val m = FlowGraph { implicit b
val merge = new TestMerge
Source(List("a", "exc")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3
merge.out ~> out1
}.run()
val s = SubscriberProbe[String]
val p = m.get(out1)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectNext("onInput: a")
s.expectNext("onInput: b")
s.expectError().getMessage should be("exc")
}
"emit error for user thrown exception in onComplete" in {
val m = FlowGraph { implicit b
val merge = new TestMerge
Source(List("a", "onComplete-exc")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3
merge.out ~> out1
}.run()
val s = SubscriberProbe[String]
val p = m.get(out1)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectNext("onInput: a")
s.expectNext("onInput: b")
s.expectError().getMessage should be("onComplete-exc")
}
"emit error for user thrown exception in onComplete 2" in {
val publisher = PublisherProbe[String]
val m = FlowGraph { implicit b
val merge = new TestMerge
Source.empty[String] ~> merge.input1
Source(publisher) ~> merge.input2
Source.empty[String] ~> merge.input3
merge.out ~> out1
}.run()
val autoPublisher = new AutoPublisher(publisher)
autoPublisher.sendNext("onComplete-exc")
autoPublisher.sendNext("a")
val s = SubscriberProbe[String]
val p = m.get(out1)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(1)
s.expectNext("onInput: a")
autoPublisher.sendComplete()
s.expectError().getMessage should be("onComplete-exc")
}
"support complete from onInput" in {
val m = FlowGraph { implicit b
val merge = new TestMerge

View file

@ -94,6 +94,7 @@ object GraphFlexiRouteSpec {
val output1 = createOutputPort[String]()
val output2 = createOutputPort[String]()
val output3 = createOutputPort[String]()
var throwFromOnComplete = false
def createRouteLogic: RouteLogic[String] = new RouteLogic[String] {
val handles = Vector(output1, output2, output3)
@ -105,6 +106,10 @@ object GraphFlexiRouteSpec {
ctx.error(new RuntimeException("err") with NoStackTrace)
else if (element == "err-output1")
ctx.error(output1, new RuntimeException("err-1") with NoStackTrace)
else if (element == "exc")
throw new RuntimeException("exc") with NoStackTrace
else if (element == "onComplete-exc")
throwFromOnComplete = true
else if (element == "complete")
ctx.complete()
else
@ -115,6 +120,8 @@ object GraphFlexiRouteSpec {
override def initialCompletionHandling = CompletionHandling(
onComplete = { ctx
if (throwFromOnComplete)
throw new RuntimeException("onComplete-exc") with NoStackTrace
handles.foreach { output
if (ctx.isDemandAvailable(output))
ctx.emit(output, "onComplete")
@ -313,6 +320,43 @@ class GraphFlexiRouteSpec extends AkkaSpec {
s2.expectComplete()
}
"emit error for user thrown exception" in {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(5)
sub2.request(5)
autoPublisher.sendNext("exc")
s1.expectError().getMessage should be("exc")
s2.expectError().getMessage should be("exc")
autoPublisher.subscription.expectCancellation()
}
"emit error for user thrown exception in onComplete" in {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(5)
sub2.request(5)
autoPublisher.sendNext("onComplete-exc")
autoPublisher.sendComplete()
s1.expectError().getMessage should be("onComplete-exc")
s2.expectError().getMessage should be("onComplete-exc")
}
"handle cancel from output" in {
val fixture = new TestFixture
import fixture._

View file

@ -7,8 +7,8 @@ import akka.actor.Props
import akka.stream.MaterializerSettings
import akka.stream.scaladsl.OperationAttributes
import akka.stream.scaladsl.FlexiMerge
import scala.collection.breakOut
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -44,7 +44,10 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) {
override def onError(input: Int, e: Throwable): Unit = {
changeBehavior(completion.onError(ctx, inputMapping(input), e))
changeBehavior(try completion.onError(ctx, inputMapping(input), e)
catch {
case NonFatal(e) fail(e); mergeLogic.SameState
})
cancel(input)
}
@ -149,7 +152,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
changeBehavior(behavior.onInput(ctx, inputHandles.head, read.mkResult(Map(values: _*))))
// must be triggered after emiting the accumulated out value
// must be triggered after emitting the accumulated out value
triggerCompletionAfterRead(inputHandles)
}
@ -167,7 +170,10 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
if (inputBunch.isDepleted(inputHandle.portIndex))
triggerCompletion(inputHandle)
private def triggerCompletion(inputHandle: InputHandle) {
changeBehavior(completion.onComplete(ctx, inputHandle))
}
private def triggerCompletion(inputHandle: InputHandle): Unit =
changeBehavior(try completion.onComplete(ctx, inputHandle)
catch {
case NonFatal(e) fail(e); mergeLogic.SameState
})
}

View file

@ -4,12 +4,12 @@
package akka.stream.impl
import akka.stream.scaladsl.OperationAttributes
import scala.collection.breakOut
import akka.actor.Props
import akka.stream.scaladsl.FlexiRoute
import akka.stream.MaterializerSettings
import akka.stream.impl.FanOut.OutputBunch
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -45,17 +45,20 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings,
override protected val outputBunch = new OutputBunch(outputPorts, self, this) {
override def onCancel(output: Int): Unit =
changeBehavior(completion.onCancel(ctx, outputMapping(output)))
changeBehavior(try completion.onCancel(ctx, outputMapping(output))
catch {
case NonFatal(e) fail(e); routeLogic.SameState
})
}
override protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) {
override def onError(e: Throwable): Unit = {
completion.onError(ctx, e)
try completion.onError(ctx, e) catch { case NonFatal(e) fail(e) }
fail(e)
}
override def onComplete(): Unit = {
completion.onComplete(ctx)
try completion.onComplete(ctx) catch { case NonFatal(e) fail(e) }
super.onComplete()
}
}