From af8a81f45f351ca955d8b2d0724051e50b4fe80d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 7 Dec 2017 10:09:15 +0100 Subject: [PATCH] MapAsync and already failed futures #24117 --- .../stream/scaladsl/FlowMapAsyncSpec.scala | 52 +++++++++++++++++++ .../scala/akka/stream/impl/fusing/Ops.scala | 13 +++-- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index b5873fb63e..915890a88d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -80,6 +80,23 @@ class FlowMapAsyncSpec extends StreamSpec { c.expectNoMsg(200.millis) } + "signal future already failed" in assertAllStagesStopped { + val latch = TestLatch(1) + val c = TestSubscriber.manualProbe[Int]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).mapAsync(4)(n ⇒ + if (n == 3) Future.failed[Int](new TE("err1")) + else Future { + Await.ready(latch, 10.seconds) + n + } + ).to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectError().getMessage should be("err1") + latch.countDown() + } + "signal future failure" in assertAllStagesStopped { val latch = TestLatch(1) val c = TestSubscriber.manualProbe[Int]() @@ -229,6 +246,22 @@ class FlowMapAsyncSpec extends StreamSpec { c.expectComplete() } + "resume after already failed future" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + implicit val ec = system.dispatcher + val p = Source(1 to 5) + .mapAsync(4)(n ⇒ + if (n == 3) Future.failed(new TE("err3")) + else Future.successful(n) + ) + .withAttributes(supervisionStrategy(resumingDecider)) + .to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + for (n ← List(1, 2, 4, 5)) c.expectNext(n) + c.expectComplete() + } + "resume after multiple failures" in assertAllStagesStopped { val futures: List[Future[String]] = List( Future.failed(Utils.TE("failure1")), @@ -371,6 +404,25 @@ class FlowMapAsyncSpec extends StreamSpec { failCount.get() should ===(1) } + "not invoke the decider twice for the same already failed future" in { + import system.dispatcher + val failCount = new AtomicInteger(0) + val result = Source(List(true, false)) + .mapAsync(1)(elem ⇒ + if (elem) Future.failed(TE("this has gone too far")) + else Future.successful(elem) + ).addAttributes(supervisionStrategy { + case TE("this has gone too far") ⇒ + failCount.incrementAndGet() + Supervision.resume + case _ ⇒ Supervision.stop + }) + .runWith(Sink.seq) + + result.futureValue should ===(Seq(false)) + failCount.get() should ===(1) + } + "not invoke the decider twice for the same failure to produce a future" in { import system.dispatcher val failCount = new AtomicInteger(0) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 072a2202e5..94bdc4d1ac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1172,7 +1172,7 @@ private[stream] object Collect { private val futureCB = getAsyncCallback[Holder[Out]](holder ⇒ holder.elem match { case Success(_) ⇒ pushNextIfPossible() - case Failure(NonFatal(ex)) ⇒ + case Failure(ex) ⇒ holder.supervisionDirectiveFor(decider, ex) match { // fail fast as if supervision says so case Supervision.Stop ⇒ failStage(ex) @@ -1195,9 +1195,14 @@ private[stream] object Collect { future.value match { case None ⇒ future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) case Some(v) ⇒ - // #20217 the future is already here, avoid scheduling it on the dispatcher + // #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and + // run the logic directly on this thread holder.setElem(v) - pushNextIfPossible() + v match { + // this optimization also requires us to stop the stage to fail fast if the decider says so: + case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop ⇒ failStage(ex) + case _ ⇒ pushNextIfPossible() + } } } catch { @@ -1225,6 +1230,8 @@ private[stream] object Collect { case Failure(NonFatal(ex)) ⇒ holder.supervisionDirectiveFor(decider, ex) match { + // this could happen if we are looping in pushNextIfPossible and end up on a failed future before the + // onComplete callback has run case Supervision.Stop ⇒ failStage(ex) case _ ⇒ // try next element