From 9d21207f419d4ef10e4ebb7f11fdbf5eaab055fe Mon Sep 17 00:00:00 2001 From: Tanju Erinmez Date: Mon, 14 Oct 2019 14:49:42 +0200 Subject: [PATCH] lastSequenceNr should reflect the snapshot sequence and not start with 0 when journal is empty (#27496) * lastSequenceNr should reflect the snapshot sequence number and not start at 0 when there was no journal to begin with. * Use LevelDBJournal in Test Case The test case now properly simulates a recovery from a snapshot without a journal. (cherry picked from commit df2a1d8a52e13aa09edd20bcf95ff617cd7acf9c) --- .../scala/akka/persistence/Eventsourced.scala | 3 +- ...SnapshotRecoveryWithEmptyJournalSpec.scala | 110 ++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryWithEmptyJournalSpec.scala diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 41f2a51fec..0b3229a01a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -744,9 +744,10 @@ private[persistence] trait Eventsourced finally context.stop(self) returnRecoveryPermit() } - case RecoverySuccess(highestSeqNr) => + case RecoverySuccess(highestJournalSeqNr) => timeoutCancellable.cancel() onReplaySuccess() // callback for subclass implementation + val highestSeqNr = Math.max(highestJournalSeqNr, lastSequenceNr) sequenceNr = highestSeqNr setLastSequenceNr(highestSeqNr) _recoveryRunning = false diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryWithEmptyJournalSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryWithEmptyJournalSpec.scala new file mode 100644 index 0000000000..42cb166beb --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryWithEmptyJournalSpec.scala @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.persistence + +import java.io.File + +import akka.actor._ +import akka.persistence.serialization.Snapshot +import akka.serialization.{ Serialization, SerializationExtension } +import akka.testkit._ +import org.apache.commons.io.FileUtils + +object SnapshotRecoveryWithEmptyJournalSpec { + val survivingSnapshotPath = "target/survivingSnapshotPath" + + case object TakeSnapshot + + class SaveSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + var state = List.empty[String] + + override def receiveRecover: Receive = { + case payload: String => state = s"${payload}-${lastSequenceNr}" :: state + case SnapshotOffer(_, snapshot: List[_]) => state = snapshot.asInstanceOf[List[String]] + } + + override def receiveCommand: PartialFunction[Any, Unit] = { + case payload: String => + persist(payload) { _ => + state = s"${payload}-${lastSequenceNr}" :: state + } + case TakeSnapshot => saveSnapshot(state) + case SaveSnapshotSuccess(md) => probe ! md.sequenceNr + case GetState => probe ! state.reverse + case o: DeleteMessagesSuccess => probe ! o + } + } + + class LoadSnapshotTestPersistentActor(name: String, _recovery: Recovery, probe: ActorRef) + extends NamedPersistentActor(name) { + override def recovery: Recovery = _recovery + + override def receiveRecover: Receive = { + case payload: String => probe ! s"${payload}-${lastSequenceNr}" + case offer: SnapshotOffer => probe ! offer + case other => probe ! other + } + + override def receiveCommand: PartialFunction[Any, Unit] = { + case "done" => probe ! "done" + case payload: String => + persist(payload) { _ => + probe ! s"${payload}-${lastSequenceNr}" + } + case offer: SnapshotOffer => probe ! offer + case other => probe ! other + } + } +} + +class SnapshotRecoveryWithEmptyJournalSpec + extends PersistenceSpec( + PersistenceSpec.config( + "leveldb", + "SnapshotRecoveryWithEmptyJournalSpec", + extraConfig = Some(s""" + akka.persistence.snapshot-store.local.dir = "${SnapshotRecoveryWithEmptyJournalSpec.survivingSnapshotPath}" + """))) + with ImplicitSender { + + import SnapshotRecoveryWithEmptyJournalSpec._ + + val persistenceId: String = namePrefix + val snapshotsDir: File = new File(survivingSnapshotPath) + + val serializationExtension: Serialization = SerializationExtension(system) + + // Prepare a hand made snapshot file as basis for the recovery start point + private def createSnapshotFile(sequenceNr: Long, ts: Long, data: Any): Unit = { + val snapshotFile = new File(snapshotsDir, s"snapshot-$persistenceId-$sequenceNr-$ts") + FileUtils.writeByteArrayToFile(snapshotFile, serializationExtension.serialize(Snapshot(data)).get) + } + + val givenSnapshotSequenceNr: Long = 4711L + val givenTimestamp: Long = 1000L + + override protected def atStartup(): Unit = { + super.atStartup() + createSnapshotFile(givenSnapshotSequenceNr - 1, givenTimestamp - 1, List("a-1")) + createSnapshotFile(givenSnapshotSequenceNr, givenTimestamp, List("a-1", "b-2")) + } + + "A persistent actor in a system that only has snapshots and no previous journal activity" must { + "recover its state and sequence number starting from the most recent snapshot and use subsequent sequence numbers to persist events to the journal" in { + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, Recovery(), testActor)) + expectMsgPF() { + case SnapshotOffer(SnapshotMetadata(`persistenceId`, `givenSnapshotSequenceNr`, timestamp), state) => + state should ===(List("a-1", "b-2")) + timestamp shouldEqual givenTimestamp + } + expectMsg(RecoveryCompleted) + + val persistentActor1 = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], persistenceId, testActor)) + persistentActor1 ! "c" + persistentActor1 ! TakeSnapshot + expectMsgAllOf(givenSnapshotSequenceNr + 1) + } + } +}