!str #6577 Align API of Flexi with Stage
* error -> fail * complete -> finish * onComplete - onUpstreamFinish * onError - onUpstreamFailure * onCancel - onDownstreamFinish
This commit is contained in:
parent
1b73e09e2e
commit
8505e4935a
12 changed files with 201 additions and 201 deletions
|
|
@ -127,10 +127,10 @@ class FlexiDocSpec extends AkkaSpec {
|
|||
|
||||
override def initialCompletionHandling =
|
||||
CompletionHandling(
|
||||
onComplete = (ctx, input) => input match {
|
||||
onUpstreamFinish = (ctx, input) => input match {
|
||||
case `important` =>
|
||||
log.info("Important input completed, shutting down.")
|
||||
ctx.complete()
|
||||
ctx.finish()
|
||||
SameState
|
||||
|
||||
case replica =>
|
||||
|
|
@ -141,9 +141,9 @@ class FlexiDocSpec extends AkkaSpec {
|
|||
ctx.changeCompletionHandling(eagerClose)
|
||||
SameState
|
||||
},
|
||||
onError = (ctx, input, cause) => input match {
|
||||
onUpstreamFailure = (ctx, input, cause) => input match {
|
||||
case `important` =>
|
||||
ctx.error(cause)
|
||||
ctx.fail(cause)
|
||||
SameState
|
||||
|
||||
case replica =>
|
||||
|
|
@ -281,13 +281,13 @@ class FlexiDocSpec extends AkkaSpec {
|
|||
override def initialCompletionHandling =
|
||||
CompletionHandling(
|
||||
// upstream:
|
||||
onComplete = (ctx) => (),
|
||||
onError = (ctx, thr) => (),
|
||||
onUpstreamFinish = (ctx) => (),
|
||||
onUpstreamFailure = (ctx, thr) => (),
|
||||
// downstream:
|
||||
onCancel = (ctx, output) => output match {
|
||||
onDownstreamFinish = (ctx, output) => output match {
|
||||
case `important` =>
|
||||
// complete all downstreams, and cancel the upstream
|
||||
ctx.complete()
|
||||
// finish all downstreams, and cancel the upstream
|
||||
ctx.finish()
|
||||
SameState
|
||||
case _ =>
|
||||
SameState
|
||||
|
|
@ -336,9 +336,9 @@ class FlexiDocSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
val signalStatusOnTermination = CompletionHandling(
|
||||
onComplete = ctx => drainBuffer(ctx),
|
||||
onError = (ctx, cause) => drainBuffer(ctx),
|
||||
onCancel = (_, _) => SameState)
|
||||
onUpstreamFinish = ctx => drainBuffer(ctx),
|
||||
onUpstreamFailure = (ctx, cause) => drainBuffer(ctx),
|
||||
onDownstreamFinish = (_, _) => SameState)
|
||||
//#flexiroute-completion-upstream-completed-signalling
|
||||
|
||||
override def initialCompletionHandling = signalStatusOnTermination
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue