Rename RecoveryPermitter.pending for telemetry (#28503)

This commit is contained in:
Yury Gribkov 2020-01-21 03:15:30 -05:00 committed by Johan Andrén
parent 6072f6d263
commit 004581a399

View file

@ -4,7 +4,7 @@
package akka.persistence package akka.persistence
import akka.annotation.InternalApi import akka.annotation.{ InternalApi, InternalStableApi }
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorLogging import akka.actor.ActorLogging
import akka.actor.ActorRef import akka.actor.ActorRef
@ -37,17 +37,19 @@ import akka.util.MessageBuffer
import RecoveryPermitter._ import RecoveryPermitter._
private var usedPermits = 0 private var usedPermits = 0
private val pending = MessageBuffer.empty
@InternalStableApi
private val pendingBuffer = MessageBuffer.empty
private var maxPendingStats = 0 private var maxPendingStats = 0
def receive = { def receive = {
case RequestRecoveryPermit => case RequestRecoveryPermit =>
context.watch(sender()) context.watch(sender())
if (usedPermits >= maxPermits) { if (usedPermits >= maxPermits) {
if (pending.isEmpty) if (pendingBuffer.isEmpty)
log.debug("Exceeded max-concurrent-recoveries [{}]. First pending {}", maxPermits, sender()) log.debug("Exceeded max-concurrent-recoveries [{}]. First pending {}", maxPermits, sender())
pending.append(RequestRecoveryPermit, sender()) pendingBuffer.append(RequestRecoveryPermit, sender())
maxPendingStats = math.max(maxPendingStats, pending.size) maxPendingStats = math.max(maxPendingStats, pendingBuffer.size)
} else { } else {
recoveryPermitGranted(sender()) recoveryPermitGranted(sender())
} }
@ -57,9 +59,9 @@ import akka.util.MessageBuffer
case Terminated(ref) => case Terminated(ref) =>
// pre-mature termination should be rare // pre-mature termination should be rare
val before = pending.size val before = pendingBuffer.size
pending.filterNot { case (_, r) => r == ref } pendingBuffer.filterNot { case (_, r) => r == ref }
if (before == pending.size) if (before == pendingBuffer.size)
onReturnRecoveryPermit(ref) // it wasn't pending, so return permit onReturnRecoveryPermit(ref) // it wasn't pending, so return permit
} }
@ -67,12 +69,12 @@ import akka.util.MessageBuffer
usedPermits -= 1 usedPermits -= 1
context.unwatch(ref) context.unwatch(ref)
if (usedPermits < 0) throw new IllegalStateException(s"permits must not be negative (returned by: ${ref})") if (usedPermits < 0) throw new IllegalStateException(s"permits must not be negative (returned by: ${ref})")
if (!pending.isEmpty) { if (!pendingBuffer.isEmpty) {
val ref = pending.head()._2 val ref = pendingBuffer.head()._2
pending.dropHead() pendingBuffer.dropHead()
recoveryPermitGranted(ref) recoveryPermitGranted(ref)
} }
if (pending.isEmpty && maxPendingStats > 0) { if (pendingBuffer.isEmpty && maxPendingStats > 0) {
log.debug( log.debug(
"Drained pending recovery permit requests, max in progress was [{}], still [{}] in progress", "Drained pending recovery permit requests, max in progress was [{}], still [{}] in progress",
usedPermits + maxPendingStats, usedPermits + maxPendingStats,