=per #15943 Avoid infinite restart loop when recovery fails

* also include the failing message and sequenceNr in the RecoveryFailure
  message
* remove the "putting back" of the message at the front of the mailbox
This commit is contained in:
Patrik Nordwall 2014-12-13 15:35:12 +01:00
parent da7991a3d5
commit 72d54626f3
6 changed files with 123 additions and 75 deletions

View file

@ -7,7 +7,6 @@ package akka.persistence
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable import scala.collection.immutable
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.ActorCell
import akka.actor.ActorKilledException import akka.actor.ActorKilledException
import akka.actor.ActorLogging import akka.actor.ActorLogging
import akka.actor.Stash import akka.actor.Stash
@ -47,20 +46,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement() private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement()
// FIXME useJournalBatching
// I have a feeling that this var can be eliminated, either by just removing the functionality or by
// checking pendingStashingPersistInvocations > 0 in doAroundReceive.
//
//On the first suggestion: when a write is currently pending, how much do we gain in latency
// by submitting the persist writes immediately instead of waiting until the acknowledgement
// comes in? The other thought is that sync and async persistence will rarely be mixed within
//the same Actor, in which case this flag actually does nothing (unless I am missing something).
private var journalBatch = Vector.empty[PersistentEnvelope] private var journalBatch = Vector.empty[PersistentEnvelope]
private val maxMessageBatchSize = extension.settings.journal.maxMessageBatchSize private val maxMessageBatchSize = extension.settings.journal.maxMessageBatchSize
private var writeInProgress = false private var writeInProgress = false
private var sequenceNr: Long = 0L private var sequenceNr: Long = 0L
private var _lastSequenceNr: Long = 0L private var _lastSequenceNr: Long = 0L
private var currentState: State = recoveryPending private var currentState: State = recoveryPending
@ -471,7 +460,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`.
* *
* If processing of a replayed event fails, the exception is caught and * If processing of a replayed event fails, the exception is caught and
* stored for being thrown later and state is changed to `recoveryFailed`. * stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`.
* *
* All incoming messages are stashed. * All incoming messages are stashed.
*/ */
@ -487,8 +476,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
Eventsourced.super.aroundReceive(recoveryBehavior, p) Eventsourced.super.aroundReceive(recoveryBehavior, p)
} catch { } catch {
case NonFatal(t) case NonFatal(t)
val currentMsg = context.asInstanceOf[ActorCell].currentMessage changeState(replayFailed(recoveryBehavior, t, p))
changeState(replayFailed(t, currentMsg)) // delay throwing exception to prepareRestart
} }
case ReplayMessagesSuccess case ReplayMessagesSuccess
onReplaySuccess() // callback for subclass implementation onReplaySuccess() // callback for subclass implementation
@ -497,23 +485,23 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
case ReplayMessagesFailure(cause) case ReplayMessagesFailure(cause)
onReplayFailure(cause) // callback for subclass implementation onReplayFailure(cause) // callback for subclass implementation
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)) Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None))
case other case other
internalStash.stash() internalStash.stash()
} }
} }
/** /**
* Consumes remaining replayed messages and then changes to `prepareRestart`. The * Consumes remaining replayed messages and then emits RecoveryFailure to the
* message that caused the exception during replay, is re-added to the mailbox and * `receiveRecover` behavior.
* re-received in `prepareRestart` state.
*/ */
private def replayFailed(cause: Throwable, failureMessage: Envelope) = new State { private def replayFailed(recoveryBehavior: Receive, cause: Throwable, failed: PersistentRepr) = new State {
override def toString: String = "replay failed" override def toString: String = "replay failed"
override def recoveryRunning: Boolean = true override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match { override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p) updateLastSequenceNr(p)
case ReplayMessagesFailure(_) case ReplayMessagesFailure(_)
replayCompleted() replayCompleted()
// journal couldn't tell the maximum stored sequence number, hence the next // journal couldn't tell the maximum stored sequence number, hence the next
@ -521,27 +509,14 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
// Recover(lastSequenceNr) is sent by preRestart // Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue) setLastSequenceNr(Long.MaxValue)
case ReplayMessagesSuccess replayCompleted() case ReplayMessagesSuccess replayCompleted()
case ReplayedMessage(p) updateLastSequenceNr(p)
case r: Recover // ignore case r: Recover // ignore
case _ internalStash.stash() case _ internalStash.stash()
} }
def replayCompleted(): Unit = { def replayCompleted(): Unit = {
changeState(prepareRestart(cause)) // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
mailbox.enqueueFirst(self, failureMessage) Eventsourced.super.aroundReceive(recoveryBehavior,
} RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload))))
}
/**
* Re-receives the replayed message that caused an exception and re-throws that exception.
*/
private def prepareRestart(cause: Throwable) = new State {
override def toString: String = "prepare restart"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(_) throw cause
case _ // ignore
} }
} }
@ -561,7 +536,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
internalStash.unstashAll() internalStash.unstashAll()
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
case ReadHighestSequenceNrFailure(cause) case ReadHighestSequenceNrFailure(cause)
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)) Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None))
case other case other
internalStash.stash() internalStash.stash()
} }

