From 004581a399c09baa2f0a4e382c886597b80fde0c Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Tue, 21 Jan 2020 03:15:30 -0500 Subject: [PATCH] Rename RecoveryPermitter.pending for telemetry (#28503) --- .../akka/persistence/RecoveryPermitter.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala index 610e863f14..a9a596b280 100644 --- a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala @@ -4,7 +4,7 @@ package akka.persistence -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef @@ -37,17 +37,19 @@ import akka.util.MessageBuffer import RecoveryPermitter._ private var usedPermits = 0 - private val pending = MessageBuffer.empty + + @InternalStableApi + private val pendingBuffer = MessageBuffer.empty private var maxPendingStats = 0 def receive = { case RequestRecoveryPermit => context.watch(sender()) if (usedPermits >= maxPermits) { - if (pending.isEmpty) + if (pendingBuffer.isEmpty) log.debug("Exceeded max-concurrent-recoveries [{}]. First pending {}", maxPermits, sender()) - pending.append(RequestRecoveryPermit, sender()) - maxPendingStats = math.max(maxPendingStats, pending.size) + pendingBuffer.append(RequestRecoveryPermit, sender()) + maxPendingStats = math.max(maxPendingStats, pendingBuffer.size) } else { recoveryPermitGranted(sender()) } @@ -57,9 +59,9 @@ import akka.util.MessageBuffer case Terminated(ref) => // pre-mature termination should be rare - val before = pending.size - pending.filterNot { case (_, r) => r == ref } - if (before == pending.size) + val before = pendingBuffer.size + pendingBuffer.filterNot { case (_, r) => r == ref } + if (before == pendingBuffer.size) onReturnRecoveryPermit(ref) // it wasn't pending, so return permit } @@ -67,12 +69,12 @@ import akka.util.MessageBuffer usedPermits -= 1 context.unwatch(ref) if (usedPermits < 0) throw new IllegalStateException(s"permits must not be negative (returned by: ${ref})") - if (!pending.isEmpty) { - val ref = pending.head()._2 - pending.dropHead() + if (!pendingBuffer.isEmpty) { + val ref = pendingBuffer.head()._2 + pendingBuffer.dropHead() recoveryPermitGranted(ref) } - if (pending.isEmpty && maxPendingStats > 0) { + if (pendingBuffer.isEmpty && maxPendingStats > 0) { log.debug( "Drained pending recovery permit requests, max in progress was [{}], still [{}] in progress", usedPermits + maxPendingStats,