Merge pull request #16727 from akka/wip-16577-FlexiMerge-exceptions-patriknw

!str #6577 Align API of Flexi with Stage
This commit is contained in:
Patrik Nordwall 2015-01-29 10:26:53 +01:00
commit de5bff0444
12 changed files with 201 additions and 201 deletions

View file

@ -127,10 +127,10 @@ class FlexiDocSpec extends AkkaSpec {
override def initialCompletionHandling = override def initialCompletionHandling =
CompletionHandling( CompletionHandling(
onComplete = (ctx, input) => input match { onUpstreamFinish = (ctx, input) => input match {
case `important` => case `important` =>
log.info("Important input completed, shutting down.") log.info("Important input completed, shutting down.")
ctx.complete() ctx.finish()
SameState SameState
case replica => case replica =>
@ -141,9 +141,9 @@ class FlexiDocSpec extends AkkaSpec {
ctx.changeCompletionHandling(eagerClose) ctx.changeCompletionHandling(eagerClose)
SameState SameState
}, },
onError = (ctx, input, cause) => input match { onUpstreamFailure = (ctx, input, cause) => input match {
case `important` => case `important` =>
ctx.error(cause) ctx.fail(cause)
SameState SameState
case replica => case replica =>
@ -281,13 +281,13 @@ class FlexiDocSpec extends AkkaSpec {
override def initialCompletionHandling = override def initialCompletionHandling =
CompletionHandling( CompletionHandling(
// upstream: // upstream:
onComplete = (ctx) => (), onUpstreamFinish = (ctx) => (),
onError = (ctx, thr) => (), onUpstreamFailure = (ctx, thr) => (),
// downstream: // downstream:
onCancel = (ctx, output) => output match { onDownstreamFinish = (ctx, output) => output match {
case `important` => case `important` =>
// complete all downstreams, and cancel the upstream // finish all downstreams, and cancel the upstream
ctx.complete() ctx.finish()
SameState SameState
case _ => case _ =>
SameState SameState
@ -336,9 +336,9 @@ class FlexiDocSpec extends AkkaSpec {
} }
val signalStatusOnTermination = CompletionHandling( val signalStatusOnTermination = CompletionHandling(
onComplete = ctx => drainBuffer(ctx), onUpstreamFinish = ctx => drainBuffer(ctx),
onError = (ctx, cause) => drainBuffer(ctx), onUpstreamFailure = (ctx, cause) => drainBuffer(ctx),
onCancel = (_, _) => SameState) onDownstreamFinish = (_, _) => SameState)
//#flexiroute-completion-upstream-completed-signalling //#flexiroute-completion-upstream-completed-signalling
override def initialCompletionHandling = signalStatusOnTermination override def initialCompletionHandling = signalStatusOnTermination

View file

@ -42,12 +42,12 @@ private[http] object HttpClient {
/* /*
Basic Stream Setup Basic Stream Setup
================== ==================
requestIn +----------+ requestIn +----------+
+-----------------------------------------------+--->| Termi- | requestRendering +-----------------------------------------------+--->| Termi- | requestRendering
| | nation +---------------------> | | | nation +---------------------> |
+-------------------------------------->| Merge | | +-------------------------------------->| Merge | |
| Termination Backchannel | +----------+ | TCP- | Termination Backchannel | +----------+ | TCP-
| | | level | | | level
| | Method | client | | Method | client
| +------------+ | Bypass | flow | +------------+ | Bypass | flow
@ -122,13 +122,13 @@ private[http] object HttpClient {
} }
override def initialCompletionHandling = CompletionHandling( override def initialCompletionHandling = CompletionHandling(
onComplete = { onUpstreamFinish = {
case (ctx, `requestInput`) SameState case (ctx, `requestInput`) SameState
case (ctx, `terminationBackchannelInput`) case (ctx, `terminationBackchannelInput`)
ctx.complete() ctx.finish()
SameState SameState
}, },
onError = defaultCompletionHandling.onError) onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure)
} }
} }
@ -163,7 +163,7 @@ private[http] object HttpClient {
} }
private val gotoInitial = (ctx: MergeLogicContext) { private val gotoInitial = (ctx: MergeLogicContext) {
if (methodBypassCompleted) { if (methodBypassCompleted) {
ctx.complete() ctx.finish()
SameState SameState
} else { } else {
ctx.changeCompletionHandling(initialCompletionHandling) ctx.changeCompletionHandling(initialCompletionHandling)
@ -199,7 +199,7 @@ private[http] object HttpClient {
onNeedNextMethod(ctx) onNeedNextMethod(ctx)
case StreamEnd case StreamEnd
emit(b.result()) emit(b.result())
ctx.complete() ctx.finish()
SameState SameState
case NeedMoreData case NeedMoreData
emit(b.result()) emit(b.result())
@ -209,25 +209,25 @@ private[http] object HttpClient {
} }
override val initialCompletionHandling = CompletionHandling( override val initialCompletionHandling = CompletionHandling(
onComplete = (ctx, _) { ctx.complete(); SameState }, onUpstreamFinish = (ctx, _) { ctx.finish(); SameState },
onError = defaultCompletionHandling.onError) onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure)
val responseReadingCompletionHandling = CompletionHandling( val responseReadingCompletionHandling = CompletionHandling(
onComplete = { onUpstreamFinish = {
case (ctx, `methodBypassInput`) case (ctx, `methodBypassInput`)
methodBypassCompleted = true methodBypassCompleted = true
SameState SameState
case (ctx, `dataInput`) case (ctx, `dataInput`)
if (parser.onUpstreamFinish()) { if (parser.onUpstreamFinish()) {
ctx.complete() ctx.finish()
} else { } else {
// not pretty but because the FlexiMerge doesn't let us emit from here (#16565) // not pretty but because the FlexiMerge doesn't let us emit from here (#16565)
// we need to funnel the error through the error channel // we need to funnel the error through the error channel
ctx.error(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput])) ctx.fail(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput]))
} }
SameState SameState
}, },
onError = defaultCompletionHandling.onError) onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure)
} }
} }
@ -251,4 +251,4 @@ private[http] object HttpClient {
} }
() stage () stage
} }
} }

View file

@ -151,15 +151,15 @@ private[http] object HttpServer {
} }
val waitingForApplicationResponseCompletionHandling = CompletionHandling( val waitingForApplicationResponseCompletionHandling = CompletionHandling(
onComplete = { onUpstreamFinish = {
case (ctx, `bypassInput`) { requestStart = requestStart.copy(closeAfterResponseCompletion = true); SameState } case (ctx, `bypassInput`) { requestStart = requestStart.copy(closeAfterResponseCompletion = true); SameState }
case (ctx, _) { ctx.complete(); SameState } case (ctx, _) { ctx.finish(); SameState }
}, },
onError = { onUpstreamFailure = {
case (ctx, _, EntityStreamException(errorInfo)) case (ctx, _, EntityStreamException(errorInfo))
// the application has forwarded a request entity stream error to the response stream // the application has forwarded a request entity stream error to the response stream
finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo) finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo)
case (ctx, _, error) { ctx.error(error); SameState } case (ctx, _, error) { ctx.fail(error); SameState }
}) })
def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = { def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = {
@ -170,7 +170,7 @@ private[http] object HttpServer {
} }
def finish(ctx: MergeLogicContext): State[Any] = { def finish(ctx: MergeLogicContext): State[Any] = {
ctx.complete() // shouldn't this return a `State` rather than `Unit`? ctx.finish() // shouldn't this return a `State` rather than `Unit`?
SameState // it seems weird to stay in the same state after completion SameState // it seems weird to stay in the same state after completion
} }
} }
@ -234,4 +234,4 @@ private[http] object HttpServer {
case _ ctx.fail(error) case _ ctx.fail(error)
} }
} }
} }

View file

@ -181,14 +181,14 @@ public class FlexiMergeTest {
private final CompletionHandling<T> emitOtherOnClose = new CompletionHandling<T>() { private final CompletionHandling<T> emitOtherOnClose = new CompletionHandling<T>() {
@Override @Override
public State<T, T> onComplete(MergeLogicContext<T> ctx, InputHandle input) { public State<T, T> onUpstreamFinish(MergeLogicContext<T> ctx, InputHandle input) {
ctx.changeCompletionHandling(defaultCompletionHandling()); ctx.changeCompletionHandling(defaultCompletionHandling());
return readRemaining(other(input)); return readRemaining(other(input));
} }
@Override @Override
public State<T, T> onError(MergeLogicContext<T> ctx, InputHandle inputHandle, Throwable cause) { public State<T, T> onUpstreamFailure(MergeLogicContext<T> ctx, InputHandle inputHandle, Throwable cause) {
ctx.error(cause); ctx.fail(cause);
return sameState(); return sameState();
} }
}; };

View file

@ -45,12 +45,12 @@ object GraphFlexiMergeSpec {
override def inputHandles(inputCount: Int) = Vector(input1, input2) override def inputHandles(inputCount: Int) = Vector(input1, input2)
val emitOtherOnClose = CompletionHandling( val emitOtherOnClose = CompletionHandling(
onComplete = { (ctx, input) onUpstreamFinish = { (ctx, input)
ctx.changeCompletionHandling(defaultCompletionHandling) ctx.changeCompletionHandling(defaultCompletionHandling)
readRemaining(other(input)) readRemaining(other(input))
}, },
onError = { (ctx, _, cause) onUpstreamFailure = { (ctx, _, cause)
ctx.error(cause) ctx.fail(cause)
SameState SameState
}) })
@ -109,7 +109,7 @@ object GraphFlexiMergeSpec {
class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends FlexiMerge[(A, B, C)] { class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends FlexiMerge[(A, B, C)] {
import FlexiMerge._ import FlexiMerge._
val soonCancelledInput = createInputPort[A]() val ssoonCancelledInputInput = createInputPort[A]()
val stableInput1 = createInputPort[B]() val stableInput1 = createInputPort[B]()
val stableInput2 = createInputPort[C]() val stableInput2 = createInputPort[C]()
@ -117,18 +117,18 @@ class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends
override def inputHandles(inputCount: Int) = { override def inputHandles(inputCount: Int) = {
require(inputCount == 3, s"TripleZip must have 3 connected inputs, was $inputCount") require(inputCount == 3, s"TripleZip must have 3 connected inputs, was $inputCount")
Vector(soonCancelledInput, stableInput1, stableInput2) Vector(ssoonCancelledInputInput, stableInput1, stableInput2)
} }
override def initialState = State[ReadAllInputs](ReadAll(soonCancelledInput, stableInput1, stableInput2)) { override def initialState = State[ReadAllInputs](ReadAll(ssoonCancelledInputInput, stableInput1, stableInput2)) {
case (ctx, input, inputs) case (ctx, input, inputs)
val a = inputs.getOrElse(soonCancelledInput, null) val a = inputs.getOrElse(ssoonCancelledInputInput, null)
val b = inputs.getOrElse(stableInput1, null) val b = inputs.getOrElse(stableInput1, null)
val c = inputs.getOrElse(stableInput2, null) val c = inputs.getOrElse(stableInput2, null)
ctx.emit((a, b, c)) ctx.emit((a, b, c))
if (cancelAfter == 0) if (cancelAfter == 0)
ctx.cancel(soonCancelledInput) ctx.cancel(ssoonCancelledInputInput)
cancelAfter -= 1 cancelAfter -= 1
SameState SameState
@ -149,12 +149,12 @@ class OrderedMerge extends FlexiMerge[Int] {
override def inputHandles(inputCount: Int) = Vector(input1, input2) override def inputHandles(inputCount: Int) = Vector(input1, input2)
val emitOtherOnClose = CompletionHandling( val emitOtherOnClose = CompletionHandling(
onComplete = { (ctx, input) onUpstreamFinish = { (ctx, input)
ctx.changeCompletionHandling(emitLast) ctx.changeCompletionHandling(emitLast)
readRemaining(other(input)) readRemaining(other(input))
}, },
onError = { (ctx, input, cause) onUpstreamFailure = { (ctx, input, cause)
ctx.error(cause) ctx.fail(cause)
SameState SameState
}) })
@ -190,13 +190,13 @@ class OrderedMerge extends FlexiMerge[Int] {
} }
val emitLast = CompletionHandling( val emitLast = CompletionHandling(
onComplete = { (ctx, input) onUpstreamFinish = { (ctx, input)
if (ctx.isDemandAvailable) if (ctx.isDemandAvailable)
ctx.emit(reference) ctx.emit(reference)
SameState SameState
}, },
onError = { (ctx, input, cause) onUpstreamFailure = { (ctx, input, cause)
ctx.error(cause) ctx.fail(cause)
SameState SameState
}) })
@ -237,12 +237,12 @@ class TestMerge extends FlexiMerge[String] {
if (element == "cancel") if (element == "cancel")
ctx.cancel(input) ctx.cancel(input)
else if (element == "err") else if (element == "err")
ctx.error(new RuntimeException("err") with NoStackTrace) ctx.fail(new RuntimeException("err") with NoStackTrace)
else if (element == "exc") else if (element == "exc")
throw new RuntimeException("exc") with NoStackTrace throw new RuntimeException("exc") with NoStackTrace
else if (element == "complete") else if (element == "finish")
ctx.complete() ctx.finish()
else if (element == "onComplete-exc") else if (element == "onUpstreamFinish-exc")
throwFromOnComplete = true throwFromOnComplete = true
else else
ctx.emit("onInput: " + element) ctx.emit("onInput: " + element)
@ -251,17 +251,17 @@ class TestMerge extends FlexiMerge[String] {
} }
override def initialCompletionHandling = CompletionHandling( override def initialCompletionHandling = CompletionHandling(
onComplete = { (ctx, input) onUpstreamFinish = { (ctx, input)
if (throwFromOnComplete) if (throwFromOnComplete)
throw new RuntimeException("onComplete-exc") with NoStackTrace throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
if (ctx.isDemandAvailable) if (ctx.isDemandAvailable)
ctx.emit("onComplete: " + input.portIndex) ctx.emit("onUpstreamFinish: " + input.portIndex)
SameState SameState
}, },
onError = { (ctx, input, cause) onUpstreamFailure = { (ctx, input, cause)
cause match { cause match {
case _: IllegalArgumentException // swallow case _: IllegalArgumentException // swallow
case _ ctx.error(cause) case _ ctx.fail(cause)
} }
SameState SameState
}) })
@ -343,7 +343,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TripleCancellingZip[Long, Int, String] val merge = new TripleCancellingZip[Long, Int, String]
// format: OFF // format: OFF
Source(List(1L, 2L )) ~> merge.soonCancelledInput Source(List(1L, 2L )) ~> merge.ssoonCancelledInputInput
Source(List(1, 2, 3, 4)) ~> merge.stableInput1 Source(List(1, 2, 3, 4)) ~> merge.stableInput1
Source(List("a", "b", "c" )) ~> merge.stableInput2 Source(List("a", "b", "c" )) ~> merge.stableInput2
merge.out ~> output merge.out ~> output
@ -365,7 +365,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TripleCancellingZip[Long, Int, String](cancelAfter = 1) val merge = new TripleCancellingZip[Long, Int, String](cancelAfter = 1)
// format: OFF // format: OFF
Source(List(1L, 2L, 3L, 4L, 5L)) ~> merge.soonCancelledInput Source(List(1L, 2L, 3L, 4L, 5L)) ~> merge.ssoonCancelledInputInput
Source(List(1, 2, 3, 4 )) ~> merge.stableInput1 Source(List(1, 2, 3, 4 )) ~> merge.stableInput1
Source(List("a", "b", "c" )) ~> merge.stableInput2 Source(List("a", "b", "c" )) ~> merge.stableInput2
merge.out ~> output merge.out ~> output
@ -380,7 +380,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
sub.request(10) sub.request(10)
s.expectNext((1L, 1, "a")) s.expectNext((1L, 1, "a"))
s.expectNext((2L, 2, "b")) s.expectNext((2L, 2, "b"))
// soonCancelledInput is now cancelled and continues with default (null) value // ssoonCancelledInputInput is now cancelled and continues with default (null) value
s.expectNext((null.asInstanceOf[Long], 3, "c")) s.expectNext((null.asInstanceOf[Long], 3, "c"))
s.expectComplete() s.expectComplete()
} }
@ -517,7 +517,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectNext(4) s.expectNext(4)
s2.sendComplete() s2.sendComplete()
// complete when all inputs have completed // finish when all inputs have completed
s.expectComplete() s.expectComplete()
} }
@ -546,16 +546,16 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectNext("onInput: e") s.expectNext("onInput: e")
s.expectNext("onInput: c") s.expectNext("onInput: c")
s.expectNext("onInput: f") s.expectNext("onInput: f")
s.expectNext("onComplete: 2") s.expectNext("onUpstreamFinish: 2")
s.expectNext("onInput: d") s.expectNext("onInput: d")
s.expectNext("onComplete: 1") s.expectNext("onUpstreamFinish: 1")
autoPublisher.sendNext("x") autoPublisher.sendNext("x")
s.expectComplete() s.expectComplete()
} }
"complete when all inputs cancelled" in { "finish when all inputs cancelled" in {
val publisher1 = PublisherProbe[String] val publisher1 = PublisherProbe[String]
val publisher2 = PublisherProbe[String] val publisher2 = PublisherProbe[String]
val publisher3 = PublisherProbe[String] val publisher3 = PublisherProbe[String]
@ -591,7 +591,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectComplete() s.expectComplete()
} }
"handle error" in { "handle failure" in {
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge
Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.input1 Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.input1
@ -608,13 +608,13 @@ class GraphFlexiMergeSpec extends AkkaSpec {
// IllegalArgumentException is swallowed by the CompletionHandler // IllegalArgumentException is swallowed by the CompletionHandler
s.expectNext("onInput: a") s.expectNext("onInput: a")
s.expectNext("onInput: c") s.expectNext("onInput: c")
s.expectNext("onComplete: 2") s.expectNext("onUpstreamFinish: 2")
s.expectNext("onInput: b") s.expectNext("onInput: b")
s.expectNext("onComplete: 1") s.expectNext("onUpstreamFinish: 1")
s.expectComplete() s.expectComplete()
} }
"propagate error" in { "propagate failure" in {
val publisher = PublisherProbe[String] val publisher = PublisherProbe[String]
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge
@ -630,7 +630,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectErrorOrSubscriptionFollowedByError().getMessage should be("ERROR") s.expectErrorOrSubscriptionFollowedByError().getMessage should be("ERROR")
} }
"emit error" in { "emit failure" in {
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge
Source(List("a", "err")) ~> merge.input1 Source(List("a", "err")) ~> merge.input1
@ -649,7 +649,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectError().getMessage should be("err") s.expectError().getMessage should be("err")
} }
"emit error for user thrown exception" in { "emit failure for user thrown exception" in {
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge
Source(List("a", "exc")) ~> merge.input1 Source(List("a", "exc")) ~> merge.input1
@ -668,10 +668,10 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectError().getMessage should be("exc") s.expectError().getMessage should be("exc")
} }
"emit error for user thrown exception in onComplete" in { "emit failure for user thrown exception in onUpstreamFinish" in {
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge
Source(List("a", "onComplete-exc")) ~> merge.input1 Source(List("a", "onUpstreamFinish-exc")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2 Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
merge.out ~> out1 merge.out ~> out1
@ -684,10 +684,10 @@ class GraphFlexiMergeSpec extends AkkaSpec {
sub.request(10) sub.request(10)
s.expectNext("onInput: a") s.expectNext("onInput: a")
s.expectNext("onInput: b") s.expectNext("onInput: b")
s.expectError().getMessage should be("onComplete-exc") s.expectError().getMessage should be("onUpstreamFinish-exc")
} }
"emit error for user thrown exception in onComplete 2" in { "emit failure for user thrown exception in onUpstreamFinish 2" in {
val publisher = PublisherProbe[String] val publisher = PublisherProbe[String]
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge
@ -698,7 +698,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
}.run() }.run()
val autoPublisher = new AutoPublisher(publisher) val autoPublisher = new AutoPublisher(publisher)
autoPublisher.sendNext("onComplete-exc") autoPublisher.sendNext("onUpstreamFinish-exc")
autoPublisher.sendNext("a") autoPublisher.sendNext("a")
val s = SubscriberProbe[String] val s = SubscriberProbe[String]
@ -709,13 +709,13 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectNext("onInput: a") s.expectNext("onInput: a")
autoPublisher.sendComplete() autoPublisher.sendComplete()
s.expectError().getMessage should be("onComplete-exc") s.expectError().getMessage should be("onUpstreamFinish-exc")
} }
"support complete from onInput" in { "support finish from onInput" in {
val m = FlowGraph { implicit b val m = FlowGraph { implicit b
val merge = new TestMerge val merge = new TestMerge
Source(List("a", "complete")) ~> merge.input1 Source(List("a", "finish")) ~> merge.input1
Source(List("b", "c")) ~> merge.input2 Source(List("b", "c")) ~> merge.input2
Source.empty[String] ~> merge.input3 Source.empty[String] ~> merge.input3
merge.out ~> out1 merge.out ~> out1
@ -746,10 +746,10 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(10) sub.request(10)
s.expectNext("onInput: a") s.expectNext("onInput: a")
s.expectNext("onComplete: 0") s.expectNext("onUpstreamFinish: 0")
s.expectNext("onInput: b") s.expectNext("onInput: b")
s.expectNext("onInput: c") s.expectNext("onInput: c")
s.expectNext("onComplete: 1") s.expectNext("onUpstreamFinish: 1")
s.expectComplete() s.expectComplete()
} }

View file

@ -103,15 +103,15 @@ object GraphFlexiRouteSpec {
override def initialState = State[String](DemandFromAny(handles)) { override def initialState = State[String](DemandFromAny(handles)) {
(ctx, preferred, element) (ctx, preferred, element)
if (element == "err") if (element == "err")
ctx.error(new RuntimeException("err") with NoStackTrace) ctx.fail(new RuntimeException("err") with NoStackTrace)
else if (element == "err-output1") else if (element == "err-output1")
ctx.error(output1, new RuntimeException("err-1") with NoStackTrace) ctx.fail(output1, new RuntimeException("err-1") with NoStackTrace)
else if (element == "exc") else if (element == "exc")
throw new RuntimeException("exc") with NoStackTrace throw new RuntimeException("exc") with NoStackTrace
else if (element == "onComplete-exc") else if (element == "onUpstreamFinish-exc")
throwFromOnComplete = true throwFromOnComplete = true
else if (element == "complete") else if (element == "finish")
ctx.complete() ctx.finish()
else else
ctx.emit(preferred, "onInput: " + element) ctx.emit(preferred, "onInput: " + element)
@ -119,15 +119,15 @@ object GraphFlexiRouteSpec {
} }
override def initialCompletionHandling = CompletionHandling( override def initialCompletionHandling = CompletionHandling(
onComplete = { ctx onUpstreamFinish = { ctx
if (throwFromOnComplete) if (throwFromOnComplete)
throw new RuntimeException("onComplete-exc") with NoStackTrace throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
handles.foreach { output handles.foreach { output
if (ctx.isDemandAvailable(output)) if (ctx.isDemandAvailable(output))
ctx.emit(output, "onComplete") ctx.emit(output, "onUpstreamFinish")
} }
}, },
onError = { (ctx, cause) onUpstreamFailure = { (ctx, cause)
cause match { cause match {
case _: IllegalArgumentException // swallow case _: IllegalArgumentException // swallow
case _ case _
@ -137,10 +137,10 @@ object GraphFlexiRouteSpec {
} }
} }
}, },
onCancel = { (ctx, cancelledOutput) onDownstreamFinish = { (ctx, cancelledOutput)
handles.foreach { output handles.foreach { output
if (output != cancelledOutput && ctx.isDemandAvailable(output)) if (output != cancelledOutput && ctx.isDemandAvailable(output))
ctx.emit(output, "onCancel: " + cancelledOutput.portIndex) ctx.emit(output, "onDownstreamFinish: " + cancelledOutput.portIndex)
} }
SameState SameState
}) })
@ -267,11 +267,11 @@ class GraphFlexiRouteSpec extends AkkaSpec {
s2.expectComplete() s2.expectComplete()
} }
"support complete of downstreams and cancel of upstream" in { "support finish of downstreams and cancel of upstream" in {
val fixture = new TestFixture val fixture = new TestFixture
import fixture._ import fixture._
autoPublisher.sendNext("complete") autoPublisher.sendNext("finish")
sub1.request(1) sub1.request(1)
s1.expectNext("onInput: a") s1.expectNext("onInput: a")
@ -316,7 +316,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
s1.expectError().getMessage should be("err-1") s1.expectError().getMessage should be("err-1")
autoPublisher.sendComplete() autoPublisher.sendComplete()
s2.expectNext("onComplete") s2.expectNext("onUpstreamFinish")
s2.expectComplete() s2.expectComplete()
} }
@ -339,7 +339,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
autoPublisher.subscription.expectCancellation() autoPublisher.subscription.expectCancellation()
} }
"emit error for user thrown exception in onComplete" in { "emit error for user thrown exception in onUpstreamFinish" in {
val fixture = new TestFixture val fixture = new TestFixture
import fixture._ import fixture._
@ -350,11 +350,11 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub1.request(5) sub1.request(5)
sub2.request(5) sub2.request(5)
autoPublisher.sendNext("onComplete-exc") autoPublisher.sendNext("onUpstreamFinish-exc")
autoPublisher.sendComplete() autoPublisher.sendComplete()
s1.expectError().getMessage should be("onComplete-exc") s1.expectError().getMessage should be("onUpstreamFinish-exc")
s2.expectError().getMessage should be("onComplete-exc") s2.expectError().getMessage should be("onUpstreamFinish-exc")
} }
"handle cancel from output" in { "handle cancel from output" in {
@ -370,7 +370,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2) sub2.request(2)
sub1.cancel() sub1.cancel()
s2.expectNext("onCancel: 0") s2.expectNext("onDownstreamFinish: 0")
s1.expectNoMsg(200.millis) s1.expectNoMsg(200.millis)
autoPublisher.sendNext("c") autoPublisher.sendNext("c")
@ -380,7 +380,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
s2.expectComplete() s2.expectComplete()
} }
"handle complete from upstream input" in { "handle finish from upstream input" in {
val fixture = new TestFixture val fixture = new TestFixture
import fixture._ import fixture._
@ -393,8 +393,8 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2) sub2.request(2)
autoPublisher.sendComplete() autoPublisher.sendComplete()
s1.expectNext("onComplete") s1.expectNext("onUpstreamFinish")
s2.expectNext("onComplete") s2.expectNext("onUpstreamFinish")
s1.expectComplete() s1.expectComplete()
s2.expectComplete() s2.expectComplete()
@ -433,7 +433,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub2.request(2) sub2.request(2)
sub1.cancel() sub1.cancel()
s2.expectNext("onCancel: 0") s2.expectNext("onDownstreamFinish: 0")
sub2.cancel() sub2.cancel()
autoPublisher.subscription.expectCancellation() autoPublisher.subscription.expectCancellation()
@ -450,7 +450,7 @@ class GraphFlexiRouteSpec extends AkkaSpec {
sub1.request(2) sub1.request(2)
sub2.request(2) sub2.request(2)
autoPublisher.sendNext("complete") autoPublisher.sendNext("finish")
s1.expectComplete() s1.expectComplete()
s2.expectComplete() s2.expectComplete()
autoPublisher.subscription.expectCancellation() autoPublisher.subscription.expectCancellation()

View file

@ -44,7 +44,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) { override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) {
override def onError(input: Int, e: Throwable): Unit = { override def onError(input: Int, e: Throwable): Unit = {
changeBehavior(try completion.onError(ctx, inputMapping(input), e) changeBehavior(try completion.onUpstreamFailure(ctx, inputMapping(input), e)
catch { catch {
case NonFatal(e) fail(e); mergeLogic.SameState case NonFatal(e) fail(e); mergeLogic.SameState
}) })
@ -63,13 +63,13 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
primaryOutputs.enqueueOutputElement(elem) primaryOutputs.enqueueOutputElement(elem)
} }
override def complete(): Unit = { override def finish(): Unit = {
inputBunch.cancel() inputBunch.cancel()
primaryOutputs.complete() primaryOutputs.complete()
context.stop(self) context.stop(self)
} }
override def error(cause: Throwable): Unit = fail(cause) override def fail(cause: Throwable): Unit = FlexiMergeImpl.this.fail(cause)
override def cancel(input: InputHandle): Unit = inputBunch.cancel(input.portIndex) override def cancel(input: InputHandle): Unit = inputBunch.cancel(input.portIndex)
@ -171,7 +171,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
triggerCompletion(inputHandle) triggerCompletion(inputHandle)
private def triggerCompletion(inputHandle: InputHandle): Unit = private def triggerCompletion(inputHandle: InputHandle): Unit =
changeBehavior(try completion.onComplete(ctx, inputHandle) changeBehavior(try completion.onUpstreamFinish(ctx, inputHandle)
catch { catch {
case NonFatal(e) fail(e); mergeLogic.SameState case NonFatal(e) fail(e); mergeLogic.SameState
}) })

View file

@ -45,7 +45,7 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings,
override protected val outputBunch = new OutputBunch(outputPorts, self, this) { override protected val outputBunch = new OutputBunch(outputPorts, self, this) {
override def onCancel(output: Int): Unit = override def onCancel(output: Int): Unit =
changeBehavior(try completion.onCancel(ctx, outputMapping(output)) changeBehavior(try completion.onDownstreamFinish(ctx, outputMapping(output))
catch { catch {
case NonFatal(e) fail(e); routeLogic.SameState case NonFatal(e) fail(e); routeLogic.SameState
}) })
@ -53,12 +53,12 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings,
override protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) { override protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) {
override def onError(e: Throwable): Unit = { override def onError(e: Throwable): Unit = {
try completion.onError(ctx, e) catch { case NonFatal(e) fail(e) } try completion.onUpstreamFailure(ctx, e) catch { case NonFatal(e) fail(e) }
fail(e) fail(e)
} }
override def onComplete(): Unit = { override def onComplete(): Unit = {
try completion.onComplete(ctx) catch { case NonFatal(e) fail(e) } try completion.onUpstreamFinish(ctx) catch { case NonFatal(e) fail(e) }
super.onComplete() super.onComplete()
} }
} }
@ -72,18 +72,18 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings,
outputBunch.enqueue(output.portIndex, elem) outputBunch.enqueue(output.portIndex, elem)
} }
override def complete(): Unit = { override def finish(): Unit = {
primaryInputs.cancel() primaryInputs.cancel()
outputBunch.complete() outputBunch.complete()
context.stop(self) context.stop(self)
} }
override def complete(output: OutputHandle): Unit = override def finish(output: OutputHandle): Unit =
outputBunch.complete(output.portIndex) outputBunch.complete(output.portIndex)
override def error(cause: Throwable): Unit = fail(cause) override def fail(cause: Throwable): Unit = FlexiRouteImpl.this.fail(cause)
override def error(output: OutputHandle, cause: Throwable): Unit = override def fail(output: OutputHandle, cause: Throwable): Unit =
outputBunch.error(output.portIndex, cause) outputBunch.error(output.portIndex, cause)
override def changeCompletionHandling(newCompletion: CompletionT): Unit = override def changeCompletionHandling(newCompletion: CompletionT): Unit =

View file

@ -125,12 +125,12 @@ object FlexiMerge {
/** /**
* Complete this stream succesfully. Upstream subscriptions will be cancelled. * Complete this stream succesfully. Upstream subscriptions will be cancelled.
*/ */
def complete(): Unit def finish(): Unit
/** /**
* Complete this stream with failure. Upstream subscriptions will be cancelled. * Complete this stream with failure. Upstream subscriptions will be cancelled.
*/ */
def error(cause: Throwable): Unit def fail(cause: Throwable): Unit
/** /**
* Cancel a specific upstream input stream. * Cancel a specific upstream input stream.
@ -144,21 +144,21 @@ object FlexiMerge {
} }
/** /**
* How to handle completion or error from upstream input. * How to handle completion or failure from upstream input.
* *
* The `onComplete` method is called when an upstream input was completed sucessfully. * The `onUpstreamFinish` method is called when an upstream input was completed sucessfully.
* It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior.
* A completion can be propagated downstream with [[MergeLogicContext#complete]], * A completion can be propagated downstream with [[MergeLogicContext#finish]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
* *
* The `onError` method is called when an upstream input was completed sucessfully. * The `onUpstreamFailure` method is called when an upstream input was completed with failure.
* It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior.
* An error can be propagated downstream with [[MergeLogicContext#error]], * A failure can be propagated downstream with [[MergeLogicContext#fail]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
*/ */
abstract class CompletionHandling[Out] { abstract class CompletionHandling[Out] {
def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out] def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out]
def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out] def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out]
} }
/** /**
@ -176,7 +176,7 @@ object FlexiMerge {
/** /**
* The possibly stateful logic that reads from input via the defined [[State]] and * The possibly stateful logic that reads from input via the defined [[State]] and
* handles completion and error via the defined [[CompletionHandling]]. * handles completion and failure via the defined [[CompletionHandling]].
* *
* Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]]. * Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]].
*/ */
@ -224,10 +224,10 @@ object FlexiMerge {
*/ */
def defaultCompletionHandling[A]: CompletionHandling[Out] = def defaultCompletionHandling[A]: CompletionHandling[Out] =
new CompletionHandling[Out] { new CompletionHandling[Out] {
override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] =
sameState sameState
override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.error(cause) ctx.fail(cause)
sameState sameState
} }
} }
@ -238,12 +238,12 @@ object FlexiMerge {
*/ */
def eagerClose[A]: CompletionHandling[Out] = def eagerClose[A]: CompletionHandling[Out] =
new CompletionHandling[Out] { new CompletionHandling[Out] {
override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = { override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = {
ctx.complete() ctx.finish()
sameState sameState
} }
override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.error(cause) ctx.fail(cause)
sameState sameState
} }
} }
@ -282,13 +282,13 @@ object FlexiMerge {
private def wrapCompletionHandling( private def wrapCompletionHandling(
delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling = delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling =
CompletionHandling( CompletionHandling(
onComplete = (ctx, inputHandle) { onUpstreamFinish = (ctx, inputHandle) {
val newDelegateState = delegateCompletionHandling.onComplete( val newDelegateState = delegateCompletionHandling.onUpstreamFinish(
new MergeLogicContextWrapper(ctx), asJava(inputHandle)) new MergeLogicContextWrapper(ctx), asJava(inputHandle))
wrapState(newDelegateState) wrapState(newDelegateState)
}, },
onError = (ctx, inputHandle, cause) { onUpstreamFailure = (ctx, inputHandle, cause) {
val newDelegateState = delegateCompletionHandling.onError( val newDelegateState = delegateCompletionHandling.onUpstreamFailure(
new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause) new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause)
wrapState(newDelegateState) wrapState(newDelegateState)
}) })
@ -299,8 +299,8 @@ object FlexiMerge {
class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] { class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] {
override def isDemandAvailable: Boolean = delegate.isDemandAvailable override def isDemandAvailable: Boolean = delegate.isDemandAvailable
override def emit(elem: Out): Unit = delegate.emit(elem) override def emit(elem: Out): Unit = delegate.emit(elem)
override def complete(): Unit = delegate.complete() override def finish(): Unit = delegate.finish()
override def error(cause: Throwable): Unit = delegate.error(cause) override def fail(cause: Throwable): Unit = delegate.fail(cause)
override def cancel(input: InputHandle): Unit = delegate.cancel(input) override def cancel(input: InputHandle): Unit = delegate.cancel(input)
override def changeCompletionHandling(completion: FlexiMerge.CompletionHandling[Out]): Unit = override def changeCompletionHandling(completion: FlexiMerge.CompletionHandling[Out]): Unit =
delegate.changeCompletionHandling(wrapCompletionHandling(completion)) delegate.changeCompletionHandling(wrapCompletionHandling(completion))

View file

@ -95,22 +95,22 @@ object FlexiRoute {
/** /**
* Complete the given downstream successfully. * Complete the given downstream successfully.
*/ */
def complete(output: OutputHandle): Unit def finish(output: OutputHandle): Unit
/** /**
* Complete all downstreams successfully and cancel upstream. * Complete all downstreams successfully and cancel upstream.
*/ */
def complete(): Unit def finish(): Unit
/** /**
* Complete the given downstream with failure. * Complete the given downstream with failure.
*/ */
def error(output: OutputHandle, cause: Throwable): Unit def fail(output: OutputHandle, cause: Throwable): Unit
/** /**
* Complete all downstreams with failure and cancel upstream. * Complete all downstreams with failure and cancel upstream.
*/ */
def error(cause: Throwable): Unit def fail(cause: Throwable): Unit
/** /**
* Replace current [[CompletionHandling]]. * Replace current [[CompletionHandling]].
@ -119,22 +119,22 @@ object FlexiRoute {
} }
/** /**
* How to handle completion or error from upstream input and how to * How to handle completion or failure from upstream input and how to
* handle cancel from downstream output. * handle cancel from downstream output.
* *
* The `onComplete` method is called the upstream input was completed successfully. * The `onUpstreamFinish` method is called the upstream input was completed successfully.
* It returns next behavior or [[#sameState]] to keep current behavior. * It returns next behavior or [[#sameState]] to keep current behavior.
* *
* The `onError` method is called when the upstream input was completed with failure. * The `onUpstreamFailure` method is called when the upstream input was completed with failure.
* It returns next behavior or [[#SameState]] to keep current behavior. * It returns next behavior or [[#SameState]] to keep current behavior.
* *
* The `onCancel` method is called when a downstream output cancels. * The `onDownstreamFinish` method is called when a downstream output cancels.
* It returns next behavior or [[#sameState]] to keep current behavior. * It returns next behavior or [[#sameState]] to keep current behavior.
*/ */
abstract class CompletionHandling[In] { abstract class CompletionHandling[In] {
def onComplete(ctx: RouteLogicContext[In, Any]): Unit def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit
def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit
def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _]
} }
/** /**
@ -153,7 +153,7 @@ object FlexiRoute {
/** /**
* The possibly stateful logic that reads from the input and enables emitting to downstream * The possibly stateful logic that reads from the input and enables emitting to downstream
* via the defined [[State]]. Handles completion, error and cancel via the defined * via the defined [[State]]. Handles completion, failure and cancel via the defined
* [[CompletionHandling]]. * [[CompletionHandling]].
* *
* Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]]. * Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]].
@ -195,9 +195,9 @@ object FlexiRoute {
*/ */
def defaultCompletionHandling: CompletionHandling[In] = def defaultCompletionHandling: CompletionHandling[In] =
new CompletionHandling[In] { new CompletionHandling[In] {
override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = () override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = ()
override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = ()
override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] =
sameState sameState
} }
@ -207,10 +207,10 @@ object FlexiRoute {
*/ */
def eagerClose[A]: CompletionHandling[In] = def eagerClose[A]: CompletionHandling[In] =
new CompletionHandling[In] { new CompletionHandling[In] {
override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = () override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = ()
override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = ()
override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = { override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = {
ctx.complete() ctx.finish()
sameState sameState
} }
} }
@ -249,14 +249,14 @@ object FlexiRoute {
private def wrapCompletionHandling[Out]( private def wrapCompletionHandling[Out](
delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling = delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling =
CompletionHandling( CompletionHandling(
onComplete = ctx { onUpstreamFinish = ctx {
delegateCompletionHandling.onComplete(new RouteLogicContextWrapper(ctx)) delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(ctx))
}, },
onError = (ctx, cause) { onUpstreamFailure = (ctx, cause) {
delegateCompletionHandling.onError(new RouteLogicContextWrapper(ctx), cause) delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(ctx), cause)
}, },
onCancel = (ctx, outputHandle) { onDownstreamFinish = (ctx, outputHandle) {
val newDelegateState = delegateCompletionHandling.onCancel( val newDelegateState = delegateCompletionHandling.onDownstreamFinish(
new RouteLogicContextWrapper(ctx), asJava(outputHandle)) new RouteLogicContextWrapper(ctx), asJava(outputHandle))
wrapState(newDelegateState) wrapState(newDelegateState)
}) })
@ -267,10 +267,10 @@ object FlexiRoute {
class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] { class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] {
override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output) override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output)
override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem) override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem)
override def complete(): Unit = delegate.complete() override def finish(): Unit = delegate.finish()
override def complete(output: OutputHandle): Unit = delegate.complete(output) override def finish(output: OutputHandle): Unit = delegate.finish(output)
override def error(cause: Throwable): Unit = delegate.error(cause) override def fail(cause: Throwable): Unit = delegate.fail(cause)
override def error(output: OutputHandle, cause: Throwable): Unit = delegate.error(output, cause) override def fail(output: OutputHandle, cause: Throwable): Unit = delegate.fail(output, cause)
override def changeCompletionHandling(completion: FlexiRoute.CompletionHandling[In]): Unit = override def changeCompletionHandling(completion: FlexiRoute.CompletionHandling[In]): Unit =
delegate.changeCompletionHandling(wrapCompletionHandling(completion)) delegate.changeCompletionHandling(wrapCompletionHandling(completion))
} }

View file

@ -109,7 +109,7 @@ object FlexiMerge {
/** /**
* The possibly stateful logic that reads from input via the defined [[MergeLogic#State]] and * The possibly stateful logic that reads from input via the defined [[MergeLogic#State]] and
* handles completion and error via the defined [[MergeLogic#CompletionHandling]]. * handles completion and failure via the defined [[MergeLogic#CompletionHandling]].
* *
* Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]]. * Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]].
*/ */
@ -139,12 +139,12 @@ object FlexiMerge {
/** /**
* Complete this stream successfully. Upstream subscriptions will be cancelled. * Complete this stream successfully. Upstream subscriptions will be cancelled.
*/ */
def complete(): Unit def finish(): Unit
/** /**
* Complete this stream with failure. Upstream subscriptions will be cancelled. * Complete this stream with failure. Upstream subscriptions will be cancelled.
*/ */
def error(cause: Throwable): Unit def fail(cause: Throwable): Unit
/** /**
* Cancel a specific upstream input stream. * Cancel a specific upstream input stream.
@ -184,37 +184,37 @@ object FlexiMerge {
} }
/** /**
* How to handle completion or error from upstream input. * How to handle completion or failure from upstream input.
* *
* The `onComplete` function is called when an upstream input was completed successfully. * The `onUpstreamFinish` function is called when an upstream input was completed successfully.
* It returns next behavior or [[#SameState]] to keep current behavior. * It returns next behavior or [[#SameState]] to keep current behavior.
* A completion can be propagated downstream with [[MergeLogicContext#complete]], * A completion can be propagated downstream with [[MergeLogicContext#finish]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
* *
* The `onError` function is called when an upstream input was completed with failure. * The `onUpstreamFailure` function is called when an upstream input was completed with failure.
* It returns next behavior or [[#SameState]] to keep current behavior. * It returns next behavior or [[#SameState]] to keep current behavior.
* An error can be propagated downstream with [[MergeLogicContext#error]], * A failure can be propagated downstream with [[MergeLogicContext#fail]],
* or it can be swallowed to continue with remaining inputs. * or it can be swallowed to continue with remaining inputs.
*/ */
sealed case class CompletionHandling( sealed case class CompletionHandling(
onComplete: (MergeLogicContext, InputHandle) State[_], onUpstreamFinish: (MergeLogicContext, InputHandle) State[_],
onError: (MergeLogicContext, InputHandle, Throwable) State[_]) onUpstreamFailure: (MergeLogicContext, InputHandle, Throwable) State[_])
/** /**
* Will continue to operate until a read becomes unsatisfiable, then it completes. * Will continue to operate until a read becomes unsatisfiable, then it completes.
* Errors are immediately propagated. * Errors are immediately propagated.
*/ */
val defaultCompletionHandling: CompletionHandling = CompletionHandling( val defaultCompletionHandling: CompletionHandling = CompletionHandling(
onComplete = (_, _) SameState, onUpstreamFinish = (_, _) SameState,
onError = (ctx, _, cause) { ctx.error(cause); SameState }) onUpstreamFailure = (ctx, _, cause) { ctx.fail(cause); SameState })
/** /**
* Completes as soon as any input completes. * Completes as soon as any input completes.
* Errors are immediately propagated. * Errors are immediately propagated.
*/ */
def eagerClose: CompletionHandling = CompletionHandling( def eagerClose: CompletionHandling = CompletionHandling(
onComplete = (ctx, _) { ctx.complete(); SameState }, onUpstreamFinish = (ctx, _) { ctx.finish(); SameState },
onError = (ctx, _, cause) { ctx.error(cause); SameState }) onUpstreamFailure = (ctx, _, cause) { ctx.fail(cause); SameState })
} }
} }

View file

@ -74,7 +74,7 @@ object FlexiRoute {
/** /**
* The possibly stateful logic that reads from the input and enables emitting to downstream * The possibly stateful logic that reads from the input and enables emitting to downstream
* via the defined [[State]]. Handles completion, error and cancel via the defined * via the defined [[State]]. Handles completion, failure and cancel via the defined
* [[CompletionHandling]]. * [[CompletionHandling]].
* *
* Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]]. * Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]].
@ -105,22 +105,22 @@ object FlexiRoute {
/** /**
* Complete the given downstream successfully. * Complete the given downstream successfully.
*/ */
def complete(output: OutputHandle): Unit def finish(output: OutputHandle): Unit
/** /**
* Complete all downstreams successfully and cancel upstream. * Complete all downstreams successfully and cancel upstream.
*/ */
def complete(): Unit def finish(): Unit
/** /**
* Complete the given downstream with failure. * Complete the given downstream with failure.
*/ */
def error(output: OutputHandle, cause: Throwable): Unit def fail(output: OutputHandle, cause: Throwable): Unit
/** /**
* Complete all downstreams with failure and cancel upstream. * Complete all downstreams with failure and cancel upstream.
*/ */
def error(cause: Throwable): Unit def fail(cause: Throwable): Unit
/** /**
* Replace current [[CompletionHandling]]. * Replace current [[CompletionHandling]].
@ -156,38 +156,38 @@ object FlexiRoute {
} }
/** /**
* How to handle completion or error from upstream input and how to * How to handle completion or failure from upstream input and how to
* handle cancel from downstream output. * handle cancel from downstream output.
* *
* The `onComplete` function is called the upstream input was completed successfully. * The `onUpstreamFinish` function is called the upstream input was completed successfully.
* *
* The `onError` function is called when the upstream input was completed with failure. * The `onUpstreamFailure` function is called when the upstream input was completed with failure.
* *
* The `onCancel` function is called when a downstream output cancels. * The `onDownstreamFinish` function is called when a downstream output cancels.
* It returns next behavior or [[#SameState]] to keep current behavior. * It returns next behavior or [[#SameState]] to keep current behavior.
*/ */
sealed case class CompletionHandling( sealed case class CompletionHandling(
onComplete: RouteLogicContext[Any] Unit, onUpstreamFinish: RouteLogicContext[Any] Unit,
onError: (RouteLogicContext[Any], Throwable) Unit, onUpstreamFailure: (RouteLogicContext[Any], Throwable) Unit,
onCancel: (RouteLogicContext[Any], OutputHandle) State[_]) onDownstreamFinish: (RouteLogicContext[Any], OutputHandle) State[_])
/** /**
* When an output cancels it continues with remaining outputs. * When an output cancels it continues with remaining outputs.
* Error or completion from upstream are immediately propagated. * Error or completion from upstream are immediately propagated.
*/ */
val defaultCompletionHandling: CompletionHandling = CompletionHandling( val defaultCompletionHandling: CompletionHandling = CompletionHandling(
onComplete = _ (), onUpstreamFinish = _ (),
onError = (ctx, cause) (), onUpstreamFailure = (ctx, cause) (),
onCancel = (ctx, _) SameState) onDownstreamFinish = (ctx, _) SameState)
/** /**
* Completes as soon as any output cancels. * Completes as soon as any output cancels.
* Error or completion from upstream are immediately propagated. * Error or completion from upstream are immediately propagated.
*/ */
val eagerClose: CompletionHandling = CompletionHandling( val eagerClose: CompletionHandling = CompletionHandling(
onComplete = _ (), onUpstreamFinish = _ (),
onError = (ctx, cause) (), onUpstreamFailure = (ctx, cause) (),
onCancel = (ctx, _) { ctx.complete(); SameState }) onDownstreamFinish = (ctx, _) { ctx.finish(); SameState })
} }