Gracefully handle exception throw in SnapshotOffer, #26699 (#26700)

This commit is contained in:
Arnaud Burlet 2019-05-17 10:47:11 +02:00 committed by Patrik Nordwall
parent 5db334f2b1
commit e6be88d35f
2 changed files with 79 additions and 3 deletions

View file

@ -646,9 +646,16 @@ private[persistence] trait Eventsourced
case SelectedSnapshot(metadata, snapshot) => case SelectedSnapshot(metadata, snapshot) =>
val offer = SnapshotOffer(metadata, snapshot) val offer = SnapshotOffer(metadata, snapshot)
if (recoveryBehavior.isDefinedAt(offer)) { if (recoveryBehavior.isDefinedAt(offer)) {
try {
setLastSequenceNr(metadata.sequenceNr) setLastSequenceNr(metadata.sequenceNr)
// Since we are recovering we can ignore the receive behavior from the stack // Since we are recovering we can ignore the receive behavior from the stack
Eventsourced.super.aroundReceive(recoveryBehavior, offer) Eventsourced.super.aroundReceive(recoveryBehavior, offer)
} catch {
case NonFatal(t) =>
try onRecoveryFailure(t, None)
finally context.stop(self)
returnRecoveryPermit()
}
} else { } else {
unhandled(offer) unhandled(offer)
} }

View file

@ -0,0 +1,69 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence
import akka.actor.{ ActorLogging, ActorRef, Props }
import akka.event.Logging
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
object SnapshotDecodeFailureSpec {
case class Cmd(payload: String)
class SaveSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
def receiveCommand = {
case Cmd(payload) => persist(payload)(_ => saveSnapshot(payload))
case SaveSnapshotSuccess(md) => probe ! md.sequenceNr
}
def receiveRecover = {
case _ =>
}
}
class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef)
extends NamedPersistentActor(name)
with ActorLogging {
def receiveCommand = {
case _ =>
}
def receiveRecover = {
case SnapshotOffer(_, _) => throw new Exception("kanbudong")
case other => probe ! other
}
}
}
class SnapshotDecodeFailureSpec
extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotDecodeFailureSpec"))
with ImplicitSender {
import SnapshotDecodeFailureSpec._
override protected def beforeEach(): Unit = {
super.beforeEach()
val persistentActor = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], name, testActor))
persistentActor ! Cmd("payload")
expectMsg(1)
}
"A persistentActor with a failing snapshot loading" must {
"fail recovery and stop actor when no snapshot could be loaded" in {
system.eventStream.subscribe(testActor, classOf[Logging.Error])
system.eventStream.publish(TestEvent.Mute(EventFilter[java.lang.Exception](start = "kanbudong")))
try {
val lPersistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, testActor))
expectMsgType[Logging.Error].message.toString should startWith(
"Persistence failure when replaying events for persistenceId")
watch(lPersistentActor)
expectTerminated(lPersistentActor)
} finally {
system.eventStream.unsubscribe(testActor, classOf[Logging.Error])
system.eventStream.publish(TestEvent.UnMute(EventFilter.error(start = "kanbudong")))
}
}
}
}