diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala index 3f570e871d..c6042e11df 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala @@ -106,6 +106,20 @@ abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config) { snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(metadata(3).sequenceNr, metadata(3).timestamp), Long.MaxValue), senderProbe.ref) senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(3), s"s-4")), Long.MaxValue)) } + "not delete snapshots with non-matching upper timestamp bounds" in { + val md = metadata(3) + val criteria = SnapshotSelectionCriteria(md.sequenceNr, md.timestamp - 1) + val cmd = DeleteSnapshots(pid, criteria) + val sub = TestProbe() + + subscribe[DeleteSnapshots](sub.ref) + snapshotStore.tell(cmd, senderProbe.ref) + sub.expectMsg(cmd) + senderProbe.expectMsg(DeleteSnapshotsSuccess(criteria)) + + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(metadata(3).sequenceNr, metadata(3).timestamp), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(3), s"s-4")), Long.MaxValue)) + } "save and overwrite snapshot with same sequence number" in { val md = metadata(4) snapshotStore.tell(SaveSnapshot(md, s"s-5-modified"), senderProbe.ref) diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 4dc1ff6cf5..59b1eaa2fd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -81,8 +81,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo } private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = { - // pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps - snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector + snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata)).toVector } @scala.annotation.tailrec @@ -156,13 +155,15 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo } } - private final class SnapshotSeqNrFilenameFilter(persistenceId: String, sequenceNr: Long) extends FilenameFilter { - private final def matches(pid: String, snr: String): Boolean = - pid.equals(URLEncoder.encode(persistenceId)) && Try(snr.toLong == sequenceNr).getOrElse(false) + private final class SnapshotSeqNrFilenameFilter(md: SnapshotMetadata) extends FilenameFilter { + private final def matches(pid: String, snr: String, tms: String): Boolean = { + pid.equals(URLEncoder.encode(md.persistenceId)) && + Try(snr.toLong == md.sequenceNr && (md.timestamp == 0L || tms.toLong == md.timestamp)).getOrElse(false) + } def accept(dir: File, name: String): Boolean = name match { - case FilenamePattern(pid, snr, tms) ⇒ matches(pid, snr) + case FilenamePattern(pid, snr, tms) ⇒ matches(pid, snr, tms) case _ ⇒ false } diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index 6ed0997a77..e315752949 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -5,7 +5,7 @@ package akka.persistence.fsm import akka.actor._ -import akka.persistence.{PersistentActor, RecoveryCompleted, PersistenceSpec} +import akka.persistence.{ PersistentActor, RecoveryCompleted, PersistenceSpec } import akka.persistence.fsm.PersistentFSM._ import akka.testkit._ import com.typesafe.config.Config @@ -266,13 +266,13 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor)) expectMsg(ItemAdded(Item("1", "Shirt", 59.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(ItemAdded(Item("2", "Shoes", 89.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(ItemAdded(Item("3", "Coat", 119.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(OrderExecuted) expectMsgType[StateChangeEvent] @@ -431,13 +431,13 @@ object PersistentFSMSpec { def props(persistenceId: String, reportActor: ActorRef) = Props(new WebStoreCustomerFSM(persistenceId, reportActor)) } - + class PersistentEventsStreamer(id: String, client: ActorRef) extends PersistentActor { override val persistenceId: String = id def receiveRecover = { - case RecoveryCompleted ⇒ // do nothing - case persistentEvent ⇒ client ! persistentEvent + case RecoveryCompleted ⇒ // do nothing + case persistentEvent ⇒ client ! persistentEvent } def receiveCommand = {