From f679e141681908f945a283bfcedc64f60e777733 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Tue, 5 Jul 2016 00:00:52 -0400 Subject: [PATCH 1/2] =str 20843 Scan.in cannot pull closed port --- .../akka/stream/scaladsl/FlowScanSpec.scala | 13 +++++++++++++ .../scala/akka/stream/impl/fusing/Ops.scala | 17 ++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 144c9dbb96..1c7c71c824 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -64,5 +64,18 @@ class FlowScanSpec extends AkkaSpec { Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink.probe) .toStrict(1.second) should ===(Seq(0, 1, 4, 9, 16)) } + + "scan normally for empty source" in { + Source.empty[Int].scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) + .request(2) + .expectNext(0) + .expectComplete() + } + "fail after emitting first element when upsrteam failed" in { + Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) + .request(2) + .expectNext(0) + .expectError(TE("")) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 30cd433fb5..6e46eabf08 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -315,7 +315,22 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex setHandlers(in, out, self) } }) - setHandler(in, totallyIgnorantInput) + + setHandler(in, new InHandler { + override def onPush(): Unit = () + override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler { + override def onPull(): Unit = { + push(out, aggregator) + completeStage() + } + }) + override def onUpstreamFailure(ex: Throwable): Unit = setHandler(out, new OutHandler { + override def onPull(): Unit = { + push(out, aggregator) + failStage(ex) + } + }) + }) override def onPull(): Unit = pull(in) override def onPush(): Unit = { From a299644b21dfee105b9f2019f08278d3de489f0f Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 5 Jul 2016 00:00:52 -0400 Subject: [PATCH 2/2] =str 20843 Scan failure should flow downstream ASAP --- .../src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala | 3 +-- .../src/main/scala/akka/stream/impl/fusing/Ops.scala | 6 ------ 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 1c7c71c824..04e377d9f8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -71,10 +71,9 @@ class FlowScanSpec extends AkkaSpec { .expectNext(0) .expectComplete() } - "fail after emitting first element when upsrteam failed" in { + "fail when upstream failed" in { Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) .request(2) - .expectNext(0) .expectError(TE("")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6e46eabf08..47157464f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -324,12 +324,6 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex completeStage() } }) - override def onUpstreamFailure(ex: Throwable): Unit = setHandler(out, new OutHandler { - override def onPull(): Unit = { - push(out, aggregator) - failStage(ex) - } - }) }) override def onPull(): Unit = pull(in)