diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index bea3be6b91..babee44add 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -102,14 +102,17 @@ class GraphConcatSpec extends TwoStreamsSetup { subscriber1.expectSubscriptionAndError(TestException) val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) - subscriber2.expectSubscriptionAndError(TestException) + subscriber2.expectSubscription().request(5) + + var errorSignalled = false + if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(1, TestException).isLeft + if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(2, TestException).isLeft + if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(3, TestException).isLeft + if (!errorSignalled) errorSignalled ||= subscriber2.expectNextOrError(4, TestException).isLeft + if (!errorSignalled) subscriber2.expectError(TestException) } "work with one nonempty and one delayed failed publisher" in assertAllStagesStopped { - // This test and the next one are materialization order dependent and rely on the fact - // that there are only 3 submodules in the graph that gets created and that an immutable - // set (what they are stored in internally) of size 4 or less is an optimized version that - // traverses in insertion order val subscriber = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) subscriber.expectSubscription().request(5) @@ -118,23 +121,12 @@ class GraphConcatSpec extends TwoStreamsSetup { if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft - if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) + if (!errorSignalled) subscriber.expectError(TestException) } "work with one delayed failed and one nonempty publisher" in assertAllStagesStopped { - // This test and the previous one are materialization order dependent and rely on the fact - // that there are only 3 submodules in the graph that gets created and that an immutable - // set (what they are stored in internally) of size 4 or less is an optimized version that - // traverses in insertion order val subscriber = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) - subscriber.expectSubscription().request(5) - - var errorSignalled = false - if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(1, TestException).isLeft - if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft - if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft - if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft - if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) + subscriber.expectSubscriptionAndError(TestException) } "correctly handle async errors in secondary upstream" in assertAllStagesStopped {