=str Remove futureCB in MapAsyncUnordered.

This commit is contained in:
He-Pin 2023-09-02 19:51:27 +08:00 committed by kerr
parent 194602c849
commit 9f2a9e657a

View file

@ -1402,13 +1402,14 @@ private[stream] object Collect {
new GraphStageLogic(shape) with InHandler with OutHandler {
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
val decider =
private val decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var inFlight = 0
private var buffer: BufferImpl[Out] = _
private val invokeFutureCB: Try[Out] => Unit = getAsyncCallback(futureCompleted).invoke
private[this] def todo = inFlight + buffer.used
private[this] def todo: Int = inFlight + buffer.used
override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes)
@ -1432,9 +1433,6 @@ private[stream] object Collect {
}
}
private val futureCB = getAsyncCallback(futureCompleted)
private val invokeFutureCB: Try[Out] => Unit = futureCB.invoke
override def onPush(): Unit = {
try {
val future = f(grab(in))