View file

@ -21,7 +21,7 @@ import akka.persistence.serialization.Message
* *
* In essence it is either an [[NonPersistentRepr]] or [[PersistentRepr]]. * In essence it is either an [[NonPersistentRepr]] or [[PersistentRepr]].
*/ */
private[persistence] sealed trait PersistentEnvelope { // FIXME PN: Rename to PersistentEnvelope private[persistence] sealed trait PersistentEnvelope {
def payload: Any def payload: Any
def sender: ActorRef def sender: ActorRef
} }

View file

@ -23,9 +23,24 @@ case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
/** /**
* Sent to a [[PersistentActor]] if a journal fails to replay messages or fetch that persistent actor's * Sent to a [[PersistentActor]] if a journal fails to replay messages or fetch that persistent actor's
* highest sequence number. If not handled, the actor will be stopped. * highest sequence number. If not handled, the actor will be stopped.
*
* Contains the [[#sequenceNr]] of the message that could not be replayed, if it
* failed at a specific message.
*
* Contains the [[#payload]] of the message that could not be replayed, if it
* failed at a specific message.
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class RecoveryFailure(cause: Throwable) case class RecoveryFailure(cause: Throwable)(failingMessage: Option[(Long, Any)]) {
override def toString: String = failingMessage match {
case Some((sequenceNr, payload)) s"RecoveryFailure(${cause.getMessage},$sequenceNr,$payload)"
case None s"RecoveryFailure(${cause.getMessage})"
}
def sequenceNr: Option[Long] = failingMessage.map { case (snr, _) snr }
def payload: Option[Any] = failingMessage.map { case (_, payload) payload }
}
abstract class RecoveryCompleted abstract class RecoveryCompleted
/** /**

View file

@ -8,7 +8,6 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.AbstractActor import akka.actor.AbstractActor
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorCell
import akka.actor.ActorKilledException import akka.actor.ActorKilledException
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.actor.Stash import akka.actor.Stash
@ -237,7 +236,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
/** /**
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
* message to the actor's `receiveRecover`. Then initiates a message replay, either starting * message to the actor's `receive`. Then initiates a message replay, either starting
* from the loaded snapshot or from scratch, and switches to `replayStarted` state. * from the loaded snapshot or from scratch, and switches to `replayStarted` state.
* All incoming messages are stashed. * All incoming messages are stashed.
* *
@ -264,7 +263,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
} }
/** /**
* Processes replayed messages, if any. The actor's `receiveRecover` is invoked with the replayed * Processes replayed messages, if any. The actor's `receive` is invoked with the replayed
* events. * events.
* *
* If replay succeeds it switches to `initializing` state and requests the highest stored sequence * If replay succeeds it switches to `initializing` state and requests the highest stored sequence
@ -272,7 +271,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`.
* *
* If processing of a replayed event fails, the exception is caught and * If processing of a replayed event fails, the exception is caught and
* stored for being thrown later and state is changed to `recoveryFailed`. * stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`.
* *
* All incoming messages are stashed. * All incoming messages are stashed.
*/ */
@ -294,15 +293,14 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
PersistentView.super.aroundReceive(receive, p.payload) PersistentView.super.aroundReceive(receive, p.payload)
} catch { } catch {
case NonFatal(t) case NonFatal(t)
val currentMsg = context.asInstanceOf[ActorCell].currentMessage changeState(replayFailed(t, p))
changeState(replayFailed(t, currentMsg)) // delay throwing exception to prepareRestart
} }
case ReplayMessagesSuccess case ReplayMessagesSuccess
onReplayComplete(await) onReplayComplete(await)
case ReplayMessagesFailure(cause) case ReplayMessagesFailure(cause)
onReplayComplete(await) onReplayComplete(await)
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)) PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(None))
case other case other
internalStash.stash() internalStash.stash()
} }
@ -312,50 +310,37 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
*/ */
private def onReplayComplete(await: Boolean): Unit = { private def onReplayComplete(await: Boolean): Unit = {
changeState(idle) changeState(idle)
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax))) if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self,
Update(await = false, autoUpdateReplayMax)))
if (await) internalStash.unstashAll() if (await) internalStash.unstashAll()
} }
} }
/** /**
* Consumes remaining replayed messages and then changes to `prepareRestart`. The * Consumes remaining replayed messages and then emits RecoveryFailure to the
* message that caused the exception during replay, is re-added to the mailbox and * `receive` behavior.
* re-received in `prepareRestart` state.
*/ */
private def replayFailed(cause: Throwable, failureMessage: Envelope) = new State { private def replayFailed(cause: Throwable, failed: PersistentRepr) = new State {
override def toString: String = "replay failed" override def toString: String = "replay failed"
override def recoveryRunning: Boolean = true override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match { override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p) updateLastSequenceNr(p)
case ReplayMessagesFailure(_) case ReplayMessagesFailure(_)
replayCompleted() replayCompleted(receive)
// journal couldn't tell the maximum stored sequence number, hence the next // journal couldn't tell the maximum stored sequence number, hence the next
// replay must be a full replay (up to the highest stored sequence number) // replay must be a full replay (up to the highest stored sequence number)
// Recover(lastSequenceNr) is sent by preRestart // Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue) setLastSequenceNr(Long.MaxValue)
case ReplayMessagesSuccess replayCompleted() case ReplayMessagesSuccess replayCompleted(receive)
case ReplayedMessage(p) updateLastSequenceNr(p)
case r: Recover // ignore case r: Recover // ignore
case _ internalStash.stash() case _ internalStash.stash()
} }
def replayCompleted(): Unit = { def replayCompleted(receive: Receive): Unit = {
changeState(prepareRestart(cause)) // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
mailbox.enqueueFirst(self, failureMessage) PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload))))
}
}
/**
* Re-receives the replayed message that caused an exception and re-throws that exception.
*/
private def prepareRestart(cause: Throwable) = new State {
override def toString: String = "prepare restart"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(_) throw cause
case _ // ignore
} }
} }

