fix GraphConcatSpec
This commit is contained in:
parent
026e4f0078
commit
62bf71ccca
1 changed files with 10 additions and 18 deletions
|
|
@ -102,14 +102,17 @@ class GraphConcatSpec extends TwoStreamsSetup {
|
|||
subscriber1.expectSubscriptionAndError(TestException)
|
||||
|
||||
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
|
||||
subscriber2.expectSubscriptionAndError(TestException)
|
||||
subscriber2.expectSubscription().request(5)
|
||||
|
||||
var errorSignalled = false
|
||||
if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(1, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(2, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(3, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(4, TestException).isLeft
|
||||
if (!errorSignalled) subscriber2.expectError(TestException)
|
||||
}
|
||||
|
||||
"work with one nonempty and one delayed failed publisher" in assertAllStagesStopped {
|
||||
// This test and the next one are materialization order dependent and rely on the fact
|
||||
// that there are only 3 submodules in the graph that gets created and that an immutable
|
||||
// set (what they are stored in internally) of size 4 or less is an optimized version that
|
||||
// traverses in insertion order
|
||||
val subscriber = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
|
||||
subscriber.expectSubscription().request(5)
|
||||
|
||||
|
|
@ -118,23 +121,12 @@ class GraphConcatSpec extends TwoStreamsSetup {
|
|||
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft
|
||||
if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException)
|
||||
if (!errorSignalled) subscriber.expectError(TestException)
|
||||
}
|
||||
|
||||
"work with one delayed failed and one nonempty publisher" in assertAllStagesStopped {
|
||||
// This test and the previous one are materialization order dependent and rely on the fact
|
||||
// that there are only 3 submodules in the graph that gets created and that an immutable
|
||||
// set (what they are stored in internally) of size 4 or less is an optimized version that
|
||||
// traverses in insertion order
|
||||
val subscriber = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
|
||||
subscriber.expectSubscription().request(5)
|
||||
|
||||
var errorSignalled = false
|
||||
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(1, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft
|
||||
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft
|
||||
if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException)
|
||||
subscriber.expectSubscriptionAndError(TestException)
|
||||
}
|
||||
|
||||
"correctly handle async errors in secondary upstream" in assertAllStagesStopped {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue