Str #20217 If the future in mapAsync is already available, don't dispatch it (#20375)

This commit is contained in:
David Knapp 2016-04-28 13:23:03 -07:00 committed by Konrad Malawski
parent 7913e25257
commit c72890b113

View file

@ -819,7 +819,14 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
val future = f(grab(in))
val holder = new Holder[Out](NotYetThere, futureCB)
buffer.enqueue(holder)
future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
// #20217 We dispatch the future if it's ready to optimize away
// scheduling it to an execution context
future.value match {
case None future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case Some(f) holder.apply(f)
}
} catch {
case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex)
}
@ -882,7 +889,10 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
try {
val future = f(grab(in))
inFlight += 1
future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
future.value match {
case None future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case Some(f) futureCB.apply(f)
}
} catch {
case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex)
}