From 16cde39de8c331f85593bebca8d1cb9239f23b8a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johan=20Andr=C3=A9n?=
Date: Thu, 9 Jun 2016 14:58:32 +0200
Subject: [PATCH] Better recovery timeout for persistent actors #20738

---
 .../scala/akka/persistence/Eventsourced.scala | 54 ++++++++++++-----
 .../PersistentActorRecoveryTimeoutSpec.scala  | 60 +++++++++++++++++++
 2 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
index 2208a638bc..dd6fd10ad1 100644
--- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
@@ -31,6 +31,9 @@ private[persistence] object Eventsourced {
   private final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation
   /** does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */
   private final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation
+
+  /** message used to detect that recovery timed out */
+  private final case class RecoveryTick(snapshot: Boolean)
 }

 /**
@@ -463,9 +466,12 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
    */
   private def recoveryStarted(replayMax: Long) = new State {

-    // protect against replay stalling forever because of journal overloaded and such
-    private val previousRecieveTimeout = context.receiveTimeout
-    context.setReceiveTimeout(extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout"))
+    // protect against the snapshot load stalling forever because the journal is overloaded and such
+    val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
+    val timeoutCancellable = {
+      import context.dispatcher
+      context.system.scheduler.scheduleOnce(timeout, self, RecoveryTick(snapshot = true))
+    }

     private val recoveryBehavior: Receive = {
       val _receiveRecover = receiveRecover
@@ -486,19 +492,22 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas

     override def stateReceive(receive: Receive, message: Any) = message match {
       case LoadSnapshotResult(sso, toSnr) ⇒
+        timeoutCancellable.cancel()
         sso.foreach {
           case SelectedSnapshot(metadata, snapshot) ⇒
             setLastSequenceNr(metadata.sequenceNr)
             // Since we are recovering we can ignore the receive behavior from the stack
             Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot))
         }
-        changeState(recovering(recoveryBehavior, previousRecieveTimeout))
+        changeState(recovering(recoveryBehavior, timeout))
         journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
-      case ReceiveTimeout ⇒
+
+      case RecoveryTick(true) ⇒
         try onRecoveryFailure(
-          new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${context.receiveTimeout.toSeconds}s"),
+          new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"),
           event = None)
         finally context.stop(self)
+
       case other ⇒
         stashInternally(other)
     }
@@ -514,8 +523,16 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
    *
    * All incoming messages are stashed.
    */
-  private def recovering(recoveryBehavior: Receive, previousReceiveTimeout: Duration) =
+  private def recovering(recoveryBehavior: Receive, timeout: FiniteDuration) =
     new State {
+
+      // protect against event replay stalling forever because the journal is overloaded and such
+      val timeoutCancellable = {
+        import context.dispatcher
+        context.system.scheduler.schedule(timeout, timeout, self, RecoveryTick(snapshot = false))
+      }
+      var eventSeenInInterval = false
+
       override def toString: String = "replay started"

       override def recoveryRunning: Boolean = true
@@ -523,14 +540,16 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
       override def stateReceive(receive: Receive, message: Any) = message match {
         case ReplayedMessage(p) ⇒
           try {
+            eventSeenInInterval = true
             updateLastSequenceNr(p)
             Eventsourced.super.aroundReceive(recoveryBehavior, p)
           } catch {
             case NonFatal(t) ⇒
+              timeoutCancellable.cancel()
               try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
           }
         case RecoverySuccess(highestSeqNr) ⇒
-          resetRecieveTimeout()
+          timeoutCancellable.cancel()
           onReplaySuccess() // callback for subclass implementation
           changeState(processingCommands)
           sequenceNr = highestSeqNr
@@ -538,20 +557,21 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
           internalStash.unstashAll()
           Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
         case ReplayMessagesFailure(cause) ⇒
-          resetRecieveTimeout()
+          timeoutCancellable.cancel()
           try onRecoveryFailure(cause, event = None) finally context.stop(self)
-        case ReceiveTimeout ⇒
+        case RecoveryTick(false) if !eventSeenInInterval ⇒
+          timeoutCancellable.cancel()
           try onRecoveryFailure(
-            new RecoveryTimedOut(s"Recovery timed out, didn't get event within ${context.receiveTimeout.toSeconds}s, highest sequence number seen ${sequenceNr}"),
+            new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr"),
             event = None)
           finally context.stop(self)
+        case RecoveryTick(false) ⇒
+          eventSeenInInterval = false
+        case RecoveryTick(true) ⇒
+          // snapshot tick, ignore
         case other ⇒
           stashInternally(other)
       }
-
-      private def resetRecieveTimeout(): Unit = {
-        context.setReceiveTimeout(previousReceiveTimeout)
-      }
     }

   private def flushBatch() {
@@ -615,6 +635,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
       case WriteMessagesFailed(_) ⇒
         writeInProgress = false
         () // it will be stopped by the first WriteMessageFailure message
+
+      case _: RecoveryTick ⇒
+        // we may have one of these in the mailbox before the scheduled timeout
+        // is cancelled when recovery has completed; just consume it so the concrete actor never sees it
     }

     def onWriteMessageComplete(err: Boolean): Unit
diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala
index 5f5c7e964b..c4443831ad 100644
--- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala
@@ -32,6 +32,28 @@ object PersistentActorRecoveryTimeoutSpec {
     }
   }

+  class TestReceiveTimeoutActor(receiveTimeout: FiniteDuration, probe: ActorRef) extends NamedPersistentActor("recovery-timeout-actor-2") {
+
+    override def preStart(): Unit = {
+      context.setReceiveTimeout(receiveTimeout)
+    }
+
+    override def receiveRecover: Receive = {
+      case RecoveryCompleted ⇒ probe ! context.receiveTimeout
+      case _                 ⇒ // we don't care
+    }
+
+    override def receiveCommand: Receive = {
+      case x ⇒ persist(x) { _ ⇒
+        sender() ! x
+      }
+    }
+
+    override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = {
+      probe ! Failure(cause)
+    }
+  }
+
 }

 class PersistentActorRecoveryTimeoutSpec extends AkkaSpec(PersistentActorRecoveryTimeoutSpec.config) with ImplicitSender {
@@ -69,6 +91,44 @@ class PersistentActorRecoveryTimeoutSpec extends AkkaSpec(PersistentActorRecover

       probe.expectMsgType[Failure].cause shouldBe a[RecoveryTimedOut]
       expectTerminated(replaying)
+      // avoid having the journal stuck in the next test because of the
+      // last read request above
+      SteppingInmemJournal.step(journal)
+
     }
+
+    "should not interfere with receive timeouts" in {
+      val timeout = 42.days
+
+      val probe = TestProbe()
+      val persisting = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor], timeout, probe.ref))
+
+      awaitAssert(SteppingInmemJournal.getRef(journalId), 3.seconds)
+      val journal = SteppingInmemJournal.getRef(journalId)
+
+      // initial read highest
+      SteppingInmemJournal.step(journal)
+
+      persisting ! "A"
+      SteppingInmemJournal.step(journal)
+      expectMsg("A")
+
+      watch(persisting)
+      system.stop(persisting)
+      expectTerminated(persisting)
+
+      // now replay, but don't give the journal any tokens to replay events
+      // so that we cause the timeout to trigger
+      val replaying = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor], timeout, probe.ref))
+
+      // initial read highest
+      SteppingInmemJournal.step(journal)
+
+      // read journal
+      SteppingInmemJournal.step(journal)
+
+      // we should get the initial receive timeout back from the actor when replay completes
+      probe.expectMsg(timeout)
+    }
   }
 }
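
For context, and not part of the patch itself: a minimal sketch of how the timeout introduced above surfaces to user code. The `recovery-event-timeout` key is read from the journal plugin's own config section (see `journalConfigFor` in the first hunk); the plugin path and the actor below are illustrative assumptions, not code from this commit.

// Illustrative sketch only, assuming the default leveldb journal plugin.
// The timeout is configured per journal plugin, e.g. in application.conf:
//   akka.persistence.journal.leveldb.recovery-event-timeout = 30s
import akka.actor.ActorLogging
import akka.persistence.{ PersistentActor, RecoveryCompleted }

class SampleActor extends PersistentActor with ActorLogging {
  override def persistenceId: String = "sample-actor-1"

  override def receiveRecover: Receive = {
    case RecoveryCompleted ⇒ log.info("replay finished")
    case _                 ⇒ // apply replayed events to state here
  }

  override def receiveCommand: Receive = {
    case cmd ⇒ persist(cmd) { evt ⇒ sender() ! evt }
  }

  // With this patch, if the journal delivers neither the snapshot nor any replayed
  // event within recovery-event-timeout, this hook is invoked with a RecoveryTimedOut
  // and the actor is stopped afterwards.
  override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
    log.error(cause, "recovery failed, last event: {}", event)
}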