From 896ea53dd3f8a5b79a98c2563b93cc3334766595 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 3 Jun 2016 14:17:41 +0200 Subject: [PATCH] recovery timeout for persistent actors #20698 --- .../scala/akka/cluster/QuickRestartSpec.scala | 6 +- .../src/main/resources/reference.conf | 36 +++++---- .../scala/akka/persistence/Eventsourced.scala | 79 ++++++++++++------- .../scala/akka/persistence/Persistence.scala | 3 + .../akka/persistence/PersistentActor.scala | 5 ++ .../PersistentActorRecoveryTimeoutSpec.scala | 76 ++++++++++++++++++ 6 files changed, 161 insertions(+), 44 deletions(-) create mode 100644 akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala index 0a4e68143a..713c02160a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala @@ -65,11 +65,13 @@ abstract class QuickRestartSpec runOn(second) { restartingSystem = if (restartingSystem == null) - ActorSystem(system.name, + ActorSystem( + system.name, ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]") .withFallback(system.settings.config)) else - ActorSystem(system.name, + ActorSystem( + system.name, ConfigFactory.parseString(s""" akka.cluster.roles = [round-$n] akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}""") // same port diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index c246b5c89c..4619f42756 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -92,11 +92,11 @@ akka.persistence { } } } - + # Fallback settings for journal plugin configurations. # These settings are used if they are not defined in plugin config section. journal-plugin-fallback { - + # Fully qualified class name providing journal plugin api implementation. # It is mandatory to specify this property. # The class must have a constructor without parameters or constructor with @@ -105,40 +105,46 @@ akka.persistence { # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" - + # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" - + # Removed: used to be the Maximum size of a persistent message batch written to the journal. # Now this setting is without function, PersistentActor will write as many messages # as it has accumulated since the last write. max-message-batch-size = 200 - + + # If there is more time in between individual events gotten from the journal + # recovery than this the recovery will fail. + # Note that it also affects reading the snapshot before replaying events on + # top of it, even though it is configured for the journal. + recovery-event-timeout = 30s + circuit-breaker { max-failures = 10 call-timeout = 10s reset-timeout = 30s } - - # The replay filter can detect a corrupt event stream by inspecting - # sequence numbers and writerUuid when replaying events. + + # The replay filter can detect a corrupt event stream by inspecting + # sequence numbers and writerUuid when replaying events. replay-filter { # What the filter should do when detecting invalid events. 
# Supported values: - # `repair-by-discard-old` : discard events from old writers, + # `repair-by-discard-old` : discard events from old writers, # warning is logged # `fail` : fail the replay, error is logged # `warn` : log warning but emit events untouched # `off` : disable this feature completely mode = repair-by-discard-old - + # It uses a look ahead buffer for analyzing the events. # This defines the size (in number of events) of the buffer. window-size = 100 - + # How many old writerUuid to remember max-old-writers = 10 - + # Set this to `on` to enable detailed debug logging of each # replayed event. debug = off @@ -148,8 +154,8 @@ akka.persistence { # Fallback settings for snapshot store plugin configurations # These settings are used if they are not defined in plugin config section. snapshot-store-plugin-fallback { - - # Fully qualified class name providing snapshot store plugin api + + # Fully qualified class name providing snapshot store plugin api # implementation. It is mandatory to specify this property if # snapshot store is enabled. # The class must have a constructor without parameters or constructor with @@ -158,7 +164,7 @@ akka.persistence { # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" - + circuit-breaker { max-failures = 5 call-timeout = 20s diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 6b8c1d1b20..2208a638bc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -9,11 +9,13 @@ import java.util.UUID import scala.collection.immutable import scala.util.control.NonFatal -import akka.actor.DeadLetter -import akka.actor.StashOverflowException +import akka.actor.{ DeadLetter, ReceiveTimeout, StashOverflowException } +import akka.util.Helpers.ConfigOps import akka.event.Logging import akka.event.LoggingAdapter +import scala.concurrent.duration.{ Duration, FiniteDuration } + /** * INTERNAL API */ @@ -461,6 +463,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas */ private def recoveryStarted(replayMax: Long) = new State { + // protect against replay stalling forever because of journal overloaded and such + private val previousRecieveTimeout = context.receiveTimeout + context.setReceiveTimeout(extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")) + private val recoveryBehavior: Receive = { val _receiveRecover = receiveRecover @@ -471,6 +477,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas _receiveRecover(s) case RecoveryCompleted if _receiveRecover.isDefinedAt(RecoveryCompleted) ⇒ _receiveRecover(RecoveryCompleted) + } } @@ -485,8 +492,13 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas // Since we are recovering we can ignore the receive behavior from the stack Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot)) } - changeState(recovering(recoveryBehavior)) + changeState(recovering(recoveryBehavior, previousRecieveTimeout)) journal ! 
ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) + case ReceiveTimeout ⇒ + try onRecoveryFailure( + new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${context.receiveTimeout.toSeconds}s"), + event = None) + finally context.stop(self) case other ⇒ stashInternally(other) } @@ -502,32 +514,45 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * * All incoming messages are stashed. */ - private def recovering(recoveryBehavior: Receive) = new State { - override def toString: String = "replay started" - override def recoveryRunning: Boolean = true + private def recovering(recoveryBehavior: Receive, previousReceiveTimeout: Duration) = + new State { + override def toString: String = "replay started" - override def stateReceive(receive: Receive, message: Any) = message match { - case ReplayedMessage(p) ⇒ - try { - updateLastSequenceNr(p) - Eventsourced.super.aroundReceive(recoveryBehavior, p) - } catch { - case NonFatal(t) ⇒ - try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self) - } - case RecoverySuccess(highestSeqNr) ⇒ - onReplaySuccess() // callback for subclass implementation - changeState(processingCommands) - sequenceNr = highestSeqNr - setLastSequenceNr(highestSeqNr) - internalStash.unstashAll() - Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) - case ReplayMessagesFailure(cause) ⇒ - try onRecoveryFailure(cause, event = None) finally context.stop(self) - case other ⇒ - stashInternally(other) + override def recoveryRunning: Boolean = true + + override def stateReceive(receive: Receive, message: Any) = message match { + case ReplayedMessage(p) ⇒ + try { + updateLastSequenceNr(p) + Eventsourced.super.aroundReceive(recoveryBehavior, p) + } catch { + case NonFatal(t) ⇒ + try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self) + } + case RecoverySuccess(highestSeqNr) ⇒ + resetRecieveTimeout() + onReplaySuccess() // callback for subclass implementation + changeState(processingCommands) + sequenceNr = highestSeqNr + setLastSequenceNr(highestSeqNr) + internalStash.unstashAll() + Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) + case ReplayMessagesFailure(cause) ⇒ + resetRecieveTimeout() + try onRecoveryFailure(cause, event = None) finally context.stop(self) + case ReceiveTimeout ⇒ + try onRecoveryFailure( + new RecoveryTimedOut(s"Recovery timed out, didn't get event within ${context.receiveTimeout.toSeconds}s, highest sequence number seen ${sequenceNr}"), + event = None) + finally context.stop(self) + case other ⇒ + stashInternally(other) + } + + private def resetRecieveTimeout(): Unit = { + context.setReceiveTimeout(previousReceiveTimeout) + } } - } private def flushBatch() { if (eventBatch.nonEmpty) { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index c8dd4b7e49..f76893c117 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -6,15 +6,18 @@ package akka.persistence import java.util.concurrent.atomic.AtomicReference import java.util.function.Consumer + import akka.actor._ import akka.event.{ Logging, LoggingAdapter } import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters } import akka.util.Collections.EmptyImmutableSeq import akka.util.Helpers.ConfigOps import com.typesafe.config.Config + import scala.annotation.tailrec import 
scala.concurrent.duration._ import akka.util.Reflect + import scala.util.control.NonFatal /** diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index b95689458b..520c9c3b24 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -4,11 +4,14 @@ package akka.persistence import java.lang.{ Iterable ⇒ JIterable } + import akka.actor._ import akka.japi.Procedure import akka.japi.Util import com.typesafe.config.Config +import scala.util.control.NoStackTrace + abstract class RecoveryCompleted /** * Sent to a [[PersistentActor]] when the journal replay has been finished. @@ -98,6 +101,8 @@ object Recovery { val none: Recovery = Recovery(toSequenceNr = 0L) } +final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace + /** * This defines how to handle the current received message which failed to stash, when the size of * Stash exceeding the capacity of Stash. diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala new file mode 100644 index 0000000000..5f5c7e964b --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala @@ -0,0 +1,76 @@ +package akka.persistence + +import akka.actor.Status.Failure +import akka.actor.{ Actor, ActorRef, Props } +import akka.persistence.journal.SteppingInmemJournal +import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object PersistentActorRecoveryTimeoutSpec { + val journalId = "persistent-actor-recovery-timeout-spec" + + def config = + SteppingInmemJournal.config(PersistentActorRecoveryTimeoutSpec.journalId).withFallback( + ConfigFactory.parseString( + """ + |akka.persistence.journal.stepping-inmem.recovery-event-timeout=100ms + """.stripMargin)).withFallback(PersistenceSpec.config("stepping-inmem", "PersistentActorRecoveryTimeoutSpec")) + + class TestActor(probe: ActorRef) extends NamedPersistentActor("recovery-timeout-actor") { + override def receiveRecover: Receive = Actor.emptyBehavior + + override def receiveCommand: Receive = { + case x ⇒ persist(x) { _ ⇒ + sender() ! x + } + } + + override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = { + probe ! Failure(cause) + } + } + +} + +class PersistentActorRecoveryTimeoutSpec extends AkkaSpec(PersistentActorRecoveryTimeoutSpec.config) with ImplicitSender { + + import PersistentActorRecoveryTimeoutSpec.journalId + + "The recovery timeout" should { + + "fail recovery if timeout is not met when recovering" in { + val probe = TestProbe() + val persisting = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestActor], probe.ref)) + + awaitAssert(SteppingInmemJournal.getRef(journalId), 3.seconds) + val journal = SteppingInmemJournal.getRef(journalId) + + // initial read highest + SteppingInmemJournal.step(journal) + + persisting ! 
"A" + SteppingInmemJournal.step(journal) + expectMsg("A") + + watch(persisting) + system.stop(persisting) + expectTerminated(persisting) + + // now replay, but don't give the journal any tokens to replay events + // so that we cause the timeout to trigger + val replaying = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestActor], probe.ref)) + watch(replaying) + + // initial read highest + SteppingInmemJournal.step(journal) + + probe.expectMsgType[Failure].cause shouldBe a[RecoveryTimedOut] + expectTerminated(replaying) + + } + + } + +}