=str - Improves performance of mapAsync
* Removes allocations * Removes indirections & shortens code paths
This commit is contained in:
parent
52de0bcaa4
commit
598799d5ae
2 changed files with 82 additions and 96 deletions
|
|
@ -43,8 +43,8 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
|
||||
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
|
||||
|
||||
|
|
@ -67,7 +67,7 @@ private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLine
|
|||
} catch {
|
||||
case NonFatal(ex) ⇒ decider(ex) match {
|
||||
case Supervision.Stop ⇒ failStage(ex)
|
||||
case _ ⇒ pull(in)
|
||||
case _ ⇒ pull(in)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -699,7 +699,15 @@ private[akka] final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) ext
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object MapAsync {
|
||||
final class Holder[T](var elem: T)
|
||||
final class Holder[T](var elem: Try[T], val cb: AsyncCallback[Holder[T]]) extends (Try[T] ⇒ Unit) {
|
||||
override def apply(t: Try[T]): Unit = {
|
||||
elem = t match {
|
||||
case Success(null) ⇒ Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException)
|
||||
case other ⇒ other
|
||||
}
|
||||
cb.invoke(this)
|
||||
}
|
||||
}
|
||||
val NotYetThere = Failure(new Exception)
|
||||
}
|
||||
|
||||
|
|
@ -711,78 +719,60 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
|
|||
|
||||
import MapAsync._
|
||||
|
||||
private val in = Inlet[In]("in")
|
||||
private val out = Outlet[Out]("out")
|
||||
private val in = Inlet[In]("MapAsync.in")
|
||||
private val out = Outlet[Out]("MapAsync.out")
|
||||
|
||||
override def initialAttributes = DefaultAttributes.mapAsync
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
|
||||
override def toString = s"MapAsync.Logic(buffer=$buffer)"
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def toString = s"MapAsync.Logic(buffer=$buffer)"
|
||||
|
||||
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
|
||||
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
|
||||
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
var buffer: BufferImpl[Holder[Out]] = _
|
||||
val futureCB =
|
||||
getAsyncCallback[Holder[Out]](
|
||||
_.elem match {
|
||||
case Failure(e) if decider(e) == Supervision.Stop ⇒ failStage(e)
|
||||
case _ ⇒ if (isAvailable(out)) pushOne()
|
||||
})
|
||||
|
||||
var buffer: BufferImpl[Holder[Try[Out]]] = _
|
||||
def todo = buffer.used
|
||||
private[this] def todo = buffer.used
|
||||
|
||||
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
|
||||
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
|
||||
|
||||
@tailrec private def pushOne(): Unit =
|
||||
if (buffer.isEmpty) {
|
||||
if (isClosed(in)) completeStage()
|
||||
else if (!hasBeenPulled(in)) pull(in)
|
||||
} else if (buffer.peek.elem == NotYetThere) {
|
||||
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||
} else buffer.dequeue().elem match {
|
||||
case Failure(ex) ⇒ pushOne()
|
||||
case Success(elem) ⇒
|
||||
push(out, elem)
|
||||
@tailrec private def pushOne(): Unit =
|
||||
if (buffer.isEmpty) {
|
||||
if (isClosed(in)) completeStage()
|
||||
else if (!hasBeenPulled(in)) pull(in)
|
||||
} else if (buffer.peek.elem == NotYetThere) {
|
||||
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||
}
|
||||
} else buffer.dequeue().elem match {
|
||||
case Success(elem) ⇒
|
||||
push(out, elem)
|
||||
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||
case Failure(ex) ⇒ pushOne()
|
||||
}
|
||||
|
||||
def failOrPull(holder: Holder[Try[Out]], f: Failure[Out]) =
|
||||
if (decider(f.exception) == Supervision.Stop) failStage(f.exception)
|
||||
else {
|
||||
holder.elem = f
|
||||
if (isAvailable(out)) pushOne()
|
||||
}
|
||||
|
||||
val futureCB =
|
||||
getAsyncCallback[(Holder[Try[Out]], Try[Out])]({
|
||||
case (holder, f: Failure[_]) ⇒ failOrPull(holder, f)
|
||||
case (holder, s @ Success(elem)) ⇒
|
||||
if (elem == null) {
|
||||
val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
|
||||
failOrPull(holder, Failure(ex))
|
||||
} else {
|
||||
holder.elem = s
|
||||
if (isAvailable(out)) pushOne()
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
try {
|
||||
val future = f(grab(in))
|
||||
val holder = new Holder[Try[Out]](NotYetThere)
|
||||
val holder = new Holder[Out](NotYetThere, futureCB)
|
||||
buffer.enqueue(holder)
|
||||
future.onComplete(result ⇒ futureCB.invoke(holder -> result))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
}
|
||||
if (todo < parallelism) tryPull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (todo == 0) completeStage()
|
||||
}
|
||||
})
|
||||
override def onUpstreamFinish(): Unit = if (todo == 0) completeStage()
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = pushOne()
|
||||
})
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -791,71 +781,67 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
|
|||
private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out])
|
||||
extends GraphStage[FlowShape[In, Out]] {
|
||||
|
||||
private val in = Inlet[In]("in")
|
||||
private val out = Outlet[Out]("out")
|
||||
private val in = Inlet[In]("MapAsyncUnordered.in")
|
||||
private val out = Outlet[Out]("MapAsyncUnordered.out")
|
||||
|
||||
override def initialAttributes = DefaultAttributes.mapAsyncUnordered
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
|
||||
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
|
||||
|
||||
val decider =
|
||||
inheritedAttributes.get[SupervisionStrategy]
|
||||
.map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
val decider =
|
||||
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
|
||||
var inFlight = 0
|
||||
var buffer: BufferImpl[Out] = _
|
||||
def todo = inFlight + buffer.used
|
||||
var inFlight = 0
|
||||
var buffer: BufferImpl[Out] = _
|
||||
private[this] def todo = inFlight + buffer.used
|
||||
|
||||
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
|
||||
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
|
||||
|
||||
def failOrPull(ex: Throwable) =
|
||||
if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
else if (isClosed(in) && todo == 0) completeStage()
|
||||
else if (!hasBeenPulled(in)) tryPull(in)
|
||||
private val futureCB =
|
||||
getAsyncCallback((result: Try[Out]) ⇒ {
|
||||
inFlight -= 1
|
||||
result match {
|
||||
case Success(elem) if elem != null ⇒
|
||||
if (isAvailable(out)) {
|
||||
if (!hasBeenPulled(in)) tryPull(in)
|
||||
push(out, elem)
|
||||
} else buffer.enqueue(elem)
|
||||
case other ⇒
|
||||
val ex = other match {
|
||||
case Failure(t) ⇒ t
|
||||
case Success(s) if s == null ⇒ ReactiveStreamsCompliance.elementMustNotBeNullException
|
||||
}
|
||||
if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
else if (isClosed(in) && todo == 0) completeStage()
|
||||
else if (!hasBeenPulled(in)) tryPull(in)
|
||||
}
|
||||
}).invoke _
|
||||
|
||||
val futureCB =
|
||||
getAsyncCallback((result: Try[Out]) ⇒ {
|
||||
inFlight -= 1
|
||||
result match {
|
||||
case Failure(ex) ⇒ failOrPull(ex)
|
||||
case Success(elem) ⇒
|
||||
if (elem == null) {
|
||||
val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
|
||||
failOrPull(ex)
|
||||
} else if (isAvailable(out)) {
|
||||
if (!hasBeenPulled(in)) tryPull(in)
|
||||
push(out, elem)
|
||||
} else buffer.enqueue(elem)
|
||||
}
|
||||
}).invoke _
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
try {
|
||||
val future = f(grab(in))
|
||||
inFlight += 1
|
||||
future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
}
|
||||
if (todo < parallelism) tryPull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (todo == 0) completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (!buffer.isEmpty) push(out, buffer.dequeue())
|
||||
else if (isClosed(in) && todo == 0) completeStage()
|
||||
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue