From 8505e4935a013c69278522ce151c14669c9197ed Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 27 Jan 2015 11:32:43 +0100 Subject: [PATCH] !str #6577 Align API of Flexi with Stage * error -> fail * complete -> finish * onComplete - onUpstreamFinish * onError - onUpstreamFailure * onCancel - onDownstreamFinish --- .../scala/code/docs/stream/FlexiDocSpec.scala | 24 ++--- .../akka/http/engine/client/HttpClient.scala | 30 +++--- .../akka/http/engine/server/HttpServer.scala | 12 +-- .../akka/stream/javadsl/FlexiMergeTest.java | 6 +- .../stream/scaladsl/GraphFlexiMergeSpec.scala | 92 +++++++++---------- .../stream/scaladsl/GraphFlexiRouteSpec.scala | 48 +++++----- .../akka/stream/impl/FlexiMergeImpl.scala | 8 +- .../akka/stream/impl/FlexiRouteImpl.scala | 14 +-- .../akka/stream/javadsl/FlexiMerge.scala | 46 +++++----- .../akka/stream/javadsl/FlexiRoute.scala | 58 ++++++------ .../akka/stream/scaladsl/FlexiMerge.scala | 28 +++--- .../akka/stream/scaladsl/FlexiRoute.scala | 36 ++++---- 12 files changed, 201 insertions(+), 201 deletions(-) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala index df20a65c2b..e230a48045 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala @@ -127,10 +127,10 @@ class FlexiDocSpec extends AkkaSpec { override def initialCompletionHandling = CompletionHandling( - onComplete = (ctx, input) => input match { + onUpstreamFinish = (ctx, input) => input match { case `important` => log.info("Important input completed, shutting down.") - ctx.complete() + ctx.finish() SameState case replica => @@ -141,9 +141,9 @@ class FlexiDocSpec extends AkkaSpec { ctx.changeCompletionHandling(eagerClose) SameState }, - onError = (ctx, input, cause) => input match { + onUpstreamFailure = (ctx, input, cause) => input match { case `important` => - ctx.error(cause) + ctx.fail(cause) SameState case replica => @@ -281,13 +281,13 @@ class FlexiDocSpec extends AkkaSpec { override def initialCompletionHandling = CompletionHandling( // upstream: - onComplete = (ctx) => (), - onError = (ctx, thr) => (), + onUpstreamFinish = (ctx) => (), + onUpstreamFailure = (ctx, thr) => (), // downstream: - onCancel = (ctx, output) => output match { + onDownstreamFinish = (ctx, output) => output match { case `important` => - // complete all downstreams, and cancel the upstream - ctx.complete() + // finish all downstreams, and cancel the upstream + ctx.finish() SameState case _ => SameState @@ -336,9 +336,9 @@ class FlexiDocSpec extends AkkaSpec { } val signalStatusOnTermination = CompletionHandling( - onComplete = ctx => drainBuffer(ctx), - onError = (ctx, cause) => drainBuffer(ctx), - onCancel = (_, _) => SameState) + onUpstreamFinish = ctx => drainBuffer(ctx), + onUpstreamFailure = (ctx, cause) => drainBuffer(ctx), + onDownstreamFinish = (_, _) => SameState) //#flexiroute-completion-upstream-completed-signalling override def initialCompletionHandling = signalStatusOnTermination diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala index 86d3098d3c..93385f4314 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala @@ -42,12 +42,12 @@ private[http] object HttpClient { /* Basic Stream Setup ================== - + requestIn +----------+ - +-----------------------------------------------+--->| Termi- | requestRendering + +-----------------------------------------------+--->| Termi- | requestRendering | | nation +---------------------> | +-------------------------------------->| Merge | | - | Termination Backchannel | +----------+ | TCP- + | Termination Backchannel | +----------+ | TCP- | | | level | | Method | client | +------------+ | Bypass | flow @@ -122,13 +122,13 @@ private[http] object HttpClient { } override def initialCompletionHandling = CompletionHandling( - onComplete = { + onUpstreamFinish = { case (ctx, `requestInput`) ⇒ SameState case (ctx, `terminationBackchannelInput`) ⇒ - ctx.complete() + ctx.finish() SameState }, - onError = defaultCompletionHandling.onError) + onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure) } } @@ -163,7 +163,7 @@ private[http] object HttpClient { } private val gotoInitial = (ctx: MergeLogicContext) ⇒ { if (methodBypassCompleted) { - ctx.complete() + ctx.finish() SameState } else { ctx.changeCompletionHandling(initialCompletionHandling) @@ -199,7 +199,7 @@ private[http] object HttpClient { onNeedNextMethod(ctx) case StreamEnd ⇒ emit(b.result()) - ctx.complete() + ctx.finish() SameState case NeedMoreData ⇒ emit(b.result()) @@ -209,25 +209,25 @@ private[http] object HttpClient { } override val initialCompletionHandling = CompletionHandling( - onComplete = (ctx, _) ⇒ { ctx.complete(); SameState }, - onError = defaultCompletionHandling.onError) + onUpstreamFinish = (ctx, _) ⇒ { ctx.finish(); SameState }, + onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure) val responseReadingCompletionHandling = CompletionHandling( - onComplete = { + onUpstreamFinish = { case (ctx, `methodBypassInput`) ⇒ methodBypassCompleted = true SameState case (ctx, `dataInput`) ⇒ if (parser.onUpstreamFinish()) { - ctx.complete() + ctx.finish() } else { // not pretty but because the FlexiMerge doesn't let us emit from here (#16565) // we need to funnel the error through the error channel - ctx.error(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput])) + ctx.fail(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput])) } SameState }, - onError = defaultCompletionHandling.onError) + onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure) } } @@ -251,4 +251,4 @@ private[http] object HttpClient { } () ⇒ stage } -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 7b125f196d..4e6f46d1e4 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -151,15 +151,15 @@ private[http] object HttpServer { } val waitingForApplicationResponseCompletionHandling = CompletionHandling( - onComplete = { + onUpstreamFinish = { case (ctx, `bypassInput`) ⇒ { requestStart = requestStart.copy(closeAfterResponseCompletion = true); SameState } - case (ctx, _) ⇒ { ctx.complete(); SameState } + case (ctx, _) ⇒ { ctx.finish(); SameState } }, - onError = { + onUpstreamFailure = { case (ctx, _, EntityStreamException(errorInfo)) ⇒ // the application has forwarded a request entity stream error to the response stream finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo) - case (ctx, _, error) ⇒ { ctx.error(error); SameState } + case (ctx, _, error) ⇒ { ctx.fail(error); SameState } }) def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = { @@ -170,7 +170,7 @@ private[http] object HttpServer { } def finish(ctx: MergeLogicContext): State[Any] = { - ctx.complete() // shouldn't this return a `State` rather than `Unit`? + ctx.finish() // shouldn't this return a `State` rather than `Unit`? SameState // it seems weird to stay in the same state after completion } } @@ -234,4 +234,4 @@ private[http] object HttpServer { case _ ⇒ ctx.fail(error) } } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java index 44233e82a8..234e4bfdef 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java @@ -181,14 +181,14 @@ public class FlexiMergeTest { private final CompletionHandling emitOtherOnClose = new CompletionHandling() { @Override - public State onComplete(MergeLogicContext ctx, InputHandle input) { + public State onUpstreamFinish(MergeLogicContext ctx, InputHandle input) { ctx.changeCompletionHandling(defaultCompletionHandling()); return readRemaining(other(input)); } @Override - public State onError(MergeLogicContext ctx, InputHandle inputHandle, Throwable cause) { - ctx.error(cause); + public State onUpstreamFailure(MergeLogicContext ctx, InputHandle inputHandle, Throwable cause) { + ctx.fail(cause); return sameState(); } }; diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index 4856e44e9e..75bbc9e5ab 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -45,12 +45,12 @@ object GraphFlexiMergeSpec { override def inputHandles(inputCount: Int) = Vector(input1, input2) val emitOtherOnClose = CompletionHandling( - onComplete = { (ctx, input) ⇒ + onUpstreamFinish = { (ctx, input) ⇒ ctx.changeCompletionHandling(defaultCompletionHandling) readRemaining(other(input)) }, - onError = { (ctx, _, cause) ⇒ - ctx.error(cause) + onUpstreamFailure = { (ctx, _, cause) ⇒ + ctx.fail(cause) SameState }) @@ -109,7 +109,7 @@ object GraphFlexiMergeSpec { class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends FlexiMerge[(A, B, C)] { import FlexiMerge._ - val soonCancelledInput = createInputPort[A]() + val ssoonCancelledInputInput = createInputPort[A]() val stableInput1 = createInputPort[B]() val stableInput2 = createInputPort[C]() @@ -117,18 +117,18 @@ class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue) extends override def inputHandles(inputCount: Int) = { require(inputCount == 3, s"TripleZip must have 3 connected inputs, was $inputCount") - Vector(soonCancelledInput, stableInput1, stableInput2) + Vector(ssoonCancelledInputInput, stableInput1, stableInput2) } - override def initialState = State[ReadAllInputs](ReadAll(soonCancelledInput, stableInput1, stableInput2)) { + override def initialState = State[ReadAllInputs](ReadAll(ssoonCancelledInputInput, stableInput1, stableInput2)) { case (ctx, input, inputs) ⇒ - val a = inputs.getOrElse(soonCancelledInput, null) + val a = inputs.getOrElse(ssoonCancelledInputInput, null) val b = inputs.getOrElse(stableInput1, null) val c = inputs.getOrElse(stableInput2, null) ctx.emit((a, b, c)) if (cancelAfter == 0) - ctx.cancel(soonCancelledInput) + ctx.cancel(ssoonCancelledInputInput) cancelAfter -= 1 SameState @@ -149,12 +149,12 @@ class OrderedMerge extends FlexiMerge[Int] { override def inputHandles(inputCount: Int) = Vector(input1, input2) val emitOtherOnClose = CompletionHandling( - onComplete = { (ctx, input) ⇒ + onUpstreamFinish = { (ctx, input) ⇒ ctx.changeCompletionHandling(emitLast) readRemaining(other(input)) }, - onError = { (ctx, input, cause) ⇒ - ctx.error(cause) + onUpstreamFailure = { (ctx, input, cause) ⇒ + ctx.fail(cause) SameState }) @@ -190,13 +190,13 @@ class OrderedMerge extends FlexiMerge[Int] { } val emitLast = CompletionHandling( - onComplete = { (ctx, input) ⇒ + onUpstreamFinish = { (ctx, input) ⇒ if (ctx.isDemandAvailable) ctx.emit(reference) SameState }, - onError = { (ctx, input, cause) ⇒ - ctx.error(cause) + onUpstreamFailure = { (ctx, input, cause) ⇒ + ctx.fail(cause) SameState }) @@ -237,12 +237,12 @@ class TestMerge extends FlexiMerge[String] { if (element == "cancel") ctx.cancel(input) else if (element == "err") - ctx.error(new RuntimeException("err") with NoStackTrace) + ctx.fail(new RuntimeException("err") with NoStackTrace) else if (element == "exc") throw new RuntimeException("exc") with NoStackTrace - else if (element == "complete") - ctx.complete() - else if (element == "onComplete-exc") + else if (element == "finish") + ctx.finish() + else if (element == "onUpstreamFinish-exc") throwFromOnComplete = true else ctx.emit("onInput: " + element) @@ -251,17 +251,17 @@ class TestMerge extends FlexiMerge[String] { } override def initialCompletionHandling = CompletionHandling( - onComplete = { (ctx, input) ⇒ + onUpstreamFinish = { (ctx, input) ⇒ if (throwFromOnComplete) - throw new RuntimeException("onComplete-exc") with NoStackTrace + throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace if (ctx.isDemandAvailable) - ctx.emit("onComplete: " + input.portIndex) + ctx.emit("onUpstreamFinish: " + input.portIndex) SameState }, - onError = { (ctx, input, cause) ⇒ + onUpstreamFailure = { (ctx, input, cause) ⇒ cause match { case _: IllegalArgumentException ⇒ // swallow - case _ ⇒ ctx.error(cause) + case _ ⇒ ctx.fail(cause) } SameState }) @@ -343,7 +343,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { val m = FlowGraph { implicit b ⇒ val merge = new TripleCancellingZip[Long, Int, String] // format: OFF - Source(List(1L, 2L )) ~> merge.soonCancelledInput + Source(List(1L, 2L )) ~> merge.ssoonCancelledInputInput Source(List(1, 2, 3, 4)) ~> merge.stableInput1 Source(List("a", "b", "c" )) ~> merge.stableInput2 merge.out ~> output @@ -365,7 +365,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { val m = FlowGraph { implicit b ⇒ val merge = new TripleCancellingZip[Long, Int, String](cancelAfter = 1) // format: OFF - Source(List(1L, 2L, 3L, 4L, 5L)) ~> merge.soonCancelledInput + Source(List(1L, 2L, 3L, 4L, 5L)) ~> merge.ssoonCancelledInputInput Source(List(1, 2, 3, 4 )) ~> merge.stableInput1 Source(List("a", "b", "c" )) ~> merge.stableInput2 merge.out ~> output @@ -380,7 +380,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { sub.request(10) s.expectNext((1L, 1, "a")) s.expectNext((2L, 2, "b")) - // soonCancelledInput is now cancelled and continues with default (null) value + // ssoonCancelledInputInput is now cancelled and continues with default (null) value s.expectNext((null.asInstanceOf[Long], 3, "c")) s.expectComplete() } @@ -517,7 +517,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectNext(4) s2.sendComplete() - // complete when all inputs have completed + // finish when all inputs have completed s.expectComplete() } @@ -546,16 +546,16 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectNext("onInput: e") s.expectNext("onInput: c") s.expectNext("onInput: f") - s.expectNext("onComplete: 2") + s.expectNext("onUpstreamFinish: 2") s.expectNext("onInput: d") - s.expectNext("onComplete: 1") + s.expectNext("onUpstreamFinish: 1") autoPublisher.sendNext("x") s.expectComplete() } - "complete when all inputs cancelled" in { + "finish when all inputs cancelled" in { val publisher1 = PublisherProbe[String] val publisher2 = PublisherProbe[String] val publisher3 = PublisherProbe[String] @@ -591,7 +591,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } - "handle error" in { + "handle failure" in { val m = FlowGraph { implicit b ⇒ val merge = new TestMerge Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.input1 @@ -608,13 +608,13 @@ class GraphFlexiMergeSpec extends AkkaSpec { // IllegalArgumentException is swallowed by the CompletionHandler s.expectNext("onInput: a") s.expectNext("onInput: c") - s.expectNext("onComplete: 2") + s.expectNext("onUpstreamFinish: 2") s.expectNext("onInput: b") - s.expectNext("onComplete: 1") + s.expectNext("onUpstreamFinish: 1") s.expectComplete() } - "propagate error" in { + "propagate failure" in { val publisher = PublisherProbe[String] val m = FlowGraph { implicit b ⇒ val merge = new TestMerge @@ -630,7 +630,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectErrorOrSubscriptionFollowedByError().getMessage should be("ERROR") } - "emit error" in { + "emit failure" in { val m = FlowGraph { implicit b ⇒ val merge = new TestMerge Source(List("a", "err")) ~> merge.input1 @@ -649,7 +649,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectError().getMessage should be("err") } - "emit error for user thrown exception" in { + "emit failure for user thrown exception" in { val m = FlowGraph { implicit b ⇒ val merge = new TestMerge Source(List("a", "exc")) ~> merge.input1 @@ -668,10 +668,10 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectError().getMessage should be("exc") } - "emit error for user thrown exception in onComplete" in { + "emit failure for user thrown exception in onUpstreamFinish" in { val m = FlowGraph { implicit b ⇒ val merge = new TestMerge - Source(List("a", "onComplete-exc")) ~> merge.input1 + Source(List("a", "onUpstreamFinish-exc")) ~> merge.input1 Source(List("b", "c")) ~> merge.input2 Source.empty[String] ~> merge.input3 merge.out ~> out1 @@ -684,10 +684,10 @@ class GraphFlexiMergeSpec extends AkkaSpec { sub.request(10) s.expectNext("onInput: a") s.expectNext("onInput: b") - s.expectError().getMessage should be("onComplete-exc") + s.expectError().getMessage should be("onUpstreamFinish-exc") } - "emit error for user thrown exception in onComplete 2" in { + "emit failure for user thrown exception in onUpstreamFinish 2" in { val publisher = PublisherProbe[String] val m = FlowGraph { implicit b ⇒ val merge = new TestMerge @@ -698,7 +698,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { }.run() val autoPublisher = new AutoPublisher(publisher) - autoPublisher.sendNext("onComplete-exc") + autoPublisher.sendNext("onUpstreamFinish-exc") autoPublisher.sendNext("a") val s = SubscriberProbe[String] @@ -709,13 +709,13 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectNext("onInput: a") autoPublisher.sendComplete() - s.expectError().getMessage should be("onComplete-exc") + s.expectError().getMessage should be("onUpstreamFinish-exc") } - "support complete from onInput" in { + "support finish from onInput" in { val m = FlowGraph { implicit b ⇒ val merge = new TestMerge - Source(List("a", "complete")) ~> merge.input1 + Source(List("a", "finish")) ~> merge.input1 Source(List("b", "c")) ~> merge.input2 Source.empty[String] ~> merge.input3 merge.out ~> out1 @@ -746,10 +746,10 @@ class GraphFlexiMergeSpec extends AkkaSpec { val sub = s.expectSubscription() sub.request(10) s.expectNext("onInput: a") - s.expectNext("onComplete: 0") + s.expectNext("onUpstreamFinish: 0") s.expectNext("onInput: b") s.expectNext("onInput: c") - s.expectNext("onComplete: 1") + s.expectNext("onUpstreamFinish: 1") s.expectComplete() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala index f64b71d131..a97219d401 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala @@ -103,15 +103,15 @@ object GraphFlexiRouteSpec { override def initialState = State[String](DemandFromAny(handles)) { (ctx, preferred, element) ⇒ if (element == "err") - ctx.error(new RuntimeException("err") with NoStackTrace) + ctx.fail(new RuntimeException("err") with NoStackTrace) else if (element == "err-output1") - ctx.error(output1, new RuntimeException("err-1") with NoStackTrace) + ctx.fail(output1, new RuntimeException("err-1") with NoStackTrace) else if (element == "exc") throw new RuntimeException("exc") with NoStackTrace - else if (element == "onComplete-exc") + else if (element == "onUpstreamFinish-exc") throwFromOnComplete = true - else if (element == "complete") - ctx.complete() + else if (element == "finish") + ctx.finish() else ctx.emit(preferred, "onInput: " + element) @@ -119,15 +119,15 @@ object GraphFlexiRouteSpec { } override def initialCompletionHandling = CompletionHandling( - onComplete = { ctx ⇒ + onUpstreamFinish = { ctx ⇒ if (throwFromOnComplete) - throw new RuntimeException("onComplete-exc") with NoStackTrace + throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace handles.foreach { output ⇒ if (ctx.isDemandAvailable(output)) - ctx.emit(output, "onComplete") + ctx.emit(output, "onUpstreamFinish") } }, - onError = { (ctx, cause) ⇒ + onUpstreamFailure = { (ctx, cause) ⇒ cause match { case _: IllegalArgumentException ⇒ // swallow case _ ⇒ @@ -137,10 +137,10 @@ object GraphFlexiRouteSpec { } } }, - onCancel = { (ctx, cancelledOutput) ⇒ + onDownstreamFinish = { (ctx, cancelledOutput) ⇒ handles.foreach { output ⇒ if (output != cancelledOutput && ctx.isDemandAvailable(output)) - ctx.emit(output, "onCancel: " + cancelledOutput.portIndex) + ctx.emit(output, "onDownstreamFinish: " + cancelledOutput.portIndex) } SameState }) @@ -267,11 +267,11 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } - "support complete of downstreams and cancel of upstream" in { + "support finish of downstreams and cancel of upstream" in { val fixture = new TestFixture import fixture._ - autoPublisher.sendNext("complete") + autoPublisher.sendNext("finish") sub1.request(1) s1.expectNext("onInput: a") @@ -316,7 +316,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s1.expectError().getMessage should be("err-1") autoPublisher.sendComplete() - s2.expectNext("onComplete") + s2.expectNext("onUpstreamFinish") s2.expectComplete() } @@ -339,7 +339,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { autoPublisher.subscription.expectCancellation() } - "emit error for user thrown exception in onComplete" in { + "emit error for user thrown exception in onUpstreamFinish" in { val fixture = new TestFixture import fixture._ @@ -350,11 +350,11 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub1.request(5) sub2.request(5) - autoPublisher.sendNext("onComplete-exc") + autoPublisher.sendNext("onUpstreamFinish-exc") autoPublisher.sendComplete() - s1.expectError().getMessage should be("onComplete-exc") - s2.expectError().getMessage should be("onComplete-exc") + s1.expectError().getMessage should be("onUpstreamFinish-exc") + s2.expectError().getMessage should be("onUpstreamFinish-exc") } "handle cancel from output" in { @@ -370,7 +370,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub2.request(2) sub1.cancel() - s2.expectNext("onCancel: 0") + s2.expectNext("onDownstreamFinish: 0") s1.expectNoMsg(200.millis) autoPublisher.sendNext("c") @@ -380,7 +380,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { s2.expectComplete() } - "handle complete from upstream input" in { + "handle finish from upstream input" in { val fixture = new TestFixture import fixture._ @@ -393,8 +393,8 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub2.request(2) autoPublisher.sendComplete() - s1.expectNext("onComplete") - s2.expectNext("onComplete") + s1.expectNext("onUpstreamFinish") + s2.expectNext("onUpstreamFinish") s1.expectComplete() s2.expectComplete() @@ -433,7 +433,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub2.request(2) sub1.cancel() - s2.expectNext("onCancel: 0") + s2.expectNext("onDownstreamFinish: 0") sub2.cancel() autoPublisher.subscription.expectCancellation() @@ -450,7 +450,7 @@ class GraphFlexiRouteSpec extends AkkaSpec { sub1.request(2) sub2.request(2) - autoPublisher.sendNext("complete") + autoPublisher.sendNext("finish") s1.expectComplete() s2.expectComplete() autoPublisher.subscription.expectCancellation() diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index c84fe9f789..e4a32943dc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -44,7 +44,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, override protected val inputBunch = new FanIn.InputBunch(inputPorts, settings.maxInputBufferSize, this) { override def onError(input: Int, e: Throwable): Unit = { - changeBehavior(try completion.onError(ctx, inputMapping(input), e) + changeBehavior(try completion.onUpstreamFailure(ctx, inputMapping(input), e) catch { case NonFatal(e) ⇒ fail(e); mergeLogic.SameState }) @@ -63,13 +63,13 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, primaryOutputs.enqueueOutputElement(elem) } - override def complete(): Unit = { + override def finish(): Unit = { inputBunch.cancel() primaryOutputs.complete() context.stop(self) } - override def error(cause: Throwable): Unit = fail(cause) + override def fail(cause: Throwable): Unit = FlexiMergeImpl.this.fail(cause) override def cancel(input: InputHandle): Unit = inputBunch.cancel(input.portIndex) @@ -171,7 +171,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, triggerCompletion(inputHandle) private def triggerCompletion(inputHandle: InputHandle): Unit = - changeBehavior(try completion.onComplete(ctx, inputHandle) + changeBehavior(try completion.onUpstreamFinish(ctx, inputHandle) catch { case NonFatal(e) ⇒ fail(e); mergeLogic.SameState }) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala index 02773cffd3..f71ba133db 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala @@ -45,7 +45,7 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings, override protected val outputBunch = new OutputBunch(outputPorts, self, this) { override def onCancel(output: Int): Unit = - changeBehavior(try completion.onCancel(ctx, outputMapping(output)) + changeBehavior(try completion.onDownstreamFinish(ctx, outputMapping(output)) catch { case NonFatal(e) ⇒ fail(e); routeLogic.SameState }) @@ -53,12 +53,12 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings, override protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) { override def onError(e: Throwable): Unit = { - try completion.onError(ctx, e) catch { case NonFatal(e) ⇒ fail(e) } + try completion.onUpstreamFailure(ctx, e) catch { case NonFatal(e) ⇒ fail(e) } fail(e) } override def onComplete(): Unit = { - try completion.onComplete(ctx) catch { case NonFatal(e) ⇒ fail(e) } + try completion.onUpstreamFinish(ctx) catch { case NonFatal(e) ⇒ fail(e) } super.onComplete() } } @@ -72,18 +72,18 @@ private[akka] class FlexiRouteImpl(_settings: MaterializerSettings, outputBunch.enqueue(output.portIndex, elem) } - override def complete(): Unit = { + override def finish(): Unit = { primaryInputs.cancel() outputBunch.complete() context.stop(self) } - override def complete(output: OutputHandle): Unit = + override def finish(output: OutputHandle): Unit = outputBunch.complete(output.portIndex) - override def error(cause: Throwable): Unit = fail(cause) + override def fail(cause: Throwable): Unit = FlexiRouteImpl.this.fail(cause) - override def error(output: OutputHandle, cause: Throwable): Unit = + override def fail(output: OutputHandle, cause: Throwable): Unit = outputBunch.error(output.portIndex, cause) override def changeCompletionHandling(newCompletion: CompletionT): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala index aafe839b17..6b24496dee 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala @@ -125,12 +125,12 @@ object FlexiMerge { /** * Complete this stream succesfully. Upstream subscriptions will be cancelled. */ - def complete(): Unit + def finish(): Unit /** * Complete this stream with failure. Upstream subscriptions will be cancelled. */ - def error(cause: Throwable): Unit + def fail(cause: Throwable): Unit /** * Cancel a specific upstream input stream. @@ -144,21 +144,21 @@ object FlexiMerge { } /** - * How to handle completion or error from upstream input. + * How to handle completion or failure from upstream input. * - * The `onComplete` method is called when an upstream input was completed sucessfully. + * The `onUpstreamFinish` method is called when an upstream input was completed sucessfully. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. - * A completion can be propagated downstream with [[MergeLogicContext#complete]], + * A completion can be propagated downstream with [[MergeLogicContext#finish]], * or it can be swallowed to continue with remaining inputs. * - * The `onError` method is called when an upstream input was completed sucessfully. + * The `onUpstreamFailure` method is called when an upstream input was completed with failure. * It returns next behavior or [[MergeLogic#sameState]] to keep current behavior. - * An error can be propagated downstream with [[MergeLogicContext#error]], + * A failure can be propagated downstream with [[MergeLogicContext#fail]], * or it can be swallowed to continue with remaining inputs. */ abstract class CompletionHandling[Out] { - def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out] - def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out] + def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out] + def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out] } /** @@ -176,7 +176,7 @@ object FlexiMerge { /** * The possibly stateful logic that reads from input via the defined [[State]] and - * handles completion and error via the defined [[CompletionHandling]]. + * handles completion and failure via the defined [[CompletionHandling]]. * * Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]]. */ @@ -224,10 +224,10 @@ object FlexiMerge { */ def defaultCompletionHandling[A]: CompletionHandling[Out] = new CompletionHandling[Out] { - override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = + override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = sameState - override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { - ctx.error(cause) + override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { + ctx.fail(cause) sameState } } @@ -238,12 +238,12 @@ object FlexiMerge { */ def eagerClose[A]: CompletionHandling[Out] = new CompletionHandling[Out] { - override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = { - ctx.complete() + override def onUpstreamFinish(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = { + ctx.finish() sameState } - override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { - ctx.error(cause) + override def onUpstreamFailure(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { + ctx.fail(cause) sameState } } @@ -282,13 +282,13 @@ object FlexiMerge { private def wrapCompletionHandling( delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling = CompletionHandling( - onComplete = (ctx, inputHandle) ⇒ { - val newDelegateState = delegateCompletionHandling.onComplete( + onUpstreamFinish = (ctx, inputHandle) ⇒ { + val newDelegateState = delegateCompletionHandling.onUpstreamFinish( new MergeLogicContextWrapper(ctx), asJava(inputHandle)) wrapState(newDelegateState) }, - onError = (ctx, inputHandle, cause) ⇒ { - val newDelegateState = delegateCompletionHandling.onError( + onUpstreamFailure = (ctx, inputHandle, cause) ⇒ { + val newDelegateState = delegateCompletionHandling.onUpstreamFailure( new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause) wrapState(newDelegateState) }) @@ -299,8 +299,8 @@ object FlexiMerge { class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] { override def isDemandAvailable: Boolean = delegate.isDemandAvailable override def emit(elem: Out): Unit = delegate.emit(elem) - override def complete(): Unit = delegate.complete() - override def error(cause: Throwable): Unit = delegate.error(cause) + override def finish(): Unit = delegate.finish() + override def fail(cause: Throwable): Unit = delegate.fail(cause) override def cancel(input: InputHandle): Unit = delegate.cancel(input) override def changeCompletionHandling(completion: FlexiMerge.CompletionHandling[Out]): Unit = delegate.changeCompletionHandling(wrapCompletionHandling(completion)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala index c08e1490f5..f294a9c29e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala @@ -95,22 +95,22 @@ object FlexiRoute { /** * Complete the given downstream successfully. */ - def complete(output: OutputHandle): Unit + def finish(output: OutputHandle): Unit /** * Complete all downstreams successfully and cancel upstream. */ - def complete(): Unit + def finish(): Unit /** * Complete the given downstream with failure. */ - def error(output: OutputHandle, cause: Throwable): Unit + def fail(output: OutputHandle, cause: Throwable): Unit /** * Complete all downstreams with failure and cancel upstream. */ - def error(cause: Throwable): Unit + def fail(cause: Throwable): Unit /** * Replace current [[CompletionHandling]]. @@ -119,22 +119,22 @@ object FlexiRoute { } /** - * How to handle completion or error from upstream input and how to + * How to handle completion or failure from upstream input and how to * handle cancel from downstream output. * - * The `onComplete` method is called the upstream input was completed successfully. + * The `onUpstreamFinish` method is called the upstream input was completed successfully. * It returns next behavior or [[#sameState]] to keep current behavior. * - * The `onError` method is called when the upstream input was completed with failure. + * The `onUpstreamFailure` method is called when the upstream input was completed with failure. * It returns next behavior or [[#SameState]] to keep current behavior. * - * The `onCancel` method is called when a downstream output cancels. + * The `onDownstreamFinish` method is called when a downstream output cancels. * It returns next behavior or [[#sameState]] to keep current behavior. */ abstract class CompletionHandling[In] { - def onComplete(ctx: RouteLogicContext[In, Any]): Unit - def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit - def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] + def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit + def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit + def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] } /** @@ -153,7 +153,7 @@ object FlexiRoute { /** * The possibly stateful logic that reads from the input and enables emitting to downstream - * via the defined [[State]]. Handles completion, error and cancel via the defined + * via the defined [[State]]. Handles completion, failure and cancel via the defined * [[CompletionHandling]]. * * Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]]. @@ -195,9 +195,9 @@ object FlexiRoute { */ def defaultCompletionHandling: CompletionHandling[In] = new CompletionHandling[In] { - override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = () - override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () - override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = + override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = () + override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () + override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = sameState } @@ -207,10 +207,10 @@ object FlexiRoute { */ def eagerClose[A]: CompletionHandling[In] = new CompletionHandling[In] { - override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = () - override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () - override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = { - ctx.complete() + override def onUpstreamFinish(ctx: RouteLogicContext[In, Any]): Unit = () + override def onUpstreamFailure(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () + override def onDownstreamFinish(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = { + ctx.finish() sameState } } @@ -249,14 +249,14 @@ object FlexiRoute { private def wrapCompletionHandling[Out]( delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling = CompletionHandling( - onComplete = ctx ⇒ { - delegateCompletionHandling.onComplete(new RouteLogicContextWrapper(ctx)) + onUpstreamFinish = ctx ⇒ { + delegateCompletionHandling.onUpstreamFinish(new RouteLogicContextWrapper(ctx)) }, - onError = (ctx, cause) ⇒ { - delegateCompletionHandling.onError(new RouteLogicContextWrapper(ctx), cause) + onUpstreamFailure = (ctx, cause) ⇒ { + delegateCompletionHandling.onUpstreamFailure(new RouteLogicContextWrapper(ctx), cause) }, - onCancel = (ctx, outputHandle) ⇒ { - val newDelegateState = delegateCompletionHandling.onCancel( + onDownstreamFinish = (ctx, outputHandle) ⇒ { + val newDelegateState = delegateCompletionHandling.onDownstreamFinish( new RouteLogicContextWrapper(ctx), asJava(outputHandle)) wrapState(newDelegateState) }) @@ -267,10 +267,10 @@ object FlexiRoute { class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] { override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output) override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem) - override def complete(): Unit = delegate.complete() - override def complete(output: OutputHandle): Unit = delegate.complete(output) - override def error(cause: Throwable): Unit = delegate.error(cause) - override def error(output: OutputHandle, cause: Throwable): Unit = delegate.error(output, cause) + override def finish(): Unit = delegate.finish() + override def finish(output: OutputHandle): Unit = delegate.finish(output) + override def fail(cause: Throwable): Unit = delegate.fail(cause) + override def fail(output: OutputHandle, cause: Throwable): Unit = delegate.fail(output, cause) override def changeCompletionHandling(completion: FlexiRoute.CompletionHandling[In]): Unit = delegate.changeCompletionHandling(wrapCompletionHandling(completion)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index 3920b6619b..c60ef898f8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -109,7 +109,7 @@ object FlexiMerge { /** * The possibly stateful logic that reads from input via the defined [[MergeLogic#State]] and - * handles completion and error via the defined [[MergeLogic#CompletionHandling]]. + * handles completion and failure via the defined [[MergeLogic#CompletionHandling]]. * * Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]]. */ @@ -139,12 +139,12 @@ object FlexiMerge { /** * Complete this stream successfully. Upstream subscriptions will be cancelled. */ - def complete(): Unit + def finish(): Unit /** * Complete this stream with failure. Upstream subscriptions will be cancelled. */ - def error(cause: Throwable): Unit + def fail(cause: Throwable): Unit /** * Cancel a specific upstream input stream. @@ -184,37 +184,37 @@ object FlexiMerge { } /** - * How to handle completion or error from upstream input. + * How to handle completion or failure from upstream input. * - * The `onComplete` function is called when an upstream input was completed successfully. + * The `onUpstreamFinish` function is called when an upstream input was completed successfully. * It returns next behavior or [[#SameState]] to keep current behavior. - * A completion can be propagated downstream with [[MergeLogicContext#complete]], + * A completion can be propagated downstream with [[MergeLogicContext#finish]], * or it can be swallowed to continue with remaining inputs. * - * The `onError` function is called when an upstream input was completed with failure. + * The `onUpstreamFailure` function is called when an upstream input was completed with failure. * It returns next behavior or [[#SameState]] to keep current behavior. - * An error can be propagated downstream with [[MergeLogicContext#error]], + * A failure can be propagated downstream with [[MergeLogicContext#fail]], * or it can be swallowed to continue with remaining inputs. */ sealed case class CompletionHandling( - onComplete: (MergeLogicContext, InputHandle) ⇒ State[_], - onError: (MergeLogicContext, InputHandle, Throwable) ⇒ State[_]) + onUpstreamFinish: (MergeLogicContext, InputHandle) ⇒ State[_], + onUpstreamFailure: (MergeLogicContext, InputHandle, Throwable) ⇒ State[_]) /** * Will continue to operate until a read becomes unsatisfiable, then it completes. * Errors are immediately propagated. */ val defaultCompletionHandling: CompletionHandling = CompletionHandling( - onComplete = (_, _) ⇒ SameState, - onError = (ctx, _, cause) ⇒ { ctx.error(cause); SameState }) + onUpstreamFinish = (_, _) ⇒ SameState, + onUpstreamFailure = (ctx, _, cause) ⇒ { ctx.fail(cause); SameState }) /** * Completes as soon as any input completes. * Errors are immediately propagated. */ def eagerClose: CompletionHandling = CompletionHandling( - onComplete = (ctx, _) ⇒ { ctx.complete(); SameState }, - onError = (ctx, _, cause) ⇒ { ctx.error(cause); SameState }) + onUpstreamFinish = (ctx, _) ⇒ { ctx.finish(); SameState }, + onUpstreamFailure = (ctx, _, cause) ⇒ { ctx.fail(cause); SameState }) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index 260296d555..aa796db99b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -74,7 +74,7 @@ object FlexiRoute { /** * The possibly stateful logic that reads from the input and enables emitting to downstream - * via the defined [[State]]. Handles completion, error and cancel via the defined + * via the defined [[State]]. Handles completion, failure and cancel via the defined * [[CompletionHandling]]. * * Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]]. @@ -105,22 +105,22 @@ object FlexiRoute { /** * Complete the given downstream successfully. */ - def complete(output: OutputHandle): Unit + def finish(output: OutputHandle): Unit /** * Complete all downstreams successfully and cancel upstream. */ - def complete(): Unit + def finish(): Unit /** * Complete the given downstream with failure. */ - def error(output: OutputHandle, cause: Throwable): Unit + def fail(output: OutputHandle, cause: Throwable): Unit /** * Complete all downstreams with failure and cancel upstream. */ - def error(cause: Throwable): Unit + def fail(cause: Throwable): Unit /** * Replace current [[CompletionHandling]]. @@ -156,38 +156,38 @@ object FlexiRoute { } /** - * How to handle completion or error from upstream input and how to + * How to handle completion or failure from upstream input and how to * handle cancel from downstream output. * - * The `onComplete` function is called the upstream input was completed successfully. + * The `onUpstreamFinish` function is called the upstream input was completed successfully. * - * The `onError` function is called when the upstream input was completed with failure. + * The `onUpstreamFailure` function is called when the upstream input was completed with failure. * - * The `onCancel` function is called when a downstream output cancels. + * The `onDownstreamFinish` function is called when a downstream output cancels. * It returns next behavior or [[#SameState]] to keep current behavior. */ sealed case class CompletionHandling( - onComplete: RouteLogicContext[Any] ⇒ Unit, - onError: (RouteLogicContext[Any], Throwable) ⇒ Unit, - onCancel: (RouteLogicContext[Any], OutputHandle) ⇒ State[_]) + onUpstreamFinish: RouteLogicContext[Any] ⇒ Unit, + onUpstreamFailure: (RouteLogicContext[Any], Throwable) ⇒ Unit, + onDownstreamFinish: (RouteLogicContext[Any], OutputHandle) ⇒ State[_]) /** * When an output cancels it continues with remaining outputs. * Error or completion from upstream are immediately propagated. */ val defaultCompletionHandling: CompletionHandling = CompletionHandling( - onComplete = _ ⇒ (), - onError = (ctx, cause) ⇒ (), - onCancel = (ctx, _) ⇒ SameState) + onUpstreamFinish = _ ⇒ (), + onUpstreamFailure = (ctx, cause) ⇒ (), + onDownstreamFinish = (ctx, _) ⇒ SameState) /** * Completes as soon as any output cancels. * Error or completion from upstream are immediately propagated. */ val eagerClose: CompletionHandling = CompletionHandling( - onComplete = _ ⇒ (), - onError = (ctx, cause) ⇒ (), - onCancel = (ctx, _) ⇒ { ctx.complete(); SameState }) + onUpstreamFinish = _ ⇒ (), + onUpstreamFailure = (ctx, cause) ⇒ (), + onDownstreamFinish = (ctx, _) ⇒ { ctx.finish(); SameState }) }