diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 464beb538c..ef95bb4d69 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -150,8 +150,14 @@ Recovery -------- By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages. -New messages sent to a persistent actor during recovery do not interfere with replayed messages. New messages will -only be received by a persistent actor after recovery completes. +New messages sent to a persistent actor during recovery do not interfere with replayed messages. +They are stashed and received by a persistent actor after the recovery phase completes. + +The number of concurrent recoveries that can be in progress at the same time is limited +so as not to overload the system and the backend data store. When the limit is exceeded, the actors will wait +until other recoveries have been completed. This is configured by:: + + akka.persistence.max-concurrent-recoveries = 50 .. note:: Accessing the sender with ``getSender()`` for replayed messages will always result in a ``deadLetters`` reference, diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 1df3e8fc1d..88dca3c139 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -147,7 +147,13 @@ Recovery By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages. New messages sent to a persistent actor during recovery do not interfere with replayed messages. -They are cached and received by a persistent actor after recovery phase completes. +They are stashed and received by a persistent actor after the recovery phase completes. + +The number of concurrent recoveries that can be in progress at the same time is limited +so as not to overload the system and the backend data store. 
When the limit is exceeded, the actors will wait +until other recoveries have been completed. This is configured by:: + + akka.persistence.max-concurrent-recoveries = 50 .. note:: Accessing the ``sender()`` for replayed messages will always result in a ``deadLetters`` reference, diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 57fcaa6490..176329368b 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -10,6 +10,14 @@ # Default persistence extension settings. akka.persistence { + + # When starting many persistent actors at the same time the journal + # and its data store are protected from being overloaded by limiting the number + # of recoveries that can be in progress at the same time. When the limit + # is exceeded the actors will wait until other recoveries have + # been completed. + max-concurrent-recoveries = 50 + # Fully qualified class name providing a default internal stash overflow strategy. # It needs to be a subclass of akka.persistence.StashOverflowStrategyConfigurator. # The default strategy throws StashOverflowException. 
diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index d099294a63..956f510884 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -63,7 +63,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas private var sequenceNr: Long = 0L private var _lastSequenceNr: Long = 0L - // safely null because we initialize it with a proper `recoveryStarted` state in aroundPreStart before any real action happens + // safely null because we initialize it with a proper `waitingRecoveryPermit` state in aroundPreStart before any real action happens private var currentState: State = null // Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands @@ -188,10 +188,15 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas // Fail fast on missing plugins. val j = journal; val s = snapshotStore - startRecovery(recovery) + requestRecoveryPermit() super.aroundPreStart() } + private def requestRecoveryPermit(): Unit = { + extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, self) + changeState(waitingRecoveryPermit(recovery)) + } + /** INTERNAL API. */ override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { try { @@ -217,7 +222,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas /** INTERNAL API. */ override protected[akka] def aroundPostRestart(reason: Throwable): Unit = { - startRecovery(recovery) + requestRecoveryPermit() super.aroundPostRestart(reason) } @@ -396,6 +401,28 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas def recoveryRunning: Boolean } + /** + * Initial state. 
Before starting the actual recovery it must get a permit from the + * `RecoveryPermitter`. When starting many persistent actors at the same time + * the journal and its data store are protected from being overloaded by limiting the number + * of recoveries that can be in progress at the same time. When receiving + * `RecoveryPermitGranted` it switches to `recoveryStarted` state. + * All other incoming messages are stashed. + */ + private def waitingRecoveryPermit(recovery: Recovery) = new State { + + override def toString: String = s"waiting for recovery permit" + override def recoveryRunning: Boolean = true + + override def stateReceive(receive: Receive, message: Any) = message match { + case RecoveryPermitter.RecoveryPermitGranted ⇒ + startRecovery(recovery) + + case other ⇒ + stashInternally(other) + } + } + /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's `receiveRecover`. Then initiates a message replay, either starting @@ -430,7 +457,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas override def toString: String = s"recovery started (replayMax = [$replayMax])" override def recoveryRunning: Boolean = true - override def stateReceive(receive: Receive, message: Any) = message match { + override def stateReceive(receive: Receive, message: Any) = try message match { case LoadSnapshotResult(sso, toSnr) ⇒ timeoutCancellable.cancel() sso.foreach { @@ -454,7 +481,15 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas case other ⇒ stashInternally(other) + } catch { + case NonFatal(e) ⇒ + returnRecoveryPermit() + throw e } + + private def returnRecoveryPermit(): Unit = + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self) + } /** @@ -482,7 +517,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas override def recoveryRunning: Boolean = _recoveryRunning - override def 
stateReceive(receive: Receive, message: Any) = message match { + override def stateReceive(receive: Receive, message: Any) = try message match { case ReplayedMessage(p) ⇒ try { eventSeenInInterval = true @@ -492,6 +527,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas case NonFatal(t) ⇒ timeoutCancellable.cancel() try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self) + returnRecoveryPermit() } case RecoverySuccess(highestSeqNr) ⇒ timeoutCancellable.cancel() @@ -516,9 +552,18 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas // snapshot tick, ignore case other ⇒ stashInternally(other) + } catch { + case NonFatal(e) ⇒ + returnRecoveryPermit() + throw e } + private def returnRecoveryPermit(): Unit = + extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self) + private def transitToProcessingState(): Unit = { + returnRecoveryPermit() + if (eventBatch.nonEmpty) flushBatch() if (pendingStashingPersistInvocations > 0) changeState(persistingEvents) diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 97205ae3f7..579f6b3398 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -19,6 +19,7 @@ import scala.concurrent.duration._ import akka.util.Reflect import scala.util.control.NonFatal +import akka.annotation.InternalApi /** * Persistence configuration. @@ -146,6 +147,16 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private val config = system.settings.config.getConfig("akka.persistence") + /** + * INTERNAL API: When starting many persistent actors at the same time the journal + * and its data store are protected from being overloaded by limiting the number + * of recoveries that can be in progress at the same time. 
+ */ + @InternalApi private[akka] val recoveryPermitter: ActorRef = { + val maxPermits = config.getInt("max-concurrent-recoveries") + system.systemActorOf(RecoveryPermitter.props(maxPermits), "recoveryPermitter") + } + // Lazy, so user is not forced to configure defaults when she is not using them. private lazy val defaultJournalPluginId = { val configPath = config.getString("journal.plugin") diff --git a/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala new file mode 100644 index 0000000000..ae822285af --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/RecoveryPermitter.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.persistence + +import java.util.LinkedList +import akka.annotation.InternalApi +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.Terminated + +/** + * INTERNAL API + */ +@InternalApi private[akka] object RecoveryPermitter { + def props(maxPermits: Int): Props = + Props(new RecoveryPermitter(maxPermits)) + + case object RequestRecoveryPermit + case object RecoveryPermitGranted + case object ReturnRecoveryPermit + +} + +/** + * INTERNAL API: When starting many persistent actors at the same time the journal + * and its data store are protected from being overloaded by limiting the number + * of recoveries that can be in progress at the same time. + */ +@InternalApi private[akka] class RecoveryPermitter(maxPermits: Int) extends Actor with ActorLogging { + import RecoveryPermitter._ + + private var usedPermits = 0 + private val pending = new LinkedList[ActorRef] + private var maxPendingStats = 0 + + def receive = { + case RequestRecoveryPermit ⇒ + context.watch(sender()) + if (usedPermits >= maxPermits) { + if (pending.isEmpty) + log.debug("Exceeded max-concurrent-recoveries [{}]. 
First pending {}", maxPermits, sender()) + pending.offer(sender()) + maxPendingStats = math.max(maxPendingStats, pending.size) + } else { + recoveryPermitGranted(sender()) + } + + case ReturnRecoveryPermit ⇒ + returnRecoveryPermit(sender()) + + case Terminated(ref) ⇒ + // premature termination should be rare; if the actor was not pending, a permit is returned on its behalf + if (!pending.remove(ref)) + returnRecoveryPermit(ref) + } + + private def returnRecoveryPermit(ref: ActorRef): Unit = { + usedPermits -= 1 + context.unwatch(ref) + if (usedPermits < 0) throw new IllegalStateException("permits must not be negative") + if (!pending.isEmpty) { + val ref = pending.poll() + recoveryPermitGranted(ref) + } + if (pending.isEmpty && maxPendingStats > 0) { + log.debug( + "Drained pending recovery permit requests, max in progress was [{}], still [{}] in progress", + usedPermits + maxPendingStats, usedPermits) + maxPendingStats = 0 + } + } + + private def recoveryPermitGranted(ref: ActorRef): Unit = { + usedPermits += 1 + ref ! RecoveryPermitGranted + } + +} diff --git a/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala new file mode 100644 index 0000000000..0fdba656b6 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.persistence + +import scala.concurrent.duration._ +import akka.actor._ +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import scala.concurrent.Await + +object ManyRecoveriesSpec { + + def testProps(name: String, latch: Option[TestLatch]): Props = + Props(new TestPersistentActor(name, latch)) + + final case class Cmd(s: String) + final case class Evt(s: String) + + class TestPersistentActor(name: String, latch: Option[TestLatch]) extends PersistentActor { + + override def persistenceId = name + + override def receiveRecover: Receive = { + case Evt(s) ⇒ + latch.foreach(Await.ready(_, 10.seconds)) + } + override def receiveCommand: Receive = { + case Cmd(s) ⇒ persist(Evt(s)) { _ ⇒ + sender() ! s"$persistenceId-$s-${lastSequenceNr}" + } + case "stop" ⇒ + context.stop(self) + } + } + +} + +class ManyRecoveriesSpec extends PersistenceSpec(ConfigFactory.parseString( + s""" + akka.actor.default-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 5 + } + } + akka.persistence.max-concurrent-recoveries = 3 + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.actor.warn-about-java-serializer-usage = off + """)) with ImplicitSender { + import ManyRecoveriesSpec._ + + "Many persistent actors" must { + "be able to recovery without overloading" in { + (1 to 100).foreach { n ⇒ + system.actorOf(testProps(s"a$n", latch = None)) ! Cmd("A") + expectMsg(s"a$n-A-1") + } + + // this would starve (block) all threads without max-concurrent-recoveries + val latch = TestLatch() + (1 to 100).foreach { n ⇒ + system.actorOf(testProps(s"a$n", Some(latch))) ! Cmd("B") + } + // this should be able to progress even though the recoveries above are blocking, + // because at most 3 of the 5 pool threads are blocked, leaving 2 free + (1 to 10).foreach { n ⇒ + system.actorOf(TestActors.echoActorProps) ! 
n + expectMsg(n) + } + + latch.countDown() + receiveN(100).toSet should ===((1 to 100).map(n ⇒ s"a$n-B-2").toSet) + } + } + +} + diff --git a/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala new file mode 100644 index 0000000000..ffc04155b7 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.persistence + +import scala.concurrent.duration._ +import akka.actor._ +import akka.event.Logging +import akka.event.Logging.Warning +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } +import com.typesafe.config.ConfigFactory +import akka.testkit.TestProbe +import akka.testkit.TestActors + +object RecoveryPermitterSpec { + + def testProps(name: String, probe: ActorRef): Props = + Props(new TestPersistentActor(name, probe)) + + class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor { + + override def persistenceId = name + + override def postStop(): Unit = { + probe ! "postStop" + } + + override def receiveRecover: Receive = { + case RecoveryCompleted ⇒ + probe ! 
RecoveryCompleted + } + override def receiveCommand: Receive = { + case "stop" ⇒ + context.stop(self) + } + } + +} + +class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString( + s""" + akka.persistence.max-concurrent-recoveries = 3 + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.actor.warn-about-java-serializer-usage = off + """)) with ImplicitSender { + import RecoveryPermitterSpec._ + import RecoveryPermitter._ + + val permitter = Persistence(system).recoveryPermitter + val p1 = TestProbe() + val p2 = TestProbe() + val p3 = TestProbe() + val p4 = TestProbe() + val p5 = TestProbe() + + def requestPermit(p: TestProbe): Unit = { + permitter.tell(RequestRecoveryPermit, p.ref) + p.expectMsg(RecoveryPermitGranted) + } + + "RecoveryPermitter" must { + "grant permits up to the limit" in { + requestPermit(p1) + requestPermit(p2) + requestPermit(p3) + + permitter.tell(RequestRecoveryPermit, p4.ref) + permitter.tell(RequestRecoveryPermit, p5.ref) + p4.expectNoMsg(100.millis) + p5.expectNoMsg(10.millis) + + permitter.tell(ReturnRecoveryPermit, p2.ref) + p4.expectMsg(RecoveryPermitGranted) + p5.expectNoMsg(100.millis) + + permitter.tell(ReturnRecoveryPermit, p1.ref) + p5.expectMsg(RecoveryPermitGranted) + + permitter.tell(ReturnRecoveryPermit, p3.ref) + permitter.tell(ReturnRecoveryPermit, p4.ref) + permitter.tell(ReturnRecoveryPermit, p5.ref) + } + + "grant recovery when all permits not used" in { + requestPermit(p1) + + system.actorOf(testProps("p2", p2.ref)) + p2.expectMsg(RecoveryCompleted) + permitter.tell(ReturnRecoveryPermit, p1.ref) + } + + "delay recovery when all permits used" in { + requestPermit(p1) + requestPermit(p2) + requestPermit(p3) + + val persistentActor = system.actorOf(testProps("p4", p4.ref)) + p4.watch(persistentActor) + persistentActor ! 
"stop" + p4.expectNoMsg(200.millis) + + permitter.tell(ReturnRecoveryPermit, p3.ref) + p4.expectMsg(RecoveryCompleted) + p4.expectMsg("postStop") + p4.expectTerminated(persistentActor) + + permitter.tell(ReturnRecoveryPermit, p1.ref) + permitter.tell(ReturnRecoveryPermit, p2.ref) + } + + "return permit when actor is pre-maturely terminated before holding permit" in { + requestPermit(p1) + requestPermit(p2) + requestPermit(p3) + + val persistentActor = system.actorOf(testProps("p4", p4.ref)) + p4.expectNoMsg(100.millis) + + permitter.tell(RequestRecoveryPermit, p5.ref) + p5.expectNoMsg(100.millis) + + // PoisonPill is not stashed + persistentActor ! PoisonPill + p4.expectMsg("postStop") + + // persistentActor didn't hold a permit, so p5 must still be waiting + p5.expectNoMsg(100.millis) + + permitter.tell(ReturnRecoveryPermit, p1.ref) + p5.expectMsg(RecoveryPermitGranted) + + permitter.tell(ReturnRecoveryPermit, p2.ref) + permitter.tell(ReturnRecoveryPermit, p3.ref) + permitter.tell(ReturnRecoveryPermit, p5.ref) + } + + "return permit when actor is pre-maturely terminated when holding permit" in { + val actor = system.actorOf(TestActors.forwardActorProps(p1.ref)) + permitter.tell(RequestRecoveryPermit, actor) + p1.expectMsg(RecoveryPermitGranted) + + requestPermit(p2) + requestPermit(p3) + + permitter.tell(RequestRecoveryPermit, p4.ref) + p4.expectNoMsg(100.millis) + + actor ! PoisonPill + p4.expectMsg(RecoveryPermitGranted) + + permitter.tell(ReturnRecoveryPermit, p2.ref) + permitter.tell(ReturnRecoveryPermit, p3.ref) + permitter.tell(ReturnRecoveryPermit, p4.ref) + } + } + +} +