lastSequenceNr should reflect the snapshot sequence and not start with 0 when journal is empty (#27496)
* lastSequenceNr should reflect the snapshot sequence number and not start at 0 when there was no journal to begin with. * Use LevelDBJournal in Test Case The test case now properly simulates a recovery from a snapshot without a journal. (cherry picked from commit df2a1d8a52e13aa09edd20bcf95ff617cd7acf9c)
This commit is contained in:
parent
78281ba92f
commit
9d21207f41
2 changed files with 112 additions and 1 deletions
|
|
@ -744,9 +744,10 @@ private[persistence] trait Eventsourced
|
|||
finally context.stop(self)
|
||||
returnRecoveryPermit()
|
||||
}
|
||||
case RecoverySuccess(highestSeqNr) =>
|
||||
case RecoverySuccess(highestJournalSeqNr) =>
|
||||
timeoutCancellable.cancel()
|
||||
onReplaySuccess() // callback for subclass implementation
|
||||
val highestSeqNr = Math.max(highestJournalSeqNr, lastSequenceNr)
|
||||
sequenceNr = highestSeqNr
|
||||
setLastSequenceNr(highestSeqNr)
|
||||
_recoveryRunning = false
|
||||
|
|
|
|||
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence
|
||||
|
||||
import java.io.File
|
||||
|
||||
import akka.actor._
|
||||
import akka.persistence.serialization.Snapshot
|
||||
import akka.serialization.{ Serialization, SerializationExtension }
|
||||
import akka.testkit._
|
||||
import org.apache.commons.io.FileUtils
|
||||
|
||||
object SnapshotRecoveryWithEmptyJournalSpec {
|
||||
val survivingSnapshotPath = "target/survivingSnapshotPath"
|
||||
|
||||
case object TakeSnapshot
|
||||
|
||||
class SaveSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
|
||||
var state = List.empty[String]
|
||||
|
||||
override def receiveRecover: Receive = {
|
||||
case payload: String => state = s"${payload}-${lastSequenceNr}" :: state
|
||||
case SnapshotOffer(_, snapshot: List[_]) => state = snapshot.asInstanceOf[List[String]]
|
||||
}
|
||||
|
||||
override def receiveCommand: PartialFunction[Any, Unit] = {
|
||||
case payload: String =>
|
||||
persist(payload) { _ =>
|
||||
state = s"${payload}-${lastSequenceNr}" :: state
|
||||
}
|
||||
case TakeSnapshot => saveSnapshot(state)
|
||||
case SaveSnapshotSuccess(md) => probe ! md.sequenceNr
|
||||
case GetState => probe ! state.reverse
|
||||
case o: DeleteMessagesSuccess => probe ! o
|
||||
}
|
||||
}
|
||||
|
||||
class LoadSnapshotTestPersistentActor(name: String, _recovery: Recovery, probe: ActorRef)
|
||||
extends NamedPersistentActor(name) {
|
||||
override def recovery: Recovery = _recovery
|
||||
|
||||
override def receiveRecover: Receive = {
|
||||
case payload: String => probe ! s"${payload}-${lastSequenceNr}"
|
||||
case offer: SnapshotOffer => probe ! offer
|
||||
case other => probe ! other
|
||||
}
|
||||
|
||||
override def receiveCommand: PartialFunction[Any, Unit] = {
|
||||
case "done" => probe ! "done"
|
||||
case payload: String =>
|
||||
persist(payload) { _ =>
|
||||
probe ! s"${payload}-${lastSequenceNr}"
|
||||
}
|
||||
case offer: SnapshotOffer => probe ! offer
|
||||
case other => probe ! other
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class SnapshotRecoveryWithEmptyJournalSpec
|
||||
extends PersistenceSpec(
|
||||
PersistenceSpec.config(
|
||||
"leveldb",
|
||||
"SnapshotRecoveryWithEmptyJournalSpec",
|
||||
extraConfig = Some(s"""
|
||||
akka.persistence.snapshot-store.local.dir = "${SnapshotRecoveryWithEmptyJournalSpec.survivingSnapshotPath}"
|
||||
""")))
|
||||
with ImplicitSender {
|
||||
|
||||
import SnapshotRecoveryWithEmptyJournalSpec._
|
||||
|
||||
val persistenceId: String = namePrefix
|
||||
val snapshotsDir: File = new File(survivingSnapshotPath)
|
||||
|
||||
val serializationExtension: Serialization = SerializationExtension(system)
|
||||
|
||||
// Prepare a hand made snapshot file as basis for the recovery start point
|
||||
private def createSnapshotFile(sequenceNr: Long, ts: Long, data: Any): Unit = {
|
||||
val snapshotFile = new File(snapshotsDir, s"snapshot-$persistenceId-$sequenceNr-$ts")
|
||||
FileUtils.writeByteArrayToFile(snapshotFile, serializationExtension.serialize(Snapshot(data)).get)
|
||||
}
|
||||
|
||||
val givenSnapshotSequenceNr: Long = 4711L
|
||||
val givenTimestamp: Long = 1000L
|
||||
|
||||
override protected def atStartup(): Unit = {
|
||||
super.atStartup()
|
||||
createSnapshotFile(givenSnapshotSequenceNr - 1, givenTimestamp - 1, List("a-1"))
|
||||
createSnapshotFile(givenSnapshotSequenceNr, givenTimestamp, List("a-1", "b-2"))
|
||||
}
|
||||
|
||||
"A persistent actor in a system that only has snapshots and no previous journal activity" must {
|
||||
"recover its state and sequence number starting from the most recent snapshot and use subsequent sequence numbers to persist events to the journal" in {
|
||||
system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, Recovery(), testActor))
|
||||
expectMsgPF() {
|
||||
case SnapshotOffer(SnapshotMetadata(`persistenceId`, `givenSnapshotSequenceNr`, timestamp), state) =>
|
||||
state should ===(List("a-1", "b-2"))
|
||||
timestamp shouldEqual givenTimestamp
|
||||
}
|
||||
expectMsg(RecoveryCompleted)
|
||||
|
||||
val persistentActor1 = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], persistenceId, testActor))
|
||||
persistentActor1 ! "c"
|
||||
persistentActor1 ! TakeSnapshot
|
||||
expectMsgAllOf(givenSnapshotSequenceNr + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue