diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 144c9dbb96..04e377d9f8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -64,5 +64,17 @@ class FlowScanSpec extends AkkaSpec { Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink.probe) .toStrict(1.second) should ===(Seq(0, 1, 4, 9, 16)) } + + "scan normally for empty source" in { + Source.empty[Int].scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) + .request(2) + .expectNext(0) + .expectComplete() + } + "fail when upstream failed" in { + Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int]) + .request(2) + .expectError(TE("")) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 32e64cc9d1..413915914d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -315,7 +315,16 @@ final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta setHandlers(in, out, self) } }) - setHandler(in, totallyIgnorantInput) + + setHandler(in, new InHandler { + override def onPush(): Unit = () + override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler { + override def onPull(): Unit = { + push(out, aggregator) + completeStage() + } + }) + }) override def onPull(): Unit = pull(in) override def onPush(): Unit = {