From a299644b21dfee105b9f2019f08278d3de489f0f Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 5 Jul 2016 00:00:52 -0400 Subject: [PATCH] =str 20843 Scan failure should flow downstream ASAP --- .../src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala | 3 +-- .../src/main/scala/akka/stream/impl/fusing/Ops.scala | 6 ------ 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 1c7c71c824..04e377d9f8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -71,10 +71,9 @@ class FlowScanSpec extends AkkaSpec { .expectNext(0) .expectComplete() } - "fail after emitting first element when upsrteam failed" in { + "fail when upstream failed" in { Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) .request(2) - .expectNext(0) .expectError(TE("")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6e46eabf08..47157464f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -324,12 +324,6 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex completeStage() } }) - override def onUpstreamFailure(ex: Throwable): Unit = setHandler(out, new OutHandler { - override def onPull(): Unit = { - push(out, aggregator) - failStage(ex) - } - }) }) override def onPull(): Unit = pull(in)