diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 5dd7378947..c800fac4af 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -819,7 +819,14 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut val future = f(grab(in)) val holder = new Holder[Out](NotYetThere, futureCB) buffer.enqueue(holder) - future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + + // #20217 We dispatch the future if it's ready to optimize away + // scheduling it to an execution context + future.value match { + case None ⇒ future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + case Some(f) ⇒ holder.apply(f) + } + } catch { case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex) } @@ -882,7 +889,10 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I try { val future = f(grab(in)) inFlight += 1 - future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + future.value match { + case None ⇒ future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + case Some(f) ⇒ futureCB.apply(f) + } } catch { case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex) }