chore: Remove ReentrantGuard (#2161)

He-Pin (kerr) 2025-09-09 00:44:23 +08:00, committed by GitHub
parent 91401ebca1
commit 7fb098985b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 68 additions and 42 deletions

View file

@@ -153,3 +153,8 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.Th
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.copy")
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.this")
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig#ThreadPoolExecutorServiceFactory.this")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.event.LoggingBus.org$apache$pekko$event$LoggingBus$_setter_$org$apache$pekko$event$LoggingBus$$guard_=")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.event.LoggingBus.org$apache$pekko$event$LoggingBus$$guard")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.util.ReentrantGuard")
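
Each `ProblemFilters.exclude[...]` entry above is a MiMa (Migration Manager) binary-compatibility filter: the new entries silence the issues MiMa reports for the deleted `org.apache.pekko.util.ReentrantGuard` class and for the synthetic accessors of the private `LoggingBus` guard field whose type changes in this commit. As an illustration only (not part of this commit), an equivalent filter could also be declared inline with the sbt-mima-plugin setting:

import com.typesafe.tools.mima.core._

// Illustrative build.sbt sketch; this project keeps such filters in the
// dedicated filter file shown above rather than in the build definition.
mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.util.ReentrantGuard"))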

View file

@@ -18,8 +18,7 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.MICROSECONDS
 import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.locks.LockSupport
+import java.util.concurrent.locks.{ LockSupport, ReentrantLock }
 import scala.annotation.{ switch, tailrec }
 import scala.collection.{ immutable, mutable }
@@ -29,7 +28,7 @@ import org.apache.pekko
 import pekko.annotation.{ ApiMayChange, InternalApi }
 import pekko.dispatch._
 import pekko.event.Logging
-import pekko.util.{ ImmutableIntMap, ReentrantGuard }
+import pekko.util.ImmutableIntMap
 import pekko.util.Helpers.Requiring

 @InternalApi
@@ -140,7 +139,7 @@ private[pekko] class AffinityPool(
   // adding a worker. We want the creation of the worker, addition
   // to the set and starting to worker to be an atomic action. Using
   // a concurrent set would not give us that
-  private val bookKeepingLock = new ReentrantGuard()
+  private val bookKeepingLock = new ReentrantLock()
   // condition used for awaiting termination
   private val terminationCondition = bookKeepingLock.newCondition()
@@ -151,15 +150,19 @@ private[pekko] class AffinityPool(
   private[this] final val workQueues = Array.fill(parallelism)(new BoundedAffinityTaskQueue(affinityGroupSize))
   private[this] final val workers = mutable.Set[AffinityPoolWorker]()

-  def start(): this.type =
-    bookKeepingLock.withGuard {
+  def start(): this.type = {
+    bookKeepingLock.lock()
+    try {
       if (poolState == Uninitialized) {
         poolState = Initializing
         workQueues.foreach(q => addWorker(workers, q))
         poolState = Running
       }
       this
+    } finally {
+      bookKeepingLock.unlock()
     }
+  }

   // WARNING: Only call while holding the bookKeepingLock
   private def addWorker(workers: mutable.Set[AffinityPoolWorker], q: BoundedAffinityTaskQueue): Unit = {
@@ -181,8 +184,9 @@ private[pekko] class AffinityPool(
    * responsible for adding one more worker to compensate for its
    * own termination
    */
-  private def onWorkerExit(w: AffinityPoolWorker, abruptTermination: Boolean): Unit =
-    bookKeepingLock.withGuard {
+  private def onWorkerExit(w: AffinityPoolWorker, abruptTermination: Boolean): Unit = {
+    bookKeepingLock.lock()
+    try {
       workers.remove(w)
       if (abruptTermination && poolState == Running)
         addWorker(workers, w.q)
@@ -190,7 +194,10 @@ private[pekko] class AffinityPool(
         poolState = ShutDown // transition to shutdown and try to transition to termination
         attemptPoolTermination()
       }
+    } finally {
+      bookKeepingLock.unlock()
     }
+  }

   override def execute(command: Runnable): Unit = {
     val queue = workQueues(queueSelector.getQueue(command, parallelism)) // Will throw NPE if command is null
@@ -207,9 +214,12 @@
       else awaitTermination(terminationCondition.awaitNanos(nanos))
     }

-    bookKeepingLock.withGuard {
+    bookKeepingLock.lock()
+    try {
       // need to hold the lock to avoid monitor exception
       awaitTermination(unit.toNanos(timeout))
+    } finally {
+      bookKeepingLock.unlock()
     }
   }
@@ -220,22 +230,30 @@
       terminationCondition.signalAll()
     }

-  override def shutdownNow(): java.util.List[Runnable] =
-    bookKeepingLock.withGuard {
+  override def shutdownNow(): java.util.List[Runnable] = {
+    bookKeepingLock.lock()
+    try {
       poolState = ShutDown
       workers.foreach(_.stop())
       attemptPoolTermination()
       // like in the FJ executor, we do not provide facility to obtain tasks that were in queue
       Collections.emptyList[Runnable]()
+    } finally {
+      bookKeepingLock.unlock()
     }
+  }

-  override def shutdown(): Unit =
-    bookKeepingLock.withGuard {
+  override def shutdown(): Unit = {
+    bookKeepingLock.lock()
+    try {
       poolState = ShuttingDown
       // interrupts only idle workers.. so others can process their queues
       workers.foreach(_.stopIfIdle())
       attemptPoolTermination()
+    } finally {
+      bookKeepingLock.unlock()
     }
+  }

   override def isShutdown: Boolean = poolState >= ShutDown
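
Every `withGuard` call in this file becomes the standard `java.util.concurrent.locks.ReentrantLock` idiom: `lock()`, the critical section inside `try`, and `unlock()` in `finally`. The comment retained in `awaitTermination` ("need to hold the lock to avoid monitor exception") concerns the `terminationCondition`: a `Condition` obtained from a lock may only be awaited or signalled while that lock is held, otherwise an `IllegalMonitorStateException` is thrown. A minimal self-contained sketch of the same pattern, with illustrative names that are not taken from the Pekko sources:

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Sketch of the lock()/try/finally idiom adopted in this commit, including a
// Condition that is awaited and signalled only while the lock is held.
object LockIdiomSketch {
  private val lock = new ReentrantLock()
  private val done = lock.newCondition()
  private var finished = false

  def markDone(): Unit = {
    lock.lock()
    try {
      finished = true
      done.signalAll() // legal only while holding the lock
    } finally {
      lock.unlock()
    }
  }

  def awaitDone(timeout: Long, unit: TimeUnit): Boolean = {
    lock.lock()
    try {
      var remaining = unit.toNanos(timeout)
      while (!finished && remaining > 0L)
        remaining = done.awaitNanos(remaining) // returns the nanos left to wait
      finished
    } finally {
      lock.unlock()
    }
  }
}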

View file

@@ -31,10 +31,12 @@ import pekko.actor.ActorSystem.Settings
 import pekko.annotation.{ DoNotInherit, InternalApi }
 import pekko.dispatch.RequiresMessageQueue
 import pekko.event.Logging._
-import pekko.util.{ Helpers, ReentrantGuard }
+import pekko.util.Helpers
 import pekko.util.Timeout
 import pekko.util.unused
+import java.util.concurrent.locks.ReentrantLock

 /**
  * This trait brings log level handling to the EventStream: it reads the log
  * levels for the initial logging (StandardOutLogger) and the loggers & level
@@ -50,7 +52,7 @@ trait LoggingBus extends ActorEventBus {
   import Logging._

-  private val guard = new ReentrantGuard
+  private val guard = new ReentrantLock()

   private var loggers = Seq.empty[ActorRef]
   @volatile private var _logLevel: LogLevel = _
@@ -69,21 +71,26 @@ trait LoggingBus extends ActorEventBus {
    * will not participate in the automatic management of log level
    * subscriptions!
    */
-  def setLogLevel(level: LogLevel): Unit = guard.withGuard {
-    val logLvl = _logLevel // saves (2 * AllLogLevel.size - 1) volatile reads (because of the loops below)
-    for {
-      l <- AllLogLevels
-      // subscribe if previously ignored and now requested
-      if l > logLvl && l <= level
-      log <- loggers
-    } subscribe(log, classFor(l))
-    for {
-      l <- AllLogLevels
-      // unsubscribe if previously registered and now ignored
-      if l <= logLvl && l > level
-      log <- loggers
-    } unsubscribe(log, classFor(l))
-    _logLevel = level
+  def setLogLevel(level: LogLevel): Unit = {
+    guard.lock()
+    try {
+      val logLvl = _logLevel // saves (2 * AllLogLevel.size - 1) volatile reads (because of the loops below)
+      for {
+        l <- AllLogLevels
+        // subscribe if previously ignored and now requested
+        if l > logLvl && l <= level
+        log <- loggers
+      } subscribe(log, classFor(l))
+      for {
+        l <- AllLogLevels
+        // unsubscribe if previously registered and now ignored
+        if l <= logLvl && l > level
+        log <- loggers
+      } unsubscribe(log, classFor(l))
+      _logLevel = level
+    } finally {
+      guard.unlock()
+    }
   }

   private def setUpStdoutLogger(config: Settings): Unit = {
@@ -98,9 +105,12 @@
       ErrorLevel
     }
     AllLogLevels.filter(level >= _).foreach(l => subscribe(StandardOutLogger, classFor(l)))
-    guard.withGuard {
+    guard.lock()
+    try {
       loggers :+= StandardOutLogger
       _logLevel = level
+    } finally {
+      guard.unlock()
     }
   }
@@ -147,9 +157,12 @@
           }
           .get
       }
-    guard.withGuard {
+    guard.lock()
+    try {
       loggers = myloggers
       _logLevel = level
+    } finally {
+      guard.unlock()
     }
     try {
       if (system.settings.DebugUnhandledMessage)
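
The `LoggingBus` changes follow the same mechanical rewrite, with one detail worth noting: `_logLevel` stays `@volatile`, so reads of the log level remain lock-free while every write happens under `guard`, and `setLogLevel` snapshots the field once into `logLvl` to avoid repeated volatile reads inside its loops. A small illustrative sketch of that combination (hypothetical names, not Pekko code):

import java.util.concurrent.locks.ReentrantLock

// Lock-free reads of a volatile field, with writes serialized by a lock.
final class LevelHolder {
  private val guard = new ReentrantLock()
  @volatile private var _level: Int = 0

  // Readers never touch the lock and still observe the latest written value.
  def level: Int = _level

  // Writers take the lock so read-modify-write sequences cannot interleave.
  def raiseTo(newLevel: Int): Unit = {
    guard.lock()
    try {
      val current = _level // single volatile read, reused below
      if (newLevel > current) _level = newLevel
    } finally {
      guard.unlock()
    }
  }
}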

View file

@@ -14,16 +14,6 @@
 package org.apache.pekko.util

 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantLock
-
-final class ReentrantGuard extends ReentrantLock {
-
-  final def withGuard[T](body: => T): T = {
-    lock()
-    try body
-    finally unlock()
-  }
-}

 /**
  * An atomic switch that can be either on or off
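
The deleted helper was a thin "loan pattern" wrapper: a `ReentrantLock` subclass whose `withGuard` method acquired the lock around a by-name block, which this commit inlines at every call site. If the concise call-site syntax were ever wanted again, an equivalent could be provided as an enrichment instead of a subclass; a hypothetical sketch, not part of this commit:

import java.util.concurrent.locks.ReentrantLock

object LockSyntax {
  // Hypothetical helper mirroring the removed ReentrantGuard.withGuard,
  // written as an implicit enrichment rather than a ReentrantLock subclass.
  implicit class WithGuardOps(private val lock: ReentrantLock) {
    def withGuard[T](body: => T): T = {
      lock.lock()
      try body
      finally lock.unlock()
    }
  }
}

// Usage sketch:
//   import LockSyntax._
//   val lock = new ReentrantLock()
//   lock.withGuard { /* critical section */ }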