From 72d54626f3f573bffa049d5ea86b923922421b99 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sat, 13 Dec 2014 15:35:12 +0100 Subject: [PATCH] =per #15943 Avoid initite restart loop when recovery fails * also include the failing message and sequenceNr in the RecoveryFailure message * remove the "putting back" the message first in the mailbox --- .../scala/akka/persistence/Eventsourced.scala | 47 +++-------- .../scala/akka/persistence/Persistent.scala | 2 +- .../akka/persistence/PersistentActor.scala | 17 +++- .../akka/persistence/PersistentView.scala | 47 ++++------- .../PersistentActorFailureSpec.scala | 81 +++++++++++++++++-- .../akka/persistence/PersistentViewSpec.scala | 4 + 6 files changed, 123 insertions(+), 75 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index aa2ac03956..8a1ae46ecf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -7,7 +7,6 @@ package akka.persistence import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable import scala.util.control.NonFatal -import akka.actor.ActorCell import akka.actor.ActorKilledException import akka.actor.ActorLogging import akka.actor.Stash @@ -47,20 +46,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement() - // FIXME useJournalBatching - // I have a feeling that this var can be eliminated, either by just removing the functionality or by - // checking pendingStashingPersistInvocations > 0 in doAroundReceive. - // - //On the first suggestion: when a write is currently pending, how much do we gain in latency - // by submitting the persist writes immediately instead of waiting until the acknowledgement - // comes in? The other thought is that sync and async persistence will rarely be mixed within - //the same Actor, in which case this flag actually does nothing (unless I am missing something). - private var journalBatch = Vector.empty[PersistentEnvelope] private val maxMessageBatchSize = extension.settings.journal.maxMessageBatchSize private var writeInProgress = false private var sequenceNr: Long = 0L - private var _lastSequenceNr: Long = 0L private var currentState: State = recoveryPending @@ -471,7 +460,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. * * If processing of a replayed event fails, the exception is caught and - * stored for being thrown later and state is changed to `recoveryFailed`. + * stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`. * * All incoming messages are stashed. */ @@ -487,8 +476,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas Eventsourced.super.aroundReceive(recoveryBehavior, p) } catch { case NonFatal(t) ⇒ - val currentMsg = context.asInstanceOf[ActorCell].currentMessage - changeState(replayFailed(t, currentMsg)) // delay throwing exception to prepareRestart + changeState(replayFailed(recoveryBehavior, t, p)) } case ReplayMessagesSuccess ⇒ onReplaySuccess() // callback for subclass implementation @@ -497,23 +485,23 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case ReplayMessagesFailure(cause) ⇒ onReplayFailure(cause) // callback for subclass implementation // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? - Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)) + Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None)) case other ⇒ internalStash.stash() } } /** - * Consumes remaining replayed messages and then changes to `prepareRestart`. The - * message that caused the exception during replay, is re-added to the mailbox and - * re-received in `prepareRestart` state. + * Consumes remaining replayed messages and then emits RecoveryFailure to the + * `receiveRecover` behavior. */ - private def replayFailed(cause: Throwable, failureMessage: Envelope) = new State { + private def replayFailed(recoveryBehavior: Receive, cause: Throwable, failed: PersistentRepr) = new State { override def toString: String = "replay failed" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { + case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) case ReplayMessagesFailure(_) ⇒ replayCompleted() // journal couldn't tell the maximum stored sequence number, hence the next @@ -521,27 +509,14 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas // Recover(lastSequenceNr) is sent by preRestart setLastSequenceNr(Long.MaxValue) case ReplayMessagesSuccess ⇒ replayCompleted() - case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) case r: Recover ⇒ // ignore case _ ⇒ internalStash.stash() } def replayCompleted(): Unit = { - changeState(prepareRestart(cause)) - mailbox.enqueueFirst(self, failureMessage) - } - } - - /** - * Re-receives the replayed message that caused an exception and re-throws that exception. - */ - private def prepareRestart(cause: Throwable) = new State { - override def toString: String = "prepare restart" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any) = message match { - case ReplayedMessage(_) ⇒ throw cause - case _ ⇒ // ignore + // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? + Eventsourced.super.aroundReceive(recoveryBehavior, + RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload)))) } } @@ -561,7 +536,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas internalStash.unstashAll() Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) case ReadHighestSequenceNrFailure(cause) ⇒ - Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)) + Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None)) case other ⇒ internalStash.stash() } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index cc58cd0d79..a93c7f5cce 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -21,7 +21,7 @@ import akka.persistence.serialization.Message * * In essence it is either an [[NonPersistentRepr]] or [[PersistentRepr]]. */ -private[persistence] sealed trait PersistentEnvelope { // FIXME PN: Rename to PersistentEnvelope +private[persistence] sealed trait PersistentEnvelope { def payload: Any def sender: ActorRef } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index b5ad2771e7..b2ff043146 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -23,9 +23,24 @@ case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) /** * Sent to a [[PersistentActor]] if a journal fails to replay messages or fetch that persistent actor's * highest sequence number. If not handled, the actor will be stopped. + * + * Contains the [[#sequenceNr]] of the message that could not be replayed, if it + * failed at a specific message. + * + * Contains the [[#payload]] of the message that could not be replayed, if it + * failed at a specific message. */ @SerialVersionUID(1L) -case class RecoveryFailure(cause: Throwable) +case class RecoveryFailure(cause: Throwable)(failingMessage: Option[(Long, Any)]) { + override def toString: String = failingMessage match { + case Some((sequenceNr, payload)) ⇒ s"RecoveryFailure(${cause.getMessage},$sequenceNr,$payload)" + case None ⇒ s"RecoveryFailure(${cause.getMessage})" + } + + def sequenceNr: Option[Long] = failingMessage.map { case (snr, _) ⇒ snr } + + def payload: Option[Any] = failingMessage.map { case (_, payload) ⇒ payload } +} abstract class RecoveryCompleted /** diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index cc9c656de8..0a113c4ae8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -8,7 +8,6 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.actor.AbstractActor import akka.actor.Actor -import akka.actor.ActorCell import akka.actor.ActorKilledException import akka.actor.Cancellable import akka.actor.Stash @@ -237,7 +236,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` - * message to the actor's `receiveRecover`. Then initiates a message replay, either starting + * message to the actor's `receive`. Then initiates a message replay, either starting * from the loaded snapshot or from scratch, and switches to `replayStarted` state. * All incoming messages are stashed. * @@ -264,7 +263,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory } /** - * Processes replayed messages, if any. The actor's `receiveRecover` is invoked with the replayed + * Processes replayed messages, if any. The actor's `receive` is invoked with the replayed * events. * * If replay succeeds it switches to `initializing` state and requests the highest stored sequence @@ -272,7 +271,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. * * If processing of a replayed event fails, the exception is caught and - * stored for being thrown later and state is changed to `recoveryFailed`. + * stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`. * * All incoming messages are stashed. */ @@ -294,15 +293,14 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory PersistentView.super.aroundReceive(receive, p.payload) } catch { case NonFatal(t) ⇒ - val currentMsg = context.asInstanceOf[ActorCell].currentMessage - changeState(replayFailed(t, currentMsg)) // delay throwing exception to prepareRestart + changeState(replayFailed(t, p)) } case ReplayMessagesSuccess ⇒ onReplayComplete(await) case ReplayMessagesFailure(cause) ⇒ onReplayComplete(await) // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? - PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)) + PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(None)) case other ⇒ internalStash.stash() } @@ -312,50 +310,37 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory */ private def onReplayComplete(await: Boolean): Unit = { changeState(idle) - if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax))) + if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, + Update(await = false, autoUpdateReplayMax))) if (await) internalStash.unstashAll() } } /** - * Consumes remaining replayed messages and then changes to `prepareRestart`. The - * message that caused the exception during replay, is re-added to the mailbox and - * re-received in `prepareRestart` state. + * Consumes remaining replayed messages and then emits RecoveryFailure to the + * `receive` behavior. */ - private def replayFailed(cause: Throwable, failureMessage: Envelope) = new State { + private def replayFailed(cause: Throwable, failed: PersistentRepr) = new State { override def toString: String = "replay failed" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { + case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) case ReplayMessagesFailure(_) ⇒ - replayCompleted() + replayCompleted(receive) // journal couldn't tell the maximum stored sequence number, hence the next // replay must be a full replay (up to the highest stored sequence number) // Recover(lastSequenceNr) is sent by preRestart setLastSequenceNr(Long.MaxValue) - case ReplayMessagesSuccess ⇒ replayCompleted() - case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) + case ReplayMessagesSuccess ⇒ replayCompleted(receive) case r: Recover ⇒ // ignore case _ ⇒ internalStash.stash() } - def replayCompleted(): Unit = { - changeState(prepareRestart(cause)) - mailbox.enqueueFirst(self, failureMessage) - } - } - - /** - * Re-receives the replayed message that caused an exception and re-throws that exception. - */ - private def prepareRestart(cause: Throwable) = new State { - override def toString: String = "prepare restart" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any) = message match { - case ReplayedMessage(_) ⇒ throw cause - case _ ⇒ // ignore + def replayCompleted(receive: Receive): Unit = { + // FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped? + PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload)))) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index fb9cf589fb..a65a49b859 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -16,9 +16,12 @@ import scala.language.postfixOps import scala.Some import akka.actor.OneForOneStrategy import scala.util.control.NoStackTrace +import akka.testkit.TestProbe object PersistentActorFailureSpec { + import PersistentActorSpec.Cmd import PersistentActorSpec.Evt + import PersistentActorSpec.ExamplePersistentActor class FailingInmemJournal extends AsyncWriteProxy { import AsyncWriteProxy.SetStore @@ -29,6 +32,7 @@ object PersistentActorFailureSpec { super.preStart() self ! SetStore(context.actorOf(Props[FailingInmemStore])) } + } class FailingInmemStore extends InmemStore { @@ -39,8 +43,12 @@ object PersistentActorFailureSpec { val readFromStore = read(pid, fromSnr, toSnr, max) if (readFromStore.length == 0) sender() ! ReplaySuccess - else + else if (isCorrupt(readFromStore)) sender() ! ReplayFailure(new IllegalArgumentException(s"blahonga $fromSnr $toSnr")) + else { + readFromStore.foreach(sender() ! _) + sender() ! ReplaySuccess + } } def isWrong(w: WriteMessages): Boolean = @@ -49,12 +57,20 @@ object PersistentActorFailureSpec { case x ⇒ false } + def isCorrupt(events: Seq[PersistentRepr]): Boolean = + events.exists { case PersistentRepr(Evt(s: String), _) ⇒ s.contains("corrupt") } + override def receive = failingReceive.orElse(super.receive) } class Supervisor(testActor: ActorRef) extends Actor { override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { - case e ⇒ testActor ! e; SupervisorStrategy.Stop + case e: ActorKilledException ⇒ + testActor ! e + SupervisorStrategy.Stop + case e ⇒ + testActor ! e + SupervisorStrategy.Restart } def receive = { @@ -62,6 +78,29 @@ object PersistentActorFailureSpec { case m ⇒ sender() ! m } } + + class FailingRecovery(name: String, recoveryFailureProbe: Option[ActorRef]) extends ExamplePersistentActor(name) { + def this(name: String) = this(name, None) + + override val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ persist(Evt(s"${data}"))(updateState) + } + + val failingRecover: Receive = { + case Evt(data) if data == "bad" ⇒ + throw new RuntimeException("Simulated exception from receiveRecover") + + case r @ RecoveryFailure(cause) if recoveryFailureProbe.isDefined ⇒ + recoveryFailureProbe.foreach { _ ! r } + throw new ActorKilledException(cause.getMessage) + } + + override def receiveRecover: Receive = failingRecover orElse super.receiveRecover + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + super.preRestart(reason, message) + } + } } class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( @@ -69,15 +108,25 @@ class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem" akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal" """))) with PersistenceSpec with ImplicitSender { - import PersistentActorSpec._ import PersistentActorFailureSpec._ + import PersistentActorSpec._ + + def prepareFailingRecovery(): Unit = { + val persistentActor = namedPersistentActor[FailingRecovery] + persistentActor ! Cmd("a") + persistentActor ! Cmd("b") + persistentActor ! Cmd("bad") + persistentActor ! Cmd("c") + persistentActor ! GetState + expectMsg(List("a", "b", "bad", "c")) + } "A persistent actor" must { "throw ActorKilledException if recovery from persisted events fail" in { val persistentActor = namedPersistentActor[Behavior1PersistentActor] - persistentActor ! Cmd("a") + persistentActor ! Cmd("corrupt") persistentActor ! GetState - expectMsg(List("a-1", "a-2")) + expectMsg(List("corrupt-1", "corrupt-2")) // recover by creating another with same name system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) @@ -99,7 +148,27 @@ class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem" persistentActor ! Cmd("wrong") expectMsg("wrong") // reply before persistAsync expectMsgType[ActorKilledException] - expectNoMsg(500.millis) + } + "throw ActorKilledException if receiveRecover fails" in { + prepareFailingRecovery() + + // recover by creating another with same name + system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name) + expectMsgType[ActorRef] + expectMsgType[ActorKilledException] + } + "include failing event in RecoveryFailure message" in { + prepareFailingRecovery() + + // recover by creating another with same name + val probe = TestProbe() + system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref)) + expectMsgType[ActorRef] + expectMsgType[ActorKilledException] + val recoveryFailure = probe.expectMsgType[RecoveryFailure] + recoveryFailure.payload should be(Some(Evt("bad"))) + recoveryFailure.sequenceNr should be(Some(3L)) } } } + diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala index c1212782b4..b5d6f95f46 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -42,12 +42,16 @@ object PersistentViewSpec { case "boom" ⇒ throw new TestException("boom") + case RecoveryFailure(cause) ⇒ + throw cause // restart + case payload if isPersistent && shouldFailOn(payload) ⇒ throw new TestException("boom") case payload if isPersistent ⇒ last = s"replicated-${payload}-${lastSequenceNr}" probe ! last + } override def postRestart(reason: Throwable): Unit = {