From 3157b0199b32b98bfef8c110e7e0f37d02566766 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 3 Mar 2020 17:17:50 +0100 Subject: [PATCH] MapAsync wouldn't complete when upstream does in all scenarios #28657 --- .../stream/scaladsl/FlowMapAsyncSpec.scala | 37 +++++++++++++++++++ .../scaladsl/FlowMapAsyncUnorderedSpec.scala | 35 ++++++++++++++++++ .../scala/akka/stream/impl/fusing/Ops.scala | 21 ++++++----- 3 files changed, 84 insertions(+), 9 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 53ffaa012d..7ca062ea17 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -295,6 +295,43 @@ class FlowMapAsyncSpec extends StreamSpec { 3.seconds) should ===("happy!") } + "complete without requiring further demand (parallelism = 1)" in assertAllStagesStopped { + import system.dispatcher + Source + .single(1) + .mapAsync(1)(v => Future { Thread.sleep(20); v }) + .runWith(TestSink.probe[Int]) + .request(1) + .expectNext(1) + .expectComplete() + } + + "complete without requiring further demand with already completed future (parallelism = 1)" in assertAllStagesStopped { + Source + .single(1) + .mapAsync(1)(v => Future.successful(v)) + .runWith(TestSink.probe[Int]) + .request(1) + .expectNext(1) + .expectComplete() + } + + "complete without requiring further demand (parallelism = 2)" in assertAllStagesStopped { + import system.dispatcher + val probe = + Source(1 :: 2 :: Nil).mapAsync(2)(v => Future { Thread.sleep(20); v }).runWith(TestSink.probe[Int]) + + probe.request(2).expectNextN(2) + probe.expectComplete() + } + + "complete without requiring further demand with already completed future (parallelism = 2)" in assertAllStagesStopped { + val probe = Source(1 :: 2 :: Nil).mapAsync(2)(v => Future.successful(v)).runWith(TestSink.probe[Int]) + + probe.request(2).expectNextN(2) + probe.expectComplete() + } + "finish after future failure" in assertAllStagesStopped { import system.dispatcher Await.result( diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index f9e26b90e4..7280800e3c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -52,6 +52,41 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec { c.expectComplete() } + "complete without requiring further demand (parallelism = 1)" in assertAllStagesStopped { + import system.dispatcher + Source + .single(1) + .mapAsyncUnordered(1)(v => Future { Thread.sleep(20); v }) + .runWith(TestSink.probe[Int]) + .requestNext(1) + .expectComplete() + } + + "complete without requiring further demand with already completed future (parallelism = 1)" in assertAllStagesStopped { + Source + .single(1) + .mapAsyncUnordered(1)(v => Future.successful(v)) + .runWith(TestSink.probe[Int]) + .requestNext(1) + .expectComplete() + } + + "complete without requiring further demand (parallelism = 2)" in assertAllStagesStopped { + import system.dispatcher + val probe = + Source(1 :: 2 :: Nil).mapAsyncUnordered(2)(v => Future { Thread.sleep(20); v }).runWith(TestSink.probe[Int]) + + probe.request(2).expectNextN(2) + probe.expectComplete() + } + + "complete without requiring further demand with already completed future (parallelism = 2)" in assertAllStagesStopped { + val probe = Source(1 :: 2 :: Nil).mapAsyncUnordered(2)(v => Future.successful(v)).runWith(TestSink.probe[Int]) + + probe.request(2).expectNextN(2) + probe.expectComplete() + } + "not run more futures than requested elements" in { val probe = TestProbe() val c = TestSubscriber.manualProbe[Int]() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 355ccfa37a..5dff353598 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1315,10 +1315,8 @@ private[stream] object Collect { @tailrec private def pushNextIfPossible(): Unit = - if (buffer.isEmpty) { - if (isClosed(in)) completeStage() - else pullIfNeeded() - } else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order + if (buffer.isEmpty) pullIfNeeded() + else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order else if (isAvailable(out)) { val holder = buffer.dequeue() holder.elem match { @@ -1343,7 +1341,9 @@ private[stream] object Collect { } private def pullIfNeeded(): Unit = { - if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in) + if (isClosed(in) && buffer.isEmpty) completeStage() + else if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in) + // else already pulled and waiting for next element } setHandlers(in, out, this) @@ -1378,19 +1378,21 @@ private[stream] object Collect { override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes) def futureCompleted(result: Try[Out]): Unit = { + def isCompleted = isClosed(in) && todo == 0 inFlight -= 1 result match { case Success(elem) if elem != null => if (isAvailable(out)) { if (!hasBeenPulled(in)) tryPull(in) push(out, elem) + if (isCompleted) completeStage() } else buffer.enqueue(elem) case Success(null) => - if (isClosed(in) && todo == 0) completeStage() + if (isCompleted) completeStage() else if (!hasBeenPulled(in)) tryPull(in) case Failure(ex) => if (decider(ex) == Supervision.Stop) failStage(ex) - else if (isClosed(in) && todo == 0) completeStage() + else if (isCompleted) completeStage() else if (!hasBeenPulled(in)) tryPull(in) } } @@ -1418,9 +1420,10 @@ private[stream] object Collect { override def onPull(): Unit = { if (!buffer.isEmpty) push(out, buffer.dequeue()) - else if (isClosed(in) && todo == 0) completeStage() - if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) + val leftTodo = todo + if (isClosed(in) && leftTodo == 0) completeStage() + else if (leftTodo < parallelism && !hasBeenPulled(in)) tryPull(in) } setHandlers(in, out, this)