Propagate downstream cancellation failures (#27547)

* Add doc clarification about IOResult on sources
* Better error message for IOOperationIncompleteException

Co-Authored-By: Johannes Rudolph <johannes.rudolph@gmail.com>
This commit is contained in:
Johan Andrén 2019-09-05 20:55:48 +02:00 committed by Patrik Nordwall
parent 4020036eb6
commit ed955e0da4
40 changed files with 345 additions and 145 deletions

View file

@ -64,7 +64,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(OnNext(5)))
downstream.cancel()
lastEvents() should be(Set(Cancel))
lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)))
}
"work with only boundary ops" in new OneBoundedSetup[Int]() {
@ -127,7 +127,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)))
}
"implement take" in new OneBoundedSetup[Int](takeTwo) {
@ -144,7 +144,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1), Cancel, OnComplete))
lastEvents() should be(Set(OnNext(1), Cancel(SubscriptionWithCancelException.StageWasCompleted), OnComplete))
}
"implement take inside a chain" in new OneBoundedSetup[Int](
@ -167,7 +167,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(Cancel, OnComplete, OnNext(3)))
lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.StageWasCompleted), OnComplete, OnNext(3)))
}
"implement fold" in new OneBoundedSetup[Int](Fold(0, (agg: Int, x: Int) => agg + x)) {
@ -206,7 +206,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)))
}
"work if fold completes while not in a push position" in new OneBoundedSetup[Int](
@ -273,7 +273,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(OnNext(4), RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)))
}
"implement expand" in new OneBoundedSetup[Int](new Expand(Iterator.continually(_: Int))) {
@ -330,7 +330,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(OnNext(4), RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)))
}
@ -395,7 +395,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(OnNext(2)))
downstream.cancel()
lastEvents() should be(Set(Cancel))
lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)))
}
"implement doubler-conflate (doubler-batch)" in new OneBoundedSetup[Int](
@ -518,7 +518,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1), OnComplete, Cancel))
lastEvents() should be(Set(OnNext(1), OnComplete, Cancel(SubscriptionWithCancelException.StageWasCompleted)))
}