=str Make use of Set instead of List to reduce contention.

Signed-off-by: He-Pin <hepin1989@gmail.com>
This commit is contained in:
He-Pin 2023-06-17 16:48:59 +08:00 committed by kerr
parent 5a8f811d53
commit 464517a547
2 changed files with 21 additions and 34 deletions

View file

@ -477,13 +477,13 @@ import pekko.stream.stage._
handler(evt) handler(evt)
if (promise ne GraphStageLogic.NoPromise) { if (promise ne GraphStageLogic.NoPromise) {
promise.success(Done) promise.success(Done)
logic.onFeedbackDispatched() logic.onFeedbackDispatched(promise)
} }
} catch { } catch {
case NonFatal(ex) => case NonFatal(ex) =>
if (promise ne GraphStageLogic.NoPromise) { if (promise ne GraphStageLogic.NoPromise) {
promise.failure(ex) promise.failure(ex)
logic.onFeedbackDispatched() logic.onFeedbackDispatched(promise)
} }
logic.failStage(ex) logic.failStage(ex)
} }

View file

@ -13,19 +13,22 @@
package org.apache.pekko.stream.stage package org.apache.pekko.stream.stage
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import scala.annotation.nowarn
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.{ immutable, mutable } import scala.collection.{ immutable, mutable }
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.annotation.nowarn
import org.apache.pekko import org.apache.pekko
import pekko.{ Done, NotUsed } import pekko.{ Done, NotUsed }
import pekko.actor._ import pekko.actor._
import pekko.annotation.InternalApi import pekko.annotation.InternalApi
import pekko.japi.function.{ Effect, Procedure } import pekko.japi.function.{ Effect, Procedure }
import pekko.stream.Attributes.SourceLocation
import pekko.stream._ import pekko.stream._
import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder } import pekko.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder }
import pekko.stream.impl.ActorSubscriberMessage import pekko.stream.impl.ActorSubscriberMessage
import pekko.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } import pekko.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource }
@ -1227,13 +1230,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Add this promise to the owning logic, so it can be completed afterPostStop if it was never handled otherwise. * Add this promise to the owning logic, so it can be completed afterPostStop if it was never handled otherwise.
* Returns whether the logic is still running. * Returns whether the logic is still running.
*/ */
@tailrec
def addToWaiting(): Boolean = { def addToWaiting(): Boolean = {
val previous = asyncCallbacksInProgress.get() val callbacks = asyncCallbacksInProgress.get()
if (previous != null) { // not stopped if (callbacks ne null) { // not stopped
val updated = promise :: previous callbacks.add(promise)
if (!asyncCallbacksInProgress.compareAndSet(previous, updated)) addToWaiting() asyncCallbacksInProgress.get ne null // logic may already stopped
else true
} else // logic was already stopped } else // logic was already stopped
false false
} }
@ -1282,7 +1283,9 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
private var callbacksWaitingForInterpreter: List[ConcurrentAsyncCallback[_]] = Nil private var callbacksWaitingForInterpreter: List[ConcurrentAsyncCallback[_]] = Nil
// is used for two purposes: keep track of running callbacks and signal that the // is used for two purposes: keep track of running callbacks and signal that the
// stage has stopped to fail incoming async callback invocations by being set to null // stage has stopped to fail incoming async callback invocations by being set to null
private val asyncCallbacksInProgress = new AtomicReference[List[Promise[Done]]](Nil) // Using ConcurrentHashMap's KeySetView as Set to track the inProgress async callbacks.
private val asyncCallbacksInProgress: AtomicReference[java.util.Set[Promise[Done]]] =
new AtomicReference(ConcurrentHashMap.newKeySet())
private var _stageActor: StageActor = _ private var _stageActor: StageActor = _
final def stageActor: StageActor = _stageActor match { final def stageActor: StageActor = _stageActor match {
@ -1374,35 +1377,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
} }
// make sure any invokeWithFeedback after this fails fast // make sure any invokeWithFeedback after this fails fast
// and fail current outstanding invokeWithFeedback promises // and fail current outstanding invokeWithFeedback promises
val inProgress = asyncCallbacksInProgress.getAndSet(null) val callbacks = asyncCallbacksInProgress.getAndSet(null)
if (inProgress.nonEmpty) { if ((callbacks ne null) && !callbacks.isEmpty) {
val exception = streamDetachedException val exception = streamDetachedException
inProgress.foreach(_.tryFailure(exception)) callbacks.forEach((t: Promise[Done]) => t.tryFailure(exception))
} }
cleanUpSubstreams(OptionVal.None) cleanUpSubstreams(OptionVal.None)
} }
private[this] var asyncCleanupCounter = 0L
/** Called from interpreter thread by GraphInterpreter.runAsyncInput */ /** Called from interpreter thread by GraphInterpreter.runAsyncInput */
private[stream] def onFeedbackDispatched(): Unit = { private[stream] def onFeedbackDispatched(promise: Promise[Done]): Unit = {
asyncCleanupCounter += 1 val callbacks = asyncCallbacksInProgress.get()
if (callbacks ne null) {
// 256 seemed to be a sweet spot in SendQueueBenchmark.queue benchmarks callbacks.remove(promise)
// It means that at most 255 completed promises are retained per logic that
// uses invokeWithFeedback callbacks.
//
// TODO: add periodical cleanup to get rid of those 255 promises as well
if (asyncCleanupCounter % 256 == 0) {
@tailrec def cleanup(): Unit = {
val previous = asyncCallbacksInProgress.get()
if (previous != null) {
val updated = previous.filterNot(_.isCompleted)
if (!asyncCallbacksInProgress.compareAndSet(previous, updated)) cleanup()
}
}
cleanup()
} }
} }