From 0cd94e2c8c801664d7702f1a8616ceb7cd08db52 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 14 Aug 2015 12:42:25 +0200 Subject: [PATCH] =per #18112 Use timestamp in deleteSnapshots It was actually used for finding the right metadata, but the local store deleted all files with matching seqNr. Note that we use 0L as undefined value for the timestamp when deleting single snapshot (and therefore it makes sense to delete all in that case) --- .../persistence/snapshot/SnapshotStoreSpec.scala | 14 ++++++++++++++ .../snapshot/local/LocalSnapshotStore.scala | 13 +++++++------ .../akka/persistence/fsm/PersistentFSMSpec.scala | 14 +++++++------- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala index 3f570e871d..c6042e11df 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala @@ -106,6 +106,20 @@ abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config) { snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(metadata(3).sequenceNr, metadata(3).timestamp), Long.MaxValue), senderProbe.ref) senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(3), s"s-4")), Long.MaxValue)) } + "not delete snapshots with non-matching upper timestamp bounds" in { + val md = metadata(3) + val criteria = SnapshotSelectionCriteria(md.sequenceNr, md.timestamp - 1) + val cmd = DeleteSnapshots(pid, criteria) + val sub = TestProbe() + + subscribe[DeleteSnapshots](sub.ref) + snapshotStore.tell(cmd, senderProbe.ref) + sub.expectMsg(cmd) + senderProbe.expectMsg(DeleteSnapshotsSuccess(criteria)) + + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(metadata(3).sequenceNr, metadata(3).timestamp), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(3), s"s-4")), Long.MaxValue)) + } "save and overwrite snapshot with same sequence number" in { val md = metadata(4) snapshotStore.tell(SaveSnapshot(md, s"s-5-modified"), senderProbe.ref) diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 4dc1ff6cf5..59b1eaa2fd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -81,8 +81,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo } private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = { - // pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps - snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector + snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata)).toVector } @scala.annotation.tailrec @@ -156,13 +155,15 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo } } - private final class SnapshotSeqNrFilenameFilter(persistenceId: String, sequenceNr: Long) extends FilenameFilter { - private final def matches(pid: String, snr: String): Boolean = - pid.equals(URLEncoder.encode(persistenceId)) && Try(snr.toLong == sequenceNr).getOrElse(false) + private final class SnapshotSeqNrFilenameFilter(md: SnapshotMetadata) extends FilenameFilter { + private final def matches(pid: String, snr: String, tms: String): Boolean = { + pid.equals(URLEncoder.encode(md.persistenceId)) && + Try(snr.toLong == md.sequenceNr && (md.timestamp == 0L || tms.toLong == md.timestamp)).getOrElse(false) + } def accept(dir: File, name: String): Boolean = name match { - case FilenamePattern(pid, snr, tms) ⇒ matches(pid, snr) + case FilenamePattern(pid, snr, tms) ⇒ matches(pid, snr, tms) case _ ⇒ false } diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index 6ed0997a77..e315752949 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -5,7 +5,7 @@ package akka.persistence.fsm import akka.actor._ -import akka.persistence.{PersistentActor, RecoveryCompleted, PersistenceSpec} +import akka.persistence.{ PersistentActor, RecoveryCompleted, PersistenceSpec } import akka.persistence.fsm.PersistentFSM._ import akka.testkit._ import com.typesafe.config.Config @@ -266,13 +266,13 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor)) expectMsg(ItemAdded(Item("1", "Shirt", 59.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(ItemAdded(Item("2", "Shoes", 89.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(ItemAdded(Item("3", "Coat", 119.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(OrderExecuted) expectMsgType[StateChangeEvent] @@ -431,13 +431,13 @@ object PersistentFSMSpec { def props(persistenceId: String, reportActor: ActorRef) = Props(new WebStoreCustomerFSM(persistenceId, reportActor)) } - + class PersistentEventsStreamer(id: String, client: ActorRef) extends PersistentActor { override val persistenceId: String = id def receiveRecover = { - case RecoveryCompleted ⇒ // do nothing - case persistentEvent ⇒ client ! persistentEvent + case RecoveryCompleted ⇒ // do nothing + case persistentEvent ⇒ client ! persistentEvent } def receiveCommand = {