From c385f163d9a32f89d1c6fbc4cd2686fabb3c13a5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 24 Apr 2017 18:00:56 +0200 Subject: [PATCH] Limit number of concurrent PersistentActor recoveries, #22638 (#22641) When starting many persistent actors at the same time the journal its data store is protected from being overloaded by limiting number of recoveries that can be in progress at the same time. (cherry picked from commit afc9df17a7faf2a239598788ff48f3bf2cd7b605) --- akka-docs/rst/java/persistence.rst | 10 +- akka-docs/rst/scala/persistence.rst | 8 +- .../src/main/resources/reference.conf | 8 + .../scala/akka/persistence/Eventsourced.scala | 55 +++++- .../scala/akka/persistence/Persistence.scala | 11 ++ .../akka/persistence/RecoveryPermitter.scala | 81 +++++++++ .../akka/persistence/ManyRecoveriesSpec.scala | 79 +++++++++ .../persistence/RecoveryPermitterSpec.scala | 158 ++++++++++++++++++ 8 files changed, 402 insertions(+), 8 deletions(-) create mode 100644 akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala create mode 100644 akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala create mode 100644 akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 624ad1381b..51590acd61 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -150,8 +150,14 @@ Recovery -------- By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages. -New messages sent to a persistent actor during recovery do not interfere with replayed messages. New messages will -only be received by a persistent actor after recovery completes. +New messages sent to a persistent actor during recovery do not interfere with replayed messages. +They are stashed and received by a persistent actor after recovery phase completes. + +The number of concurrent recoveries of recoveries that can be in progress at the same time is limited +to not overload the system and the backend data store. When exceeding the limit the actors will wait +until other recoveries have been completed. This is configured by:: + + akka.persistence.max-concurrent-recoveries = 50 .. note:: Accessing the sender with ``getSender()`` for replayed messages will always result in a ``deadLetters`` reference, diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 1df3e8fc1d..88dca3c139 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -147,7 +147,13 @@ Recovery By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages. New messages sent to a persistent actor during recovery do not interfere with replayed messages. -They are cached and received by a persistent actor after recovery phase completes. +They are stashed and received by a persistent actor after recovery phase completes. + +The number of concurrent recoveries of recoveries that can be in progress at the same time is limited +to not overload the system and the backend data store. When exceeding the limit the actors will wait +until other recoveries have been completed. This is configured by:: + + akka.persistence.max-concurrent-recoveries = 50 .. note:: Accessing the ``sender()`` for replayed messages will always result in a ``deadLetters`` reference, diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 57fcaa6490..176329368b 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -10,6 +10,14 @@ # Default persistence extension settings. akka.persistence { + + # When starting many persistent actors at the same time the journal + # and its data store is protected from being overloaded by limiting number + # of recoveries that can be in progress at the same time. When + # exceeding the limit the actors will wait until other recoveries have + # been completed. + max-concurrent-recoveries = 50 + # Fully qualified class name providing a default internal stash overflow strategy. # It needs to be a subclass of akka.persistence.StashOverflowStrategyConfigurator. # The default strategy throws StashOverflowException. diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index d099294a63..956f510884 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -63,7 +63,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas private var sequenceNr: Long = 0L private var _lastSequenceNr: Long = 0L - // safely null because we initialize it with a proper `recoveryStarted` state in aroundPreStart before any real action happens + // safely null because we initialize it with a proper `waitingRecoveryPermit` state in aroundPreStart before any real action happens private var currentState: State = null // Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands @@ -188,10 +188,15 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas // Fail fast on missing plugins. val j = journal; val s = snapshotStore - startRecovery(recovery) + requestRecoveryPermit() super.aroundPreStart() } + private def requestRecoveryPermit(): Unit = { + extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, self) + changeState(waitingRecoveryPermit(recovery)) + } + /** INTERNAL API. */ override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { try { @@ -217,7 +222,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas /** INTERNAL API. */ override protected[akka] def aroundPostRestart(reason: Throwable): Unit = { - startRecovery(recovery) + requestRecoveryPermit() super.aroundPostRestart(reason) } @@ -396,6 +401,28 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas def recoveryRunning: Boolean } + /** + * Initial state. Before starting the actual recovery it must get a permit from the + * `RecoveryPermitter`. When starting many persistent actors at the same time + * the journal and its data store is protected from being overloaded by limiting number + * of recoveries that can be in progress at the same time. When receiving + * `RecoveryPermitGranted` it switches to `recoveryStarted` state + * All incoming messages are stashed. + */ + private def waitingRecoveryPermit(recovery: Recovery) = new State { + + override def toString: String = s"waiting for recovery permit" + override def recoveryRunning: Boolean = true + + override def stateReceive(receive: Receive, message: Any) = message match { + case RecoveryPermitter.RecoveryPermitGranted ⇒ + startRecovery(recovery) + + case other ⇒ + stashInternally(other) + } + } + /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's `receiveRecover`. Then initiates a message replay, either starting @@ -430,7 +457,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas override def toString: String = s"recovery started (replayMax = [$replayMax])" override def recoveryRunning: Boolean = true - override def stateReceive(receive: Receive, message: Any) = message match { + override def stateReceive(receive: Receive, message: Any) = try message match { case LoadSnapshotResult(sso, toSnr) ⇒ timeoutCancellable.cancel() sso.foreach { @@ -454,7 +481,15 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas case other ⇒ stashInternally(other) + } catch { + case NonFatal(e) ⇒ + returnRecoveryPermit() + throw e } + + private def returnRecoveryPermit(): Unit = + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self) + } /** @@ -482,7 +517,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas override def recoveryRunning: Boolean = _recoveryRunning - override def stateReceive(receive: Receive, message: Any) = message match { + override def stateReceive(receive: Receive, message: Any) = try message match { case ReplayedMessage(p) ⇒ try { eventSeenInInterval = true @@ -492,6 +527,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas case NonFatal(t) ⇒ timeoutCancellable.cancel() try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self) + returnRecoveryPermit() } case RecoverySuccess(highestSeqNr) ⇒ timeoutCancellable.cancel() @@ -516,9 +552,18 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas // snapshot tick, ignore case other ⇒ stashInternally(other) + } catch { + case NonFatal(e) ⇒ + returnRecoveryPermit() + throw e } + private def returnRecoveryPermit(): Unit = + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self) + private def transitToProcessingState(): Unit = { + returnRecoveryPermit() + if (eventBatch.nonEmpty) flushBatch() if (pendingStashingPersistInvocations > 0) changeState(persistingEvents) diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 97205ae3f7..579f6b3398 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -19,6 +19,7 @@ import scala.concurrent.duration._ import akka.util.Reflect import scala.util.control.NonFatal +import akka.annotation.InternalApi /** * Persistence configuration. @@ -146,6 +147,16 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private val config = system.settings.config.getConfig("akka.persistence") + /** + * INTERNAL API: When starting many persistent actors at the same time the journal + * its data store is protected from being overloaded by limiting number + * of recoveries that can be in progress at the same time. + */ + @InternalApi private[akka] val recoveryPermitter: ActorRef = { + val maxPermits = config.getInt("max-concurrent-recoveries") + system.systemActorOf(RecoveryPermitter.props(maxPermits), "recoveryPermitter") + } + // Lazy, so user is not forced to configure defaults when she is not using them. private lazy val defaultJournalPluginId = { val configPath = config.getString("journal.plugin") diff --git a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala new file mode 100644 index 0000000000..ae822285af --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.persistence + +import java.util.LinkedList +import akka.annotation.InternalApi +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.Terminated + +/** + * INTERNAL API + */ +@InternalApi private[akka] object RecoveryPermitter { + def props(maxPermits: Int): Props = + Props(new RecoveryPermitter(maxPermits)) + + case object RequestRecoveryPermit + case object RecoveryPermitGranted + case object ReturnRecoveryPermit + +} + +/** + * INTERNAL API: When starting many persistent actors at the same time the journal + * its data store is protected from being overloaded by limiting number + * of recoveries that can be in progress at the same time. + */ +@InternalApi private[akka] class RecoveryPermitter(maxPermits: Int) extends Actor with ActorLogging { + import RecoveryPermitter._ + + private var usedPermits = 0 + private val pending = new LinkedList[ActorRef] + private var maxPendingStats = 0 + + def receive = { + case RequestRecoveryPermit ⇒ + context.watch(sender()) + if (usedPermits >= maxPermits) { + if (pending.isEmpty) + log.debug("Exceeded max-concurrent-recoveries [{}]. First pending {}", maxPermits, sender()) + pending.offer(sender()) + maxPendingStats = math.max(maxPendingStats, pending.size) + } else { + recoveryPermitGranted(sender()) + } + + case ReturnRecoveryPermit ⇒ + returnRecoveryPermit(sender()) + + case Terminated(ref) ⇒ + // pre-mature termination should be rare + if (!pending.remove(ref)) + returnRecoveryPermit(ref) + } + + private def returnRecoveryPermit(ref: ActorRef): Unit = { + usedPermits -= 1 + context.unwatch(ref) + if (usedPermits < 0) throw new IllegalStateException("permits must not be negative") + if (!pending.isEmpty) { + val ref = pending.poll() + recoveryPermitGranted(ref) + } + if (pending.isEmpty && maxPendingStats > 0) { + log.debug( + "Drained pending recovery permit requests, max in progress was [{}], still [{}] in progress", + usedPermits + maxPendingStats, usedPermits) + maxPendingStats = 0 + } + } + + private def recoveryPermitGranted(ref: ActorRef): Unit = { + usedPermits += 1 + ref ! RecoveryPermitGranted + } + +} diff --git a/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala new file mode 100644 index 0000000000..0fdba656b6 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.persistence + +import scala.concurrent.duration._ +import akka.actor._ +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import scala.concurrent.Await + +object ManyRecoveriesSpec { + + def testProps(name: String, latch: Option[TestLatch]): Props = + Props(new TestPersistentActor(name, latch)) + + final case class Cmd(s: String) + final case class Evt(s: String) + + class TestPersistentActor(name: String, latch: Option[TestLatch]) extends PersistentActor { + + override def persistenceId = name + + override def receiveRecover: Receive = { + case Evt(s) ⇒ + latch.foreach(Await.ready(_, 10.seconds)) + } + override def receiveCommand: Receive = { + case Cmd(s) ⇒ persist(Evt(s)) { _ ⇒ + sender() ! s"$persistenceId-$s-${lastSequenceNr}" + } + case "stop" ⇒ + context.stop(self) + } + } + +} + +class ManyRecoveriesSpec extends PersistenceSpec(ConfigFactory.parseString( + s""" + akka.actor.default-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 5 + } + } + akka.persistence.max-concurrent-recoveries = 3 + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.actor.warn-about-java-serializer-usage = off + """)) with ImplicitSender { + import ManyRecoveriesSpec._ + + "Many persistent actors" must { + "be able to recovery without overloading" in { + (1 to 100).foreach { n ⇒ + system.actorOf(testProps(s"a$n", latch = None)) ! Cmd("A") + expectMsg(s"a$n-A-1") + } + + // this would starve (block) all threads without max-concurrent-recoveries + val latch = TestLatch() + (1 to 100).foreach { n ⇒ + system.actorOf(testProps(s"a$n", Some(latch))) ! Cmd("B") + } + // this should be able to progress even though above is blocking, + // 2 remaining non-blocked threads + (1 to 10).foreach { n ⇒ + system.actorOf(TestActors.echoActorProps) ! n + expectMsg(n) + } + + latch.countDown() + receiveN(100).toSet should ===((1 to 100).map(n ⇒ s"a$n-B-2").toSet) + } + } + +} + diff --git a/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala new file mode 100644 index 0000000000..ffc04155b7 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.persistence + +import scala.concurrent.duration._ +import akka.actor._ +import akka.event.Logging +import akka.event.Logging.Warning +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } +import com.typesafe.config.ConfigFactory +import akka.testkit.TestProbe +import akka.testkit.TestActors + +object RecoveryPermitterSpec { + + def testProps(name: String, probe: ActorRef): Props = + Props(new TestPersistentActor(name, probe)) + + class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor { + + override def persistenceId = name + + override def postStop(): Unit = { + probe ! "postStop" + } + + override def receiveRecover: Receive = { + case RecoveryCompleted ⇒ + probe ! RecoveryCompleted + } + override def receiveCommand: Receive = { + case "stop" ⇒ + context.stop(self) + } + } + +} + +class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString( + s""" + akka.persistence.max-concurrent-recoveries = 3 + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.actor.warn-about-java-serializer-usage = off + """)) with ImplicitSender { + import RecoveryPermitterSpec._ + import RecoveryPermitter._ + + val permitter = Persistence(system).recoveryPermitter + val p1 = TestProbe() + val p2 = TestProbe() + val p3 = TestProbe() + val p4 = TestProbe() + val p5 = TestProbe() + + def requestPermit(p: TestProbe): Unit = { + permitter.tell(RequestRecoveryPermit, p.ref) + p.expectMsg(RecoveryPermitGranted) + } + + "RecoveryPermitter" must { + "grant permits up to the limit" in { + requestPermit(p1) + requestPermit(p2) + requestPermit(p3) + + permitter.tell(RequestRecoveryPermit, p4.ref) + permitter.tell(RequestRecoveryPermit, p5.ref) + p4.expectNoMsg(100.millis) + p5.expectNoMsg(10.millis) + + permitter.tell(ReturnRecoveryPermit, p2.ref) + p4.expectMsg(RecoveryPermitGranted) + p5.expectNoMsg(100.millis) + + permitter.tell(ReturnRecoveryPermit, p1.ref) + p5.expectMsg(RecoveryPermitGranted) + + permitter.tell(ReturnRecoveryPermit, p3.ref) + permitter.tell(ReturnRecoveryPermit, p4.ref) + permitter.tell(ReturnRecoveryPermit, p5.ref) + } + + "grant recovery when all permits not used" in { + requestPermit(p1) + + system.actorOf(testProps("p2", p2.ref)) + p2.expectMsg(RecoveryCompleted) + permitter.tell(ReturnRecoveryPermit, p1.ref) + } + + "delay recovery when all permits used" in { + requestPermit(p1) + requestPermit(p2) + requestPermit(p3) + + val persistentActor = system.actorOf(testProps("p4", p4.ref)) + p4.watch(persistentActor) + persistentActor ! "stop" + p4.expectNoMsg(200.millis) + + permitter.tell(ReturnRecoveryPermit, p3.ref) + p4.expectMsg(RecoveryCompleted) + p4.expectMsg("postStop") + p4.expectTerminated(persistentActor) + + permitter.tell(ReturnRecoveryPermit, p1.ref) + permitter.tell(ReturnRecoveryPermit, p2.ref) + } + + "return permit when actor is pre-maturely terminated before holding permit" in { + requestPermit(p1) + requestPermit(p2) + requestPermit(p3) + + val persistentActor = system.actorOf(testProps("p4", p4.ref)) + p4.expectNoMsg(100.millis) + + permitter.tell(RequestRecoveryPermit, p5.ref) + p5.expectNoMsg(100.millis) + + // PoisonPill is not stashed + persistentActor ! PoisonPill + p4.expectMsg("postStop") + + // persistentActor didn't hold a permit so still + p5.expectNoMsg(100.millis) + + permitter.tell(ReturnRecoveryPermit, p1.ref) + p5.expectMsg(RecoveryPermitGranted) + + permitter.tell(ReturnRecoveryPermit, p2.ref) + permitter.tell(ReturnRecoveryPermit, p3.ref) + permitter.tell(ReturnRecoveryPermit, p5.ref) + } + + "return permit when actor is pre-maturely terminated when holding permit" in { + val actor = system.actorOf(TestActors.forwardActorProps(p1.ref)) + permitter.tell(RequestRecoveryPermit, actor) + p1.expectMsg(RecoveryPermitGranted) + + requestPermit(p2) + requestPermit(p3) + + permitter.tell(RequestRecoveryPermit, p4.ref) + p4.expectNoMsg(100.millis) + + actor ! PoisonPill + p4.expectMsg(RecoveryPermitGranted) + + permitter.tell(ReturnRecoveryPermit, p2.ref) + permitter.tell(ReturnRecoveryPermit, p3.ref) + permitter.tell(ReturnRecoveryPermit, p4.ref) + } + } + +} +