View file

@ -16,9 +16,12 @@ import scala.language.postfixOps
import scala.Some import scala.Some
import akka.actor.OneForOneStrategy import akka.actor.OneForOneStrategy
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.testkit.TestProbe
object PersistentActorFailureSpec { object PersistentActorFailureSpec {
import PersistentActorSpec.Cmd
import PersistentActorSpec.Evt import PersistentActorSpec.Evt
import PersistentActorSpec.ExamplePersistentActor
class FailingInmemJournal extends AsyncWriteProxy { class FailingInmemJournal extends AsyncWriteProxy {
import AsyncWriteProxy.SetStore import AsyncWriteProxy.SetStore
@ -29,6 +32,7 @@ object PersistentActorFailureSpec {
super.preStart() super.preStart()
self ! SetStore(context.actorOf(Props[FailingInmemStore])) self ! SetStore(context.actorOf(Props[FailingInmemStore]))
} }
} }
class FailingInmemStore extends InmemStore { class FailingInmemStore extends InmemStore {
@ -39,8 +43,12 @@ object PersistentActorFailureSpec {
val readFromStore = read(pid, fromSnr, toSnr, max) val readFromStore = read(pid, fromSnr, toSnr, max)
if (readFromStore.length == 0) if (readFromStore.length == 0)
sender() ! ReplaySuccess sender() ! ReplaySuccess
else else if (isCorrupt(readFromStore))
sender() ! ReplayFailure(new IllegalArgumentException(s"blahonga $fromSnr $toSnr")) sender() ! ReplayFailure(new IllegalArgumentException(s"blahonga $fromSnr $toSnr"))
else {
readFromStore.foreach(sender() ! _)
sender() ! ReplaySuccess
}
} }
def isWrong(w: WriteMessages): Boolean = def isWrong(w: WriteMessages): Boolean =
@ -49,12 +57,20 @@ object PersistentActorFailureSpec {
case x false case x false
} }
def isCorrupt(events: Seq[PersistentRepr]): Boolean =
events.exists { case PersistentRepr(Evt(s: String), _) s.contains("corrupt") }
override def receive = failingReceive.orElse(super.receive) override def receive = failingReceive.orElse(super.receive)
} }
class Supervisor(testActor: ActorRef) extends Actor { class Supervisor(testActor: ActorRef) extends Actor {
override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e testActor ! e; SupervisorStrategy.Stop case e: ActorKilledException
testActor ! e
SupervisorStrategy.Stop
case e
testActor ! e
SupervisorStrategy.Restart
} }
def receive = { def receive = {
@ -62,6 +78,29 @@ object PersistentActorFailureSpec {
case m sender() ! m case m sender() ! m
} }
} }
class FailingRecovery(name: String, recoveryFailureProbe: Option[ActorRef]) extends ExamplePersistentActor(name) {
def this(name: String) = this(name, None)
override val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) persist(Evt(s"${data}"))(updateState)
}
val failingRecover: Receive = {
case Evt(data) if data == "bad"
throw new RuntimeException("Simulated exception from receiveRecover")
case r @ RecoveryFailure(cause) if recoveryFailureProbe.isDefined
recoveryFailureProbe.foreach { _ ! r }
throw new ActorKilledException(cause.getMessage)
}
override def receiveRecover: Receive = failingRecover orElse super.receiveRecover
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
}
}
} }
class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
@ -69,15 +108,25 @@ class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem"
akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal" akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal"
"""))) with PersistenceSpec with ImplicitSender { """))) with PersistenceSpec with ImplicitSender {
import PersistentActorSpec._
import PersistentActorFailureSpec._ import PersistentActorFailureSpec._
import PersistentActorSpec._
def prepareFailingRecovery(): Unit = {
val persistentActor = namedPersistentActor[FailingRecovery]
persistentActor ! Cmd("a")
persistentActor ! Cmd("b")
persistentActor ! Cmd("bad")
persistentActor ! Cmd("c")
persistentActor ! GetState
expectMsg(List("a", "b", "bad", "c"))
}
"A persistent actor" must { "A persistent actor" must {
"throw ActorKilledException if recovery from persisted events fail" in { "throw ActorKilledException if recovery from persisted events fail" in {
val persistentActor = namedPersistentActor[Behavior1PersistentActor] val persistentActor = namedPersistentActor[Behavior1PersistentActor]
persistentActor ! Cmd("a") persistentActor ! Cmd("corrupt")
persistentActor ! GetState persistentActor ! GetState
expectMsg(List("a-1", "a-2")) expectMsg(List("corrupt-1", "corrupt-2"))
// recover by creating another with same name // recover by creating another with same name
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name)
@ -99,7 +148,27 @@ class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem"
persistentActor ! Cmd("wrong") persistentActor ! Cmd("wrong")
expectMsg("wrong") // reply before persistAsync expectMsg("wrong") // reply before persistAsync
expectMsgType[ActorKilledException] expectMsgType[ActorKilledException]
expectNoMsg(500.millis) }
"throw ActorKilledException if receiveRecover fails" in {
prepareFailingRecovery()
// recover by creating another with same name
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name)
expectMsgType[ActorRef]
expectMsgType[ActorKilledException]
}
"include failing event in RecoveryFailure message" in {
prepareFailingRecovery()
// recover by creating another with same name
val probe = TestProbe()
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref))
expectMsgType[ActorRef]
expectMsgType[ActorKilledException]
val recoveryFailure = probe.expectMsgType[RecoveryFailure]
recoveryFailure.payload should be(Some(Evt("bad")))
recoveryFailure.sequenceNr should be(Some(3L))
} }
} }
} }

View file

@ -42,12 +42,16 @@ object PersistentViewSpec {
case "boom" case "boom"
throw new TestException("boom") throw new TestException("boom")
case RecoveryFailure(cause)
throw cause // restart
case payload if isPersistent && shouldFailOn(payload) case payload if isPersistent && shouldFailOn(payload)
throw new TestException("boom") throw new TestException("boom")
case payload if isPersistent case payload if isPersistent
last = s"replicated-${payload}-${lastSequenceNr}" last = s"replicated-${payload}-${lastSequenceNr}"
probe ! last probe ! last
} }
override def postRestart(reason: Throwable): Unit = { override def postRestart(reason: Throwable): Unit = {