diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index b71de5f6ba..64ebbaa2d3 100644 --- a/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -153,3 +153,8 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.Th ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.copy") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.this") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig#ThreadPoolExecutorServiceFactory.this") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.event.LoggingBus.org$apache$pekko$event$LoggingBus$_setter_$org$apache$pekko$event$LoggingBus$$guard_=") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.event.LoggingBus.org$apache$pekko$event$LoggingBus$$guard") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.util.ReentrantGuard") + + diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/affinity/AffinityPool.scala b/actor/src/main/scala/org/apache/pekko/dispatch/affinity/AffinityPool.scala index c34d39bc9d..21644fe05e 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/affinity/AffinityPool.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/affinity/AffinityPool.scala @@ -18,8 +18,7 @@ import java.util.Collections import java.util.concurrent._ import java.util.concurrent.TimeUnit.MICROSECONDS import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.locks.LockSupport - +import java.util.concurrent.locks.{ LockSupport, ReentrantLock } import scala.annotation.{ switch, tailrec } import scala.collection.{ immutable, mutable } @@ -29,7 +28,7 @@ import org.apache.pekko import pekko.annotation.{ ApiMayChange, InternalApi } import pekko.dispatch._ import pekko.event.Logging -import pekko.util.{ ImmutableIntMap, ReentrantGuard } +import pekko.util.ImmutableIntMap import pekko.util.Helpers.Requiring @InternalApi @@ -140,7 +139,7 @@ private[pekko] class AffinityPool( // adding a worker. We want the creation of the worker, addition // to the set and starting to worker to be an atomic action. Using // a concurrent set would not give us that - private val bookKeepingLock = new ReentrantGuard() + private val bookKeepingLock = new ReentrantLock() // condition used for awaiting termination private val terminationCondition = bookKeepingLock.newCondition() @@ -151,15 +150,19 @@ private[pekko] class AffinityPool( private[this] final val workQueues = Array.fill(parallelism)(new BoundedAffinityTaskQueue(affinityGroupSize)) private[this] final val workers = mutable.Set[AffinityPoolWorker]() - def start(): this.type = - bookKeepingLock.withGuard { + def start(): this.type = { + bookKeepingLock.lock() + try { if (poolState == Uninitialized) { poolState = Initializing workQueues.foreach(q => addWorker(workers, q)) poolState = Running } this + } finally { + bookKeepingLock.unlock() } + } // WARNING: Only call while holding the bookKeepingLock private def addWorker(workers: mutable.Set[AffinityPoolWorker], q: BoundedAffinityTaskQueue): Unit = { @@ -181,8 +184,9 @@ private[pekko] class AffinityPool( * responsible for adding one more worker to compensate for its * own termination */ - private def onWorkerExit(w: AffinityPoolWorker, abruptTermination: Boolean): Unit = - bookKeepingLock.withGuard { + private def onWorkerExit(w: AffinityPoolWorker, abruptTermination: Boolean): Unit = { + bookKeepingLock.lock() + try { workers.remove(w) if (abruptTermination && poolState == Running) addWorker(workers, w.q) @@ -190,7 +194,10 @@ private[pekko] class AffinityPool( poolState = ShutDown // transition to shutdown and try to transition to termination attemptPoolTermination() } + } finally { + bookKeepingLock.unlock() } + } override def execute(command: Runnable): Unit = { val queue = workQueues(queueSelector.getQueue(command, parallelism)) // Will throw NPE if command is null @@ -207,9 +214,12 @@ private[pekko] class AffinityPool( else awaitTermination(terminationCondition.awaitNanos(nanos)) } - bookKeepingLock.withGuard { + bookKeepingLock.lock() + try { // need to hold the lock to avoid monitor exception awaitTermination(unit.toNanos(timeout)) + } finally { + bookKeepingLock.unlock() } } @@ -220,22 +230,30 @@ private[pekko] class AffinityPool( terminationCondition.signalAll() } - override def shutdownNow(): java.util.List[Runnable] = - bookKeepingLock.withGuard { + override def shutdownNow(): java.util.List[Runnable] = { + bookKeepingLock.lock() + try { poolState = ShutDown workers.foreach(_.stop()) attemptPoolTermination() // like in the FJ executor, we do not provide facility to obtain tasks that were in queue Collections.emptyList[Runnable]() + } finally { + bookKeepingLock.unlock() } + } - override def shutdown(): Unit = - bookKeepingLock.withGuard { + override def shutdown(): Unit = { + bookKeepingLock.lock() + try { poolState = ShuttingDown // interrupts only idle workers.. so others can process their queues workers.foreach(_.stopIfIdle()) attemptPoolTermination() + } finally { + bookKeepingLock.unlock() } + } override def isShutdown: Boolean = poolState >= ShutDown diff --git a/actor/src/main/scala/org/apache/pekko/event/Logging.scala b/actor/src/main/scala/org/apache/pekko/event/Logging.scala index 866f66a365..9c380d3c6f 100644 --- a/actor/src/main/scala/org/apache/pekko/event/Logging.scala +++ b/actor/src/main/scala/org/apache/pekko/event/Logging.scala @@ -31,10 +31,12 @@ import pekko.actor.ActorSystem.Settings import pekko.annotation.{ DoNotInherit, InternalApi } import pekko.dispatch.RequiresMessageQueue import pekko.event.Logging._ -import pekko.util.{ Helpers, ReentrantGuard } +import pekko.util.Helpers import pekko.util.Timeout import pekko.util.unused +import java.util.concurrent.locks.ReentrantLock + /** * This trait brings log level handling to the EventStream: it reads the log * levels for the initial logging (StandardOutLogger) and the loggers & level @@ -50,7 +52,7 @@ trait LoggingBus extends ActorEventBus { import Logging._ - private val guard = new ReentrantGuard + private val guard = new ReentrantLock() private var loggers = Seq.empty[ActorRef] @volatile private var _logLevel: LogLevel = _ @@ -69,21 +71,26 @@ trait LoggingBus extends ActorEventBus { * will not participate in the automatic management of log level * subscriptions! */ - def setLogLevel(level: LogLevel): Unit = guard.withGuard { - val logLvl = _logLevel // saves (2 * AllLogLevel.size - 1) volatile reads (because of the loops below) - for { - l <- AllLogLevels - // subscribe if previously ignored and now requested - if l > logLvl && l <= level - log <- loggers - } subscribe(log, classFor(l)) - for { - l <- AllLogLevels - // unsubscribe if previously registered and now ignored - if l <= logLvl && l > level - log <- loggers - } unsubscribe(log, classFor(l)) - _logLevel = level + def setLogLevel(level: LogLevel): Unit = { + guard.lock() + try { + val logLvl = _logLevel // saves (2 * AllLogLevel.size - 1) volatile reads (because of the loops below) + for { + l <- AllLogLevels + // subscribe if previously ignored and now requested + if l > logLvl && l <= level + log <- loggers + } subscribe(log, classFor(l)) + for { + l <- AllLogLevels + // unsubscribe if previously registered and now ignored + if l <= logLvl && l > level + log <- loggers + } unsubscribe(log, classFor(l)) + _logLevel = level + } finally { + guard.unlock() + } } private def setUpStdoutLogger(config: Settings): Unit = { @@ -98,9 +105,12 @@ trait LoggingBus extends ActorEventBus { ErrorLevel } AllLogLevels.filter(level >= _).foreach(l => subscribe(StandardOutLogger, classFor(l))) - guard.withGuard { + guard.lock() + try { loggers :+= StandardOutLogger _logLevel = level + } finally { + guard.unlock() } } @@ -147,9 +157,12 @@ trait LoggingBus extends ActorEventBus { } .get } - guard.withGuard { + guard.lock() + try { loggers = myloggers _logLevel = level + } finally { + guard.unlock() } try { if (system.settings.DebugUnhandledMessage) diff --git a/actor/src/main/scala/org/apache/pekko/util/LockUtil.scala b/actor/src/main/scala/org/apache/pekko/util/LockUtil.scala index 5c8c834a9a..dd2cc5d66d 100644 --- a/actor/src/main/scala/org/apache/pekko/util/LockUtil.scala +++ b/actor/src/main/scala/org/apache/pekko/util/LockUtil.scala @@ -14,16 +14,6 @@ package org.apache.pekko.util import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.ReentrantLock - -final class ReentrantGuard extends ReentrantLock { - - final def withGuard[T](body: => T): T = { - lock() - try body - finally unlock() - } -} /** * An atomic switch that can be either on or off