Limit number of concurrent PersistentActor recoveries, #22638 (#22641)

When starting many persistent actors at the same time the journal
its data store is protected from being overloaded by limiting number
of recoveries that can be in progress at the same time.

(cherry picked from commit afc9df17a7faf2a239598788ff48f3bf2cd7b605)
This commit is contained in:
Patrik Nordwall 2017-04-24 18:00:56 +02:00
parent cdd56a21a9
commit c385f163d9
8 changed files with 402 additions and 8 deletions

View file

@ -10,6 +10,14 @@
# Default persistence extension settings.
akka.persistence {
# When starting many persistent actors at the same time the journal
# and its data store is protected from being overloaded by limiting number
# of recoveries that can be in progress at the same time. When
# exceeding the limit the actors will wait until other recoveries have
# been completed.
max-concurrent-recoveries = 50
# Fully qualified class name providing a default internal stash overflow strategy.
# It needs to be a subclass of akka.persistence.StashOverflowStrategyConfigurator.
# The default strategy throws StashOverflowException.

View file

@ -63,7 +63,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
private var sequenceNr: Long = 0L
private var _lastSequenceNr: Long = 0L
// safely null because we initialize it with a proper `recoveryStarted` state in aroundPreStart before any real action happens
// safely null because we initialize it with a proper `waitingRecoveryPermit` state in aroundPreStart before any real action happens
private var currentState: State = null
// Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands
@ -188,10 +188,15 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
// Fail fast on missing plugins.
val j = journal; val s = snapshotStore
startRecovery(recovery)
requestRecoveryPermit()
super.aroundPreStart()
}
private def requestRecoveryPermit(): Unit = {
extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, self)
changeState(waitingRecoveryPermit(recovery))
}
/** INTERNAL API. */
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try {
@ -217,7 +222,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
/** INTERNAL API. */
override protected[akka] def aroundPostRestart(reason: Throwable): Unit = {
startRecovery(recovery)
requestRecoveryPermit()
super.aroundPostRestart(reason)
}
@ -396,6 +401,28 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
def recoveryRunning: Boolean
}
/**
* Initial state. Before starting the actual recovery it must get a permit from the
* `RecoveryPermitter`. When starting many persistent actors at the same time
* the journal and its data store is protected from being overloaded by limiting number
* of recoveries that can be in progress at the same time. When receiving
* `RecoveryPermitGranted` it switches to `recoveryStarted` state
* All incoming messages are stashed.
*/
private def waitingRecoveryPermit(recovery: Recovery) = new State {
override def toString: String = s"waiting for recovery permit"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case RecoveryPermitter.RecoveryPermitGranted
startRecovery(recovery)
case other
stashInternally(other)
}
}
/**
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
* message to the actor's `receiveRecover`. Then initiates a message replay, either starting
@ -430,7 +457,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
override def toString: String = s"recovery started (replayMax = [$replayMax])"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
override def stateReceive(receive: Receive, message: Any) = try message match {
case LoadSnapshotResult(sso, toSnr)
timeoutCancellable.cancel()
sso.foreach {
@ -454,7 +481,15 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
case other
stashInternally(other)
} catch {
case NonFatal(e)
returnRecoveryPermit()
throw e
}
private def returnRecoveryPermit(): Unit =
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self)
}
/**
@ -482,7 +517,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
override def recoveryRunning: Boolean = _recoveryRunning
override def stateReceive(receive: Receive, message: Any) = message match {
override def stateReceive(receive: Receive, message: Any) = try message match {
case ReplayedMessage(p)
try {
eventSeenInInterval = true
@ -492,6 +527,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
case NonFatal(t)
timeoutCancellable.cancel()
try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
returnRecoveryPermit()
}
case RecoverySuccess(highestSeqNr)
timeoutCancellable.cancel()
@ -516,9 +552,18 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
// snapshot tick, ignore
case other
stashInternally(other)
} catch {
case NonFatal(e)
returnRecoveryPermit()
throw e
}
private def returnRecoveryPermit(): Unit =
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self)
private def transitToProcessingState(): Unit = {
returnRecoveryPermit()
if (eventBatch.nonEmpty) flushBatch()
if (pendingStashingPersistInvocations > 0) changeState(persistingEvents)

View file

@ -19,6 +19,7 @@ import scala.concurrent.duration._
import akka.util.Reflect
import scala.util.control.NonFatal
import akka.annotation.InternalApi
/**
* Persistence configuration.
@ -146,6 +147,16 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private val config = system.settings.config.getConfig("akka.persistence")
/**
* INTERNAL API: When starting many persistent actors at the same time the journal
* its data store is protected from being overloaded by limiting number
* of recoveries that can be in progress at the same time.
*/
@InternalApi private[akka] val recoveryPermitter: ActorRef = {
val maxPermits = config.getInt("max-concurrent-recoveries")
system.systemActorOf(RecoveryPermitter.props(maxPermits), "recoveryPermitter")
}
// Lazy, so user is not forced to configure defaults when she is not using them.
private lazy val defaultJournalPluginId = {
val configPath = config.getString("journal.plugin")

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence
import java.util.LinkedList
import akka.annotation.InternalApi
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Terminated
/**
* INTERNAL API
*/
@InternalApi private[akka] object RecoveryPermitter {
def props(maxPermits: Int): Props =
Props(new RecoveryPermitter(maxPermits))
case object RequestRecoveryPermit
case object RecoveryPermitGranted
case object ReturnRecoveryPermit
}
/**
* INTERNAL API: When starting many persistent actors at the same time the journal
* its data store is protected from being overloaded by limiting number
* of recoveries that can be in progress at the same time.
*/
@InternalApi private[akka] class RecoveryPermitter(maxPermits: Int) extends Actor with ActorLogging {
import RecoveryPermitter._
private var usedPermits = 0
private val pending = new LinkedList[ActorRef]
private var maxPendingStats = 0
def receive = {
case RequestRecoveryPermit
context.watch(sender())
if (usedPermits >= maxPermits) {
if (pending.isEmpty)
log.debug("Exceeded max-concurrent-recoveries [{}]. First pending {}", maxPermits, sender())
pending.offer(sender())
maxPendingStats = math.max(maxPendingStats, pending.size)
} else {
recoveryPermitGranted(sender())
}
case ReturnRecoveryPermit
returnRecoveryPermit(sender())
case Terminated(ref)
// pre-mature termination should be rare
if (!pending.remove(ref))
returnRecoveryPermit(ref)
}
private def returnRecoveryPermit(ref: ActorRef): Unit = {
usedPermits -= 1
context.unwatch(ref)
if (usedPermits < 0) throw new IllegalStateException("permits must not be negative")
if (!pending.isEmpty) {
val ref = pending.poll()
recoveryPermitGranted(ref)
}
if (pending.isEmpty && maxPendingStats > 0) {
log.debug(
"Drained pending recovery permit requests, max in progress was [{}], still [{}] in progress",
usedPermits + maxPendingStats, usedPermits)
maxPendingStats = 0
}
}
private def recoveryPermitGranted(ref: ActorRef): Unit = {
usedPermits += 1
ref ! RecoveryPermitGranted
}
}

View file

@ -0,0 +1,79 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
object ManyRecoveriesSpec {
def testProps(name: String, latch: Option[TestLatch]): Props =
Props(new TestPersistentActor(name, latch))
final case class Cmd(s: String)
final case class Evt(s: String)
class TestPersistentActor(name: String, latch: Option[TestLatch]) extends PersistentActor {
override def persistenceId = name
override def receiveRecover: Receive = {
case Evt(s)
latch.foreach(Await.ready(_, 10.seconds))
}
override def receiveCommand: Receive = {
case Cmd(s) persist(Evt(s)) { _
sender() ! s"$persistenceId-$s-${lastSequenceNr}"
}
case "stop"
context.stop(self)
}
}
}
class ManyRecoveriesSpec extends PersistenceSpec(ConfigFactory.parseString(
s"""
akka.actor.default-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 5
}
}
akka.persistence.max-concurrent-recoveries = 3
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.actor.warn-about-java-serializer-usage = off
""")) with ImplicitSender {
import ManyRecoveriesSpec._
"Many persistent actors" must {
"be able to recovery without overloading" in {
(1 to 100).foreach { n
system.actorOf(testProps(s"a$n", latch = None)) ! Cmd("A")
expectMsg(s"a$n-A-1")
}
// this would starve (block) all threads without max-concurrent-recoveries
val latch = TestLatch()
(1 to 100).foreach { n
system.actorOf(testProps(s"a$n", Some(latch))) ! Cmd("B")
}
// this should be able to progress even though above is blocking,
// 2 remaining non-blocked threads
(1 to 10).foreach { n
system.actorOf(TestActors.echoActorProps) ! n
expectMsg(n)
}
latch.countDown()
receiveN(100).toSet should ===((1 to 100).map(n s"a$n-B-2").toSet)
}
}
}

View file

@ -0,0 +1,158 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.event.Logging
import akka.event.Logging.Warning
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
import com.typesafe.config.ConfigFactory
import akka.testkit.TestProbe
import akka.testkit.TestActors
object RecoveryPermitterSpec {
def testProps(name: String, probe: ActorRef): Props =
Props(new TestPersistentActor(name, probe))
class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor {
override def persistenceId = name
override def postStop(): Unit = {
probe ! "postStop"
}
override def receiveRecover: Receive = {
case RecoveryCompleted
probe ! RecoveryCompleted
}
override def receiveCommand: Receive = {
case "stop"
context.stop(self)
}
}
}
class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(
s"""
akka.persistence.max-concurrent-recoveries = 3
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.actor.warn-about-java-serializer-usage = off
""")) with ImplicitSender {
import RecoveryPermitterSpec._
import RecoveryPermitter._
val permitter = Persistence(system).recoveryPermitter
val p1 = TestProbe()
val p2 = TestProbe()
val p3 = TestProbe()
val p4 = TestProbe()
val p5 = TestProbe()
def requestPermit(p: TestProbe): Unit = {
permitter.tell(RequestRecoveryPermit, p.ref)
p.expectMsg(RecoveryPermitGranted)
}
"RecoveryPermitter" must {
"grant permits up to the limit" in {
requestPermit(p1)
requestPermit(p2)
requestPermit(p3)
permitter.tell(RequestRecoveryPermit, p4.ref)
permitter.tell(RequestRecoveryPermit, p5.ref)
p4.expectNoMsg(100.millis)
p5.expectNoMsg(10.millis)
permitter.tell(ReturnRecoveryPermit, p2.ref)
p4.expectMsg(RecoveryPermitGranted)
p5.expectNoMsg(100.millis)
permitter.tell(ReturnRecoveryPermit, p1.ref)
p5.expectMsg(RecoveryPermitGranted)
permitter.tell(ReturnRecoveryPermit, p3.ref)
permitter.tell(ReturnRecoveryPermit, p4.ref)
permitter.tell(ReturnRecoveryPermit, p5.ref)
}
"grant recovery when all permits not used" in {
requestPermit(p1)
system.actorOf(testProps("p2", p2.ref))
p2.expectMsg(RecoveryCompleted)
permitter.tell(ReturnRecoveryPermit, p1.ref)
}
"delay recovery when all permits used" in {
requestPermit(p1)
requestPermit(p2)
requestPermit(p3)
val persistentActor = system.actorOf(testProps("p4", p4.ref))
p4.watch(persistentActor)
persistentActor ! "stop"
p4.expectNoMsg(200.millis)
permitter.tell(ReturnRecoveryPermit, p3.ref)
p4.expectMsg(RecoveryCompleted)
p4.expectMsg("postStop")
p4.expectTerminated(persistentActor)
permitter.tell(ReturnRecoveryPermit, p1.ref)
permitter.tell(ReturnRecoveryPermit, p2.ref)
}
"return permit when actor is pre-maturely terminated before holding permit" in {
requestPermit(p1)
requestPermit(p2)
requestPermit(p3)
val persistentActor = system.actorOf(testProps("p4", p4.ref))
p4.expectNoMsg(100.millis)
permitter.tell(RequestRecoveryPermit, p5.ref)
p5.expectNoMsg(100.millis)
// PoisonPill is not stashed
persistentActor ! PoisonPill
p4.expectMsg("postStop")
// persistentActor didn't hold a permit so still
p5.expectNoMsg(100.millis)
permitter.tell(ReturnRecoveryPermit, p1.ref)
p5.expectMsg(RecoveryPermitGranted)
permitter.tell(ReturnRecoveryPermit, p2.ref)
permitter.tell(ReturnRecoveryPermit, p3.ref)
permitter.tell(ReturnRecoveryPermit, p5.ref)
}
"return permit when actor is pre-maturely terminated when holding permit" in {
val actor = system.actorOf(TestActors.forwardActorProps(p1.ref))
permitter.tell(RequestRecoveryPermit, actor)
p1.expectMsg(RecoveryPermitGranted)
requestPermit(p2)
requestPermit(p3)
permitter.tell(RequestRecoveryPermit, p4.ref)
p4.expectNoMsg(100.millis)
actor ! PoisonPill
p4.expectMsg(RecoveryPermitGranted)
permitter.tell(ReturnRecoveryPermit, p2.ref)
permitter.tell(ReturnRecoveryPermit, p3.ref)
permitter.tell(ReturnRecoveryPermit, p4.ref)
}
}
}