diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala index 5eb567274e..77c8dc0e58 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala @@ -30,8 +30,7 @@ class FlowScanAsyncSpec extends StreamSpec { .via(sumScanFlow) .runWith(TestSink.probe[Int]) .request(1) - .expectNext(0) - .expectComplete() + .expectNextOrComplete(0) } "work with a single source" in { @@ -73,7 +72,8 @@ class FlowScanAsyncSpec extends StreamSpec { Source.failed[Int](expected) .via(sumScanFlow) .runWith(TestSink.probe[Int]) - .expectSubscriptionAndError(expected) + .request(2) + .expectNextOrError(0, expected) } "with the restarting decider" should { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 76bedd0eaf..e28a9600aa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -71,10 +71,14 @@ class FlowScanSpec extends StreamSpec { .expectNext(0) .expectComplete() } + "fail when upstream failed" in { - Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) + val ex = TE("") + Source.failed[Int](ex) + .scan(0) { case (a, b) ⇒ a + b } + .runWith(TestSink.probe[Int]) .request(2) - .expectError(TE("")) + .expectNextOrError(0, ex) } } }