Better recovery timeout for persistent actors #20738

This commit is contained in:
Johan Andrén 2016-06-09 14:58:32 +02:00
parent e40a2b21c4
commit 16cde39de8
2 changed files with 99 additions and 15 deletions

View file

@ -31,6 +31,9 @@ private[persistence] object Eventsourced {
private final case class StashingHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */
private final case class AsyncHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** message used to detect that recovery timed out */
private final case class RecoveryTick(snapshot: Boolean)
}
/**
@ -463,9 +466,12 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
*/
private def recoveryStarted(replayMax: Long) = new State {
// protect against replay stalling forever because of journal overloaded and such
private val previousRecieveTimeout = context.receiveTimeout
context.setReceiveTimeout(extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout"))
// protect against snapshot stalling forever because of journal overloaded and such
val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
val timeoutCancellable = {
import context.dispatcher
context.system.scheduler.scheduleOnce(timeout, self, RecoveryTick(snapshot = true))
}
private val recoveryBehavior: Receive = {
val _receiveRecover = receiveRecover
@ -486,19 +492,22 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
override def stateReceive(receive: Receive, message: Any) = message match {
case LoadSnapshotResult(sso, toSnr)
timeoutCancellable.cancel()
sso.foreach {
case SelectedSnapshot(metadata, snapshot)
setLastSequenceNr(metadata.sequenceNr)
// Since we are recovering we can ignore the receive behavior from the stack
Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot))
}
changeState(recovering(recoveryBehavior, previousRecieveTimeout))
changeState(recovering(recoveryBehavior, timeout))
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
case ReceiveTimeout
case RecoveryTick(true)
try onRecoveryFailure(
new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${context.receiveTimeout.toSeconds}s"),
new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout s"),
event = None)
finally context.stop(self)
case other
stashInternally(other)
}
@ -514,8 +523,16 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
*
* All incoming messages are stashed.
*/
private def recovering(recoveryBehavior: Receive, previousReceiveTimeout: Duration) =
private def recovering(recoveryBehavior: Receive, timeout: FiniteDuration) =
new State {
// protect against snapshot stalling forever because of journal overloaded and such
val timeoutCancellable = {
import context.dispatcher
context.system.scheduler.schedule(timeout, timeout, self, RecoveryTick(snapshot = false))
}
var eventSeenInInterval = false
override def toString: String = "replay started"
override def recoveryRunning: Boolean = true
@ -523,14 +540,16 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p)
try {
eventSeenInInterval = true
updateLastSequenceNr(p)
Eventsourced.super.aroundReceive(recoveryBehavior, p)
} catch {
case NonFatal(t)
timeoutCancellable.cancel()
try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
}
case RecoverySuccess(highestSeqNr)
resetRecieveTimeout()
timeoutCancellable.cancel()
onReplaySuccess() // callback for subclass implementation
changeState(processingCommands)
sequenceNr = highestSeqNr
@ -538,20 +557,21 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
internalStash.unstashAll()
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
case ReplayMessagesFailure(cause)
resetRecieveTimeout()
timeoutCancellable.cancel()
try onRecoveryFailure(cause, event = None) finally context.stop(self)
case ReceiveTimeout
case RecoveryTick(false) if !eventSeenInInterval
timeoutCancellable.cancel()
try onRecoveryFailure(
new RecoveryTimedOut(s"Recovery timed out, didn't get event within ${context.receiveTimeout.toSeconds}s, highest sequence number seen ${sequenceNr}"),
new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout s, highest sequence number seen $sequenceNr"),
event = None)
finally context.stop(self)
case RecoveryTick(false)
eventSeenInInterval = false
case RecoveryTick(true)
// snapshot tick, ignore
case other
stashInternally(other)
}
private def resetRecieveTimeout(): Unit = {
context.setReceiveTimeout(previousReceiveTimeout)
}
}
private def flushBatch() {
@ -615,6 +635,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
case WriteMessagesFailed(_)
writeInProgress = false
() // it will be stopped by the first WriteMessageFailure message
case _: RecoveryTick =>
// we may have one of these in the mailbox before the scheduled timeout
// is cancelled when recovery has completed, just consume it so the concrete actor never sees it
}
def onWriteMessageComplete(err: Boolean): Unit

View file

@ -32,6 +32,28 @@ object PersistentActorRecoveryTimeoutSpec {
}
}
class TestReceiveTimeoutActor(receiveTimeout: FiniteDuration, probe: ActorRef) extends NamedPersistentActor("recovery-timeout-actor-2") {
override def preStart(): Unit = {
context.setReceiveTimeout(receiveTimeout)
}
override def receiveRecover: Receive = {
case RecoveryCompleted probe ! context.receiveTimeout
case _ // we don't care
}
override def receiveCommand: Receive = {
case x persist(x) { _
sender() ! x
}
}
override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = {
probe ! Failure(cause)
}
}
}
class PersistentActorRecoveryTimeoutSpec extends AkkaSpec(PersistentActorRecoveryTimeoutSpec.config) with ImplicitSender {
@ -69,6 +91,44 @@ class PersistentActorRecoveryTimeoutSpec extends AkkaSpec(PersistentActorRecover
probe.expectMsgType[Failure].cause shouldBe a[RecoveryTimedOut]
expectTerminated(replaying)
// avoid having it stuck in the next test from the
// last read request above
SteppingInmemJournal.step(journal)
}
"should not interfere with receive timeouts" in {
val timeout = 42.days
val probe = TestProbe()
val persisting = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor], timeout, probe.ref))
awaitAssert(SteppingInmemJournal.getRef(journalId), 3.seconds)
val journal = SteppingInmemJournal.getRef(journalId)
// initial read highest
SteppingInmemJournal.step(journal)
persisting ! "A"
SteppingInmemJournal.step(journal)
expectMsg("A")
watch(persisting)
system.stop(persisting)
expectTerminated(persisting)
// now replay, but don't give the journal any tokens to replay events
// so that we cause the timeout to trigger
val replaying = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor], timeout, probe.ref))
// initial read highest
SteppingInmemJournal.step(journal)
// read journal
SteppingInmemJournal.step(journal)
// we should get initial receive timeout back from actor when replay completes
probe.expectMsg(timeout)
}
}