diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 98290ff466..b2adba74d8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -171,6 +171,21 @@ class FlowMapAsyncSpec extends AkkaSpec { c.expectComplete() } + "resume after multiple failures" in assertAllStagesStopped { + val futures: List[Future[String]] = List( + Future.failed(Utils.TE("failure1")), + Future.failed(Utils.TE("failure2")), + Future.failed(Utils.TE("failure3")), + Future.failed(Utils.TE("failure4")), + Future.failed(Utils.TE("failure5")), + Future.successful("happy!")) + + Await.result( + Source(futures) + .mapAsync(2)(identity).withAttributes(supervisionStrategy(resumingDecider)) + .runWith(Sink.head), 3.seconds) should ===("happy!") + } + "finish after future failure" in assertAllStagesStopped { import system.dispatcher Await.result(Source(1 to 3).mapAsync(1)(n ⇒ Future { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index d0faed508a..08bb4b4549 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -122,6 +122,21 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { .expectComplete() } + "resume after multiple failures" in assertAllStagesStopped { + val futures: List[Future[String]] = List( + Future.failed(Utils.TE("failure1")), + Future.failed(Utils.TE("failure2")), + Future.failed(Utils.TE("failure3")), + Future.failed(Utils.TE("failure4")), + Future.failed(Utils.TE("failure5")), + Future.successful("happy!")) + + Await.result( + Source(futures) + .mapAsyncUnordered(2)(identity).withAttributes(supervisionStrategy(resumingDecider)) + .runWith(Sink.head), 3.seconds) should ===("happy!") + } + "finish after future failure" in assertAllStagesStopped { import system.dispatcher Await.result(Source(1 to 3).mapAsyncUnordered(1)(n ⇒ Future { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 9a5a01dc7b..bc38bb0eb0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -396,25 +396,27 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut } override def onPull(ctx: AsyncContext[Out, (Int, Try[Out])]) = { - @tailrec def rec(hasFreedUpSpace: Boolean): DownstreamDirective = + @tailrec def rec(): DownstreamDirective = if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish() else if (elemsInFlight.isEmpty || elemsInFlight.peek == NotYetThere) { - if (hasFreedUpSpace && ctx.isHoldingUpstream) ctx.holdDownstreamAndPull() + if (!elemsInFlight.isFull && ctx.isHoldingUpstream) ctx.holdDownstreamAndPull() else ctx.holdDownstream() } else elemsInFlight.dequeue() match { - case Failure(ex) ⇒ rec(true) + case Failure(ex) ⇒ rec() case Success(elem) ⇒ if (ctx.isHoldingUpstream) ctx.pushAndPull(elem) else ctx.push(elem) } - rec(false) + rec() } override def onAsyncInput(input: (Int, Try[Out]), ctx: AsyncContext[Out, Notification]) = { @tailrec def rec(): Directive = if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish() - else if (elemsInFlight.isEmpty || elemsInFlight.peek == NotYetThere) ctx.ignore() - else elemsInFlight.dequeue() match { + else if (elemsInFlight.isEmpty || (elemsInFlight.peek eq NotYetThere)) { + if (!elemsInFlight.isFull && ctx.isHoldingUpstream) ctx.pull() + else ctx.ignore() + } else elemsInFlight.dequeue() match { case Failure(ex) ⇒ rec() case Success(elem) ⇒ if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)