diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 1c7c71c824..04e377d9f8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -71,10 +71,9 @@ class FlowScanSpec extends AkkaSpec { .expectNext(0) .expectComplete() } - "fail after emitting first element when upsrteam failed" in { + "fail when upstream failed" in { Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) .request(2) - .expectNext(0) .expectError(TE("")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6e46eabf08..47157464f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -324,12 +324,6 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex completeStage() } }) - override def onUpstreamFailure(ex: Throwable): Unit = setHandler(out, new OutHandler { - override def onPull(): Unit = { - push(out, aggregator) - failStage(ex) - } - }) }) override def onPull(): Unit = pull(in)