diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala index 77c8dc0e58..04b8105bba 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala @@ -3,10 +3,11 @@ */ package akka.stream.scaladsl -import akka.pattern +import akka.{ NotUsed, pattern } import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.testkit.TestSubscriber.Probe +import akka.stream.testkit.Utils.TE import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ @@ -33,6 +34,32 @@ class FlowScanAsyncSpec extends StreamSpec { .expectNextOrComplete(0) } + "complete after zero-element has been consumed" in { + val (pub, sub) = + TestSource.probe[Int] + .via(Flow[Int].scanAsync(0)((acc, in) ⇒ Future.successful(acc + in))) + .toMat(TestSink.probe)(Keep.both) + .run() + + sub.request(10) + sub.expectNext(0) + pub.sendComplete() + sub.expectComplete() + } + + "fail after zero-element has been consumed" in { + val (pub, sub) = + TestSource.probe[Int] + .via(Flow[Int].scanAsync(0)((acc, in) ⇒ Future.successful(acc + in))) + .toMat(TestSink.probe)(Keep.both) + .run() + + sub.request(10) + sub.expectNext(0) + pub.sendError(TE("bang")) + sub.expectError(TE("bang")) + } + "work with a single source" in { Source.single(1) .via(sumScanFlow) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 9e1342d9a8..98a6d28dde 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -432,7 +432,8 @@ private[stream] object Collect { private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler { - override def onPush(): Unit = () + override def onPush(): Unit = + throw new IllegalStateException("No push should happen before zero value has been consumed") override def onPull(): Unit = { push(out, current) @@ -507,7 +508,16 @@ private[stream] object Collect { } } - override def onUpstreamFinish(): Unit = {} + override def onUpstreamFinish(): Unit = { + if (current == zero) { + eventualCurrent.value match { + case Some(Success(`zero`)) ⇒ + // #24036 upstream completed without emitting anything but after zero was emitted downstream + completeStage() + case _ ⇒ // in all other cases we will get a complete when the future completes + } + } + } override val toString: String = s"ScanAsync.Logic(completed=${eventualCurrent.isCompleted})